In [None]:
import os
import traceback
from datetime import datetime
from multiprocessing import Process, Pipe
from timeit import default_timer as timer

import numpy as np

from libs.array import D2Array, D3Array
from libs.sina import Sina

In [None]:

class HQ(Process):
    """Worker process that answers requests arriving on a pipe connection.

    Each incoming message is a dict. Two shapes are understood:

    * ``{'type': 'action', 'action': 'quit'}`` -- reply with a notification
      and leave the message loop.
    * ``{'type': 'function', 'function': name, 'args': [...], 'kwargs': {...}}``
      -- call the method ``name`` if it is listed in ``allowed_funcs`` and
      reply with its return value.

    Every reply is a dict carrying ``pid``, ``type`` (``'return'``,
    ``'notification'`` or ``'exception'``), ``content`` and ``used_time``
    (seconds spent handling the message); exception replies also carry
    ``traceback``.
    """

    def __init__(self, pipe, buffers, scope=None, sub_group_count=1):
        """Store the configuration; heavy set-up is deferred to ``prepare()``.

        Args:
            pipe: Pair of connection objects. Only the first one is kept and
                used for both receiving requests and sending replies.
            buffers: Mapping of buffer objects, looked up by ``prepare()``
                under the keys 'codes', 'names', 'msl', 'fsl', 'stat' and
                'basics'.
            scope: Optional ``(start, stop)`` pair selecting the slice of
                codes this worker handles; ``None`` means all codes.
            sub_group_count: Forwarded to ``Sina`` as ``group_count``.
        """
        super(HQ, self).__init__()

        # Whitelist of method names callable through 'function' messages.
        self.allowed_funcs = ['prepare', 'check']

        # The second connection is intentionally left alone: this constructor
        # runs in the creating process, which still needs that end.
        parent, _ = pipe
        self.parent = parent

        self.buffers = buffers
        self.scope = scope
        self.sub_group_count = sub_group_count

        # NOTE: self.pid is None until start() has been called, so this line
        # normally reports 'None' as the pid.
        print('hq process ['+str(self.pid)+'] was started with scope:', scope, 'and sub_group_count:', sub_group_count)

    def run(self):
        """Serve requests until a quit action arrives or the pipe is closed."""
        while True:
            try:
                msg = self.parent.recv()
            except EOFError:
                # The peer closed the pipe: there is nobody left to notify,
                # so just leave the loop instead of trying to send a reply.
                break

            start = timer()
            result = {'pid': os.getpid()}
            stop = False

            try:
                stop = self._handle_message(msg, result)
            except Exception as e:
                # Report any failure (malformed message, error raised by the
                # called method, ...) to the requester and keep serving.
                result['type'] = 'exception'
                result['content'] = str(e)
                result['traceback'] = traceback.format_exc()

            result['used_time'] = timer() - start

            try:
                self.parent.send(result)
            except OSError:
                # Covers BrokenPipeError: the peer went away before the reply
                # could be delivered, so there is no point in continuing.
                break

            if stop:
                break

    def _handle_message(self, msg, result):
        """Fill ``result`` for ``msg``; return True when the loop must stop."""
        if msg['type'] == 'action' and msg['action'] == 'quit':
            result['type'] = 'notification'
            result['content'] = 'this process is going to quit!'
            return True

        if msg['type'] != 'function':
            result['type'] = 'exception'
            result['content'] = 'stupid input message'
            result['traceback'] = ''
            return False

        func = msg['function']

        if not hasattr(self, func):
            result['type'] = 'exception'
            result['content'] = 'func['+func+'] does not exist!'
            result['traceback'] = ''
        elif func not in self.allowed_funcs:
            # The attribute exists but is not whitelisted for remote calls.
            result['type'] = 'exception'
            result['content'] = 'Call to func['+func+'] is not allowed!'
            result['traceback'] = ''
        else:
            # 'args' / 'kwargs' are optional and default to "no arguments".
            args = msg.get('args', ())
            kwargs = msg.get('kwargs', {})
            result['content'] = getattr(self, func)(*args, **kwargs)
            result['type'] = 'return'

        return False

    def check(self):
        """Take a market snapshot for the current time slot.

        Requires ``prepare()`` to have been called first, since it reads
        ``self.msl`` and ``self.sina``.

        Returns:
            dict: ``{'index': idx}`` where ``idx`` is the value returned by
            ``self.msl.get_index`` for the current time.
        """
        now = datetime.now()
        idx = self.msl.get_index(now)

        # Mirror prepare(): without a scope this worker covers every code.
        if self.scope is None:
            rows = slice(None)
        else:
            rows = slice(self.scope[0], self.scope[1])

        view = self.msl.data[idx, rows, :]
        # Presumably market_snapshot writes the fetched data into `view`
        # in place -- confirm against the Sina implementation.
        self.sina.market_snapshot(array=view)

        return {'index': idx}

    def prepare(self):
        """Build the array wrappers over ``self.buffers`` and the Sina client.

        Sets ``self.codes``, ``self.names``, ``self.msl``, ``self.fsl``,
        ``self.stat``, ``self.basics`` and ``self.sina``.
        """
        # NOTE(review): `Utils` is not imported anywhere in the visible part
        # of this file; it must be brought into scope for this method to run.
        buffers = self.buffers
        scope = self.scope
        sub_group_count = self.sub_group_count

        self.codes = np.frombuffer(buffers['codes'], dtype='<U6')
        self.names = np.frombuffer(buffers['names'], dtype='<U4')

        # Market snapshots.
        msl_interval = 5
        msl_check_points = Utils.get_check_points(msl_interval)
        msl_cols = ['open', 'close', 'now', 'high', 'low', 'turnover', 'volume']
        self.msl = D3Array(msl_check_points, self.codes, msl_cols, msl_interval, buffer=buffers['msl'])

        # Intraday (time-sharing) chart.
        fsl_interval = 60
        fsl_check_points = Utils.get_check_points(fsl_interval)
        fsl_cols = ['price', 'volume']
        self.fsl = D3Array(fsl_check_points, self.codes, fsl_cols, fsl_interval, buffer=buffers['fsl'])

        # Intraday statistics; shares the check points and interval of `fsl`.
        stat_cols = ['zhangfu', 'junjia', 'liangbi', 'zhangsu', 'fszb', 'fsto']
        self.stat = D3Array(fsl_check_points, self.codes, stat_cols, fsl_interval, buffer=buffers['stat'])

        basics_cols = ['zt_price', 'ma5vpm', 'ltgb']
        self.basics = D2Array(self.codes, basics_cols, buffer=buffers['basics'])

        # Restrict the client to this worker's slice of codes when scoped.
        if scope is None:
            self.sina = Sina(self.codes, group_count=sub_group_count)
        else:
            self.sina = Sina(self.codes[scope[0]:scope[1]], group_count=sub_group_count)

    def incremental_save(self, dt):
        """Placeholder: currently only normalizes ``dt`` and saves nothing.

        Args:
            dt: A ``datetime`` or a string in ``'%Y-%m-%d %H:%M:%S'`` format.
        """
        dt = datetime.strptime(dt, '%Y-%m-%d %H:%M:%S') if isinstance(dt, str) else dt
        pass

    def close_hdf5(self):
        """Placeholder: not implemented yet."""
        pass

In [None]:
# Scratch cell: two back-to-back timer() readings with no work in between,
# so the difference is just the overhead of the timer calls themselves.
start = timer()

end = timer()
# Cell result: the elapsed time scaled by 1000 (seconds -> milliseconds,
# since default_timer reports seconds).
(end-start)*1000

In [None]:
# Scratch cell: shows what format_exc() returns when evaluated here; it
# formats the exception currently being handled, if there is one.
traceback.format_exc()