In [1]:
import os
import time
import math
import json
import redis
import asyncio
import traceback
from datetime import datetime
from aredis import StrictRedis
from multiprocessing import Process

In [2]:
from pathlib import Path
# Move the working directory up one level so the `libs` package imported in
# the next cell can be resolved (assumes `libs/` lives in the notebook's
# parent directory — TODO confirm).
# The guard makes the cell idempotent: without it, every re-run of this cell
# climbs one more directory and breaks the imports that follow.
if not (Path.cwd() / 'libs').is_dir():
    os.chdir(Path(os.getcwd()).parent)

In [3]:
from libs.quotation import Quotation
from libs.dailydata import DailyData
from libs.utils import Utils
from libs.cython.compute import compute_stats

In [4]:
# Install the selector-based event loop policy when asyncio provides it.
# `WindowsSelectorEventLoopPolicy` only exists on Windows, so accessing it
# unconditionally raises AttributeError on every other platform; there the
# default policy is left untouched and `policy` stays None.
policy = None
if hasattr(asyncio, 'WindowsSelectorEventLoopPolicy'):
    policy = asyncio.WindowsSelectorEventLoopPolicy()
    asyncio.set_event_loop_policy(policy)

In [5]:
def assist(assist_idx, assist_count):
    """Start one assist worker on the running event loop and return its cache.

    The worker owns a contiguous slice of the day's symbols (worker
    ``assist_idx`` out of ``assist_count``) and reacts to JSON commands popped
    with BRPOP from the Redis list ``hq_assist_{assist_idx}``:

    * ``snapshotting``       -- start a background task that, for every future
      check point, takes a snapshot of the slice, computes its statistics and
      publishes the outcome on ``hq_assist_{assist_idx}_snapshotting``.
    * ``compute_statistics`` -- recompute the statistics of every check point
      for the slice and LPUSH the outcome to
      ``hq_assist_{assist_idx}_compute_statistics``.
    * ``incremental_save``   -- call ``incremental_save(msg['idx'])`` on the
      day's ``DailyData``.
    * ``quit``               -- cancel a running snapshotting task, close the
      shared memory of every cached ``DailyData`` and stop the loop.

    Args:
        assist_idx: Zero-based index of this worker.
        assist_count: Total number of workers the symbols are split across.

    Returns:
        dict: The worker's cache, mapping ``date`` to a
        ``(DailyData, (start, stop))`` tuple. It is filled lazily as commands
        arrive, so it is empty when this function returns.

    Raises:
        RuntimeError: From ``asyncio.create_task`` when no event loop is
            running (the function is meant to be called from a notebook cell
            or another coroutine).
    """
    ar = StrictRedis(host='127.0.0.1', port=6379, db=8)
    data = {}

    def get_daily_data(date):
        """Return the cached ``(DailyData, (start, stop))`` pair for ``date``."""
        if date not in data:
            dd = DailyData(date, create=False)
            # Split the symbols into `assist_count` contiguous groups; the
            # last group is clipped to the number of symbols.
            group_size = math.ceil(len(dd.symbols) / assist_count)
            scope = (
                assist_idx * group_size,
                min((assist_idx + 1) * group_size, len(dd.symbols)),
            )
            data[date] = (dd, scope)

        return data[date]

    def compute_check_point(dd, scope, basics, idx):
        """Run ``compute_stats`` for check point ``idx`` on this worker's slice."""
        start, stop = scope
        snapshot = dd.snapshots[idx, start:stop, :]
        statistic = dd.statistic[idx, start:stop, :]

        time_lapse = dd.get_time_lapse(idx)
        ma5pm_anchor_idx = dd.get_ma5pm_anchor_idx(idx)
        fs5p = dd.snapshots[ma5pm_anchor_idx, start:stop, :]

        compute_stats(snapshot, basics, statistic, fs5p, time_lapse)

    def compute_statistics(date):
        """Recompute the statistics of every check point for this worker's slice."""
        dd, scope = get_daily_data(date)
        basics = dd.basics[scope[0]:scope[1], :]
        for idx, _check_point in enumerate(dd.check_points):
            compute_check_point(dd, scope, basics, idx)

    async def snapshotting(date):
        """Snapshot the slice at every remaining check point and publish the result."""
        channel = f'hq_assist_{assist_idx}_snapshotting'
        # Bound before the try so the `finally` clause can tell whether the
        # Quotation was ever created; otherwise a failure in the setup below
        # would be masked by an UnboundLocalError on `q`.
        q = None
        try:
            dd, scope = get_daily_data(date)
            start, stop = scope

            q = Quotation(symbols=dd.symbols.tolist()[start:stop])
            basics = dd.basics[start:stop, :]

            for idx, check_point in enumerate(dd.check_points):
                # Check points that are already in the past are skipped.
                if time.time() > check_point:
                    continue

                delay = check_point - time.time()
                await asyncio.sleep(max(delay, 0))

                try:
                    await q.snapshot(array=dd.snapshots[idx, start:stop, :])
                    compute_check_point(dd, scope, basics, idx)

                    await ar.publish(channel, json.dumps({"status":'successful',"idx":idx,'check_point':int(check_point)}))

                except Exception as e:
                    # A failed check point is reported and the loop continues
                    # with the next one.
                    error = {
                        "status": 'failed',
                        "idx": idx,
                        'check_point':int(check_point),
                        "exception": str(e),
                        'traceback': traceback.format_exc()
                    }
                    await ar.publish(channel, json.dumps(error))

            # Only the first worker persists the day's data.
            if assist_idx == 0:
                dd.save()

        finally:
            if q is not None:
                await q.exit()

    async def main():
        """Command loop: block on the worker's Redis list and dispatch each message."""
        snapshotting_task = None

        while True:
            key, value = await ar.brpop(f'hq_assist_{assist_idx}')
            msg = json.loads(value)
            print(f'Assist[{assist_idx}] {msg}')

            command = msg['command']

            if command == 'snapshotting':
                snapshotting_task = asyncio.create_task(snapshotting(msg['date']))

            elif command == 'compute_statistics':
                result_key = f'hq_assist_{assist_idx}_compute_statistics'
                try:
                    compute_statistics(msg['date'])
                    await ar.lpush(result_key, json.dumps({"status":'success'}))
                except Exception as e:
                    error = {
                        "status": 'failed',
                        "exception": str(e),
                        'traceback': traceback.format_exc()
                    }
                    await ar.lpush(result_key, json.dumps(error))

            elif command == 'incremental_save':
                # get_daily_data returns (DailyData, scope); unpack it before
                # calling DailyData methods.
                dd, _scope = get_daily_data(msg['date'])
                dd.incremental_save(msg['idx'])

            elif command == 'quit':
                if snapshotting_task and snapshotting_task.done() is False:
                    print(f'Assist[{assist_idx}]: snapshotting_task is going to be canceled')
                    snapshotting_task.cancel()

                print(f'Assist[{assist_idx}]: sharedmemory is going to be closed')
                # The cache values are (DailyData, scope) tuples.
                for dd, _scope in data.values():
                    dd.close_sharedmemory()
                break

            else:
                # Unknown commands are ignored.
                pass

    # Scheduled on the already-running loop rather than asyncio.run(), which
    # cannot be called while a loop is running.
    # NOTE(review): the task object is not stored anywhere; the asyncio docs
    # recommend keeping a reference to tasks created this way.
    asyncio.create_task(main())

    return data
    
# if __name__ == '__main__':
#     rd = redis.Redis(host='127.0.0.1', port=6379, db=8)
#     for key in rd.keys():
#         rd.delete(key)

#     ####
#     Utils.update_symbols()
    
#     symbols = Utils.get_running_symbols()
#     assist_count = math.ceil(len(symbols)/800)+1
    
#     rd.set('hq_assist_count', assist_count)

#     processes = []
#     for _ in range(assist_count):
#         proc = Process(target=assist, args=(_, assist_count))
#         processes.append(proc)
#         proc.start()

#     for proc in processes:
#         proc.join()


In [None]:
# Callbacks invoked once every assist worker has reported a check point.
snapshot_handlers = []
async def start_snapshot_listening():
    """Listen for snapshot reports from all assist workers until the day ends.

    Reads today's check point count (``hq_{date}_check_points_length``) and the
    worker count (``hq_assist_count``) from Redis db 8, then pattern-subscribes
    to ``hq_assist_*_snapshotting``. Each successful report increments a
    per-check-point counter; when the counter of a check point reaches the
    worker count, every callable in ``snapshot_handlers`` is called with the
    decoded report. The coroutine returns after the last check point has been
    reported by every worker.

    Raises:
        TypeError: If either Redis key is missing (``int(None)``).
    """
    date = time.strftime('%Y%m%d')

    rd = redis.Redis(host='127.0.0.1', port=6379, db=8)
    check_points_length = int(rd.get(f'hq_{date}_check_points_length'))
    assist_count = int(rd.get('hq_assist_count'))

    # Nothing will ever be reported without check points; indexing the last
    # counter below would fail on an empty list.
    if check_points_length <= 0:
        return

    # Number of successful reports received per check point. A plain list is
    # enough here and avoids depending on numpy, which this notebook never
    # imports.
    status = [0] * check_points_length

    def pre_handler(message):
        """Count one worker report and fan out once a check point is complete."""
        channel = message['channel']
        if isinstance(channel, bytes):
            channel = channel.decode()
        # Channel layout: hq_assist_<assist_idx>_snapshotting
        assist_idx = channel.split('_')[2]
        data = json.loads(message['data'])
        check_point_idx = int(data['idx'])

        if data['status'] == 'successful':
            status[check_point_idx] += 1

            if status[check_point_idx] == assist_count:
                for handler in snapshot_handlers:
                    handler(data)
            print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} ss_handler:', assist_idx, check_point_idx, status[check_point_idx])  
        else:
            print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} ss_handler:', assist_idx, check_point_idx, message['data'])

    # The async client used inside assist() is local to that function, so this
    # coroutine needs its own connection for the subscription.
    ar = StrictRedis(host='127.0.0.1', port=6379, db=8)
    p = ar.pubsub()
    await p.psubscribe(**{'hq_assist_*_snapshotting': pre_handler})

    try:
        # get_message() dispatches incoming messages to pre_handler; loop until
        # every worker has reported the final check point.
        while status[-1] != assist_count:
            await p.get_message()

        await asyncio.sleep(5)
        await p.punsubscribe('hq_assist_*_snapshotting')
    finally:
        # Release the pubsub even when the loop above fails or is cancelled.
        p.close()

    
def add_snapshot_handler(handler):
    """Append ``handler`` to ``snapshot_handlers`` unless it is already there."""
    if handler not in snapshot_handlers:
        snapshot_handlers.append(handler)


def remove_snapshot_handler(handler):
    """Remove ``handler`` from ``snapshot_handlers``; do nothing if it is absent.

    Only the first matching entry is removed, which is the only one that can
    exist when handlers are registered through ``add_snapshot_handler``.
    """
    # list.index never returns -1 (it raises ValueError), so the membership
    # test is the only guard needed before removing.
    if handler in snapshot_handlers:
        snapshot_handlers.remove(handler)

In [None]:
# await asyncio.sleep(int(time.mktime(time.strptime(f'{time.strftime("%Y-%m-%d")} 09:13:00', '%Y-%m-%d %H:%M:%S'))) - time.time()) 

# Synchronous Redis client (db 8) used to publish the worker count.
rd = redis.Redis(host='127.0.0.1', port=6379, db=8)

# Worker count: symbol total plus 10, divided into groups of 800 (rounded up),
# plus one extra worker.
symbol_count = 10 + len(Utils.get_symbols())
assist_count = 1 + math.ceil(symbol_count / 800)
rd.set('hq_assist_count', assist_count)

# Start every worker in this kernel and keep the cache dict each one returns.
result = []
for assist_idx in range(assist_count):
    data = assist(assist_idx, assist_count)
    result.append(data)

In [21]:
# Close the shared memory held by `dd` when one is open in this kernel.
# `dd` is only created by a cell further down, so on a fresh kernel this cell
# used to fail with NameError; the guard makes it a no-op in that case.
if 'dd' in globals():
    dd.close_sharedmemory()

In [None]:
# Synchronous Redis client for the local server, database 8.
rd = redis.Redis(host='127.0.0.1', port=6379, db=8)

In [None]:

# for key in rd.keys():
#     rd.delete(key)

In [None]:
# Project helper; presumably refreshes the stored symbol list — confirm in libs.utils.
Utils.update_symbols()

In [None]:
# Worker count: one worker per 800 running symbols (rounded up), plus one extra.
symbols = Utils.get_running_symbols()
assist_count = math.ceil(len(symbols)/800)+1
assist_count

In [None]:
# Publish the worker count under the Redis key `hq_assist_count`.
rd.set('hq_assist_count', assist_count)

In [None]:
# Start one assist worker per index and collect the cache dict each returns.
# NOTE(review): every assist() call schedules a new command loop on the same
# Redis list; if an earlier cell already started the workers in this kernel,
# running this cell as well leaves two loops competing for each list.
result = []
for _ in range(assist_count):
    data = assist(_, assist_count)
    result.append(data)

In [None]:
date = time.strftime('%Y%m%d')
# Each cache entry returned by assist() is a (DailyData, scope) tuple, so the
# DailyData has to be picked out before calling its methods.
# The entry only exists once worker 0 has handled a command for this date.
result[0][date][0].get_snapshot('13:30:55')

In [8]:
# Today's local date formatted as YYYYMMDD.
date = time.strftime('%Y%m%d')
date

'20210625'

In [None]:
# DailyData for `date`, constructed with its default arguments (the assist
# workers pass create=False instead).
dd = DailyData(date)

In [9]:
# Per-security table from the DailyData; indexed by symbol in the cell below.
securities = dd.get_securities()

In [10]:
# Look up a single security by its symbol.
securities.loc['300057']

name             万顺新材
zt_price         7.14
dt_price         4.76
ma5vpm     121,432.31
mcap            29.70
sum4            23.73
sum9            49.43
sum19           99.32
sum29          146.16
sum59          286.26
Name: 300057, dtype: object

In [14]:
# Snapshot for the check point at the given HH:MM:SS time of day.
snapshot = dd.get_snapshot('11:15:55')
snapshot

Unnamed: 0_level_0,datetime,timestamp,name,open,close,now,high,low,turnover,volume,bid1,bid1_volume,zhangfu,junjia,liangbi,zhangsu,tingban,ma5
symbol,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
000001,11:15:55,1624590955,平安银行,23.08,23.08,23.39,23.50,23.06,27365401.00,639150296.35,23.39,57700.00,1.34,23.36,1.03,1.34,,22.95
000002,11:15:55,1624590955,万 科Ａ,24.40,24.33,24.41,24.49,24.30,23422726.00,571367868.45,24.40,202200.00,0.33,24.39,0.78,0.04,,24.33
000004,11:15:55,1624590955,国华网安,18.86,18.86,18.08,19.10,17.99,4893017.00,90035897.94,18.08,1600.00,-4.14,18.40,1.25,-4.14,,19.54
000005,11:15:55,1624590955,ST星源,1.92,1.92,1.91,1.96,1.90,9797933.00,18872767.62,1.90,328500.00,-0.52,1.93,2.14,-0.52,,1.88
000006,11:15:55,1624590955,深振业Ａ,5.04,5.03,5.01,5.04,5.00,2878900.00,14430925.00,5.00,550400.00,-0.40,5.01,1.04,-0.60,,5.11
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
688777,11:15:55,1624590955,中控技术,88.60,88.60,90.16,91.13,88.00,508005.00,45744235.00,90.00,600.00,1.76,90.05,1.26,1.76,,90.71
688788,11:15:55,1624590955,科思科技,130.00,130.83,131.00,132.97,130.00,224116.00,29506033.00,130.81,400.00,0.13,131.66,0.74,0.77,,133.48
688819,11:15:55,1624590955,天能股份,42.05,42.25,41.20,42.30,41.18,2272136.00,94401895.00,41.21,333.00,-2.49,41.55,1.73,-2.02,,41.89
688981,11:15:55,1624590955,中芯国际,57.76,57.70,59.05,59.99,57.76,31308416.00,1853804015.00,59.04,8291.00,2.34,59.21,1.75,2.23,,58.01


In [None]:
############################## TEST #####################################

In [None]:
# Inputs for the manual test cells below: check points and running symbols,
# both fetched through the project's Utils helpers.
check_points = Utils.get_check_points()
symbols = Utils.get_running_symbols()

In [None]:
# Today's local date formatted as YYYYMMDD.
date = time.strftime('%Y%m%d')

In [None]:
# assist() caches (DailyData, scope) tuples per date; the cells below use
# `data` as the DailyData itself (data.basics, data.snapshots, data.save()),
# so take the first element of the tuple.
data = result[0][date][0]

In [None]:
q = Quotation(symbols)
snapshot = await q.snapshot()

market_values = await q.get_market_values()

for _, symbol in enumerate(symbols):
    data.basics[_, 0] = market_values[symbol]['zt_price']
    data.basics[_, 1] = market_values[symbol]['dt_price']
    data.basics[_, 3] = market_values[symbol]['mcap']

await q.exit()

In [None]:
# Per-security table of the DailyData under test.
securities = data.get_securities()
securities

In [None]:
# Look up a single security by its symbol.
securities.loc['300057']

In [None]:
# Snapshot for the check point at the given HH:MM:SS time of day.
ss = data.get_snapshot('13:07:45')
ss

In [None]:
# Look up a single symbol's row in the snapshot.
ss.loc['600354']

In [None]:
# Time one full pass of compute_stats over every check point, using all
# symbols rather than a single worker's slice. The cell displays the elapsed
# wall-clock seconds.
st = time.time()
check_point_total = len(data.check_points)
for cp_idx in range(check_point_total):
    lapse = data.get_time_lapse(cp_idx)
    anchor_idx = data.get_ma5pm_anchor_idx(cp_idx)
    anchor_snapshot = data.snapshots[anchor_idx]
    compute_stats(
        data.snapshots[cp_idx],
        data.basics,
        data.statistic[cp_idx],
        anchor_snapshot,
        lapse,
    )
et = time.time()
et - st

In [None]:
# Persist the DailyData under test through its own save() method.
data.save()

In [1]:
# Seven independent report dicts (a fresh dict per slot, not one shared object).
results = [{"status":'successful',"idx": 10} for _ in range(7)]
results

[{'status': 'successful', 'idx': 10},
 {'status': 'successful', 'idx': 10},
 {'status': 'successful', 'idx': 10},
 {'status': 'successful', 'idx': 10},
 {'status': 'successful', 'idx': 10},
 {'status': 'successful', 'idx': 10},
 {'status': 'successful', 'idx': 10}]

True