## 成交量信号探索

In [1]:
import numpy as np
import pandas as pd
import os
import time
import math
import multiprocessing
from ginkgo.data.ginkgo_mongo import ginkgo_mongo as gm

In [2]:
def _parse_volume(raw):
    """Return *raw* as an int, or None when it is blank or not numeric."""
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        return None


def _parse_price(raw):
    """Return *raw* as a finite float, or None when it cannot be used."""
    try:
        price = float(raw)
    except (TypeError, ValueError):
        return None
    return price if math.isfinite(price) else None


def check(
    stock_queue,
    signal_queue,
    bull_queue,
    bear_queue,
    signal_period:int,
    observer_period:int,
    stock:str,
    signal_rate:float
    ):
    """Scan one stock's daily bars for a volume signal and record the follow-up move.

    For every start row ``i`` a signal fires when:

    * the volume at ``i`` is a non-zero number,
    * the volumes at ``i .. i + signal_period - 1`` never decrease, and
    * the volume at ``i + signal_period`` is at least ``signal_rate + 1``
      times the volume at ``i``.

    For each signal the relative change of ``close`` between row
    ``i + signal_period + 1`` and row
    ``i + signal_period + observer_period + 1`` is computed.  ``1`` is put on
    ``signal_queue`` and the change is put on ``bull_queue`` when it is
    positive, otherwise on ``bear_queue``.

    Args:
        stock_queue: Queue that receives ``stock`` exactly once when this
            call finishes, whether it succeeded or raised.
        signal_queue: Queue that receives ``1`` per detected signal.
        bull_queue: Queue that receives positive follow-up changes.
        bear_queue: Queue that receives zero or negative follow-up changes.
        signal_period: Number of rows in the non-decreasing volume run.
        observer_period: Number of rows between the two compared closes.
        stock: Code passed to ``gm.get_dayBar_by_mongo``.
        signal_rate: Required relative volume increase (0.5 means +50%).

    Rows whose volume or close cannot be parsed are treated as "no signal"
    instead of aborting the whole stock.
    """
    try:
        # Presumably a pandas DataFrame with "volume" and "close" columns;
        # only .shape and those two columns are used here.
        df = gm.get_dayBar_by_mongo(code=stock)
        count = df.shape[0]
        window = signal_period + observer_period + 2
        if count <= window:
            # Not enough history for a single start row.  Returning here also
            # avoids touching columns that an empty result may not have.
            return

        # Read each column once; df.iloc[k].volume would build a whole row
        # Series on every access inside the nested loops below.
        volumes = df["volume"].tolist()
        closes = df["close"].tolist()

        for i in range(count - window):
            base_volume = _parse_volume(volumes[i])
            if not base_volume:
                # Blank, unparsable or zero volume cannot anchor a ratio.
                continue

            # The volume must never drop during the run; an unreadable value
            # means the run cannot be confirmed, so it disqualifies the start.
            previous = base_volume
            ramp_ok = True
            for j in range(1, signal_period):
                current = _parse_volume(volumes[i + j])
                if current is None or current < previous:
                    ramp_ok = False
                    break
                previous = current
            if not ramp_ok:
                continue

            spike_volume = _parse_volume(volumes[i + signal_period]) or 0
            if spike_volume / base_volume < signal_rate + 1:
                continue

            observe_0 = _parse_price(closes[i + signal_period + 1])
            observe_1 = _parse_price(
                closes[i + signal_period + observer_period + 1]
            )
            if not observe_0 or observe_1 is None:
                # Without two usable closes (and a non-zero base) the outcome
                # cannot be measured; skip so signals == bulls + bears holds.
                continue

            signal_queue.put(1)
            observe_rate = (observe_1 - observe_0) / observe_0

            if observe_rate > 0:
                bull_queue.put(observe_rate)
            else:
                bear_queue.put(observe_rate)
    finally:
        # Report completion exactly once, even on failure, so the progress
        # counter in the parent process can always reach 100%.
        stock_queue.put(stock)
    

In [4]:
def p_update(*a):
    """Print a one-line progress report built from the shared result queues.

    Passed as the ``callback`` of ``Pool.apply_async``, so it runs in the
    parent process each time a task completes; the task's return value
    arrives in ``*a`` and is ignored.

    Reads the module-level names ``check_done_stock_queue``, ``stock_num``,
    ``signal_queue``, ``bull_queue`` and ``bear_queue``.
    """
    # Take one snapshot per queue: every qsize() is a round trip to the
    # manager process, and workers keep writing while this runs, so
    # re-reading could pair a count with a percentage from a later moment.
    finished = check_done_stock_queue.qsize()
    # Workers put the signal before its bull/bear outcome, so reading the
    # outcome queues first guarantees bulls + bears <= signals.
    bulls = bull_queue.qsize()
    bears = bear_queue.qsize()
    signals = signal_queue.qsize()

    msg = f"进度: {round(finished/stock_num*100,2)}% "
    msg += f"信号: {signals}"
    if signals > 0:
        msg += f"上涨: {bulls}[{round(bulls/signals*100,2)}%]"
        msg += f"下跌: {bears}[{round(bears/signals*100,2)}%]"
    print(msg)

if __name__ == "__main__":
    # Driver: fan the per-stock volume-signal scan out over a process pool and
    # print running statistics as tasks complete.
    print(f"Main Process {os.getpid()}..")

    start = time.time()
    cpu_core_num = multiprocessing.cpu_count()
    # Leave two cores free, but never go below one worker: on 1-2 core hosts
    # the plain subtraction yields <= 0, which breaks the division below and
    # makes Pool() raise ValueError.
    thread_num = max(1, cpu_core_num - 2)
    stock_list = gm.get_all_stockcode_by_mongo()
    stock_num = stock_list.shape[0]
    epoch_num = math.floor(stock_num/thread_num)

    print(f"Stock:{stock_list.shape[0]}")
    print(f"建立了一个 {thread_num} 容量的进程池")

    p = multiprocessing.Pool(thread_num)
    # One manager serves all four queues; each Manager() call would start its
    # own server process.
    manager = multiprocessing.Manager()
    check_done_stock_queue = manager.Queue()
    signal_queue = manager.Queue()
    bull_queue = manager.Queue()
    bear_queue = manager.Queue()
    signal_period = 4
    observe_period = 4
    rate = .5

    # Only the "code" column is needed, so iterate it directly instead of
    # materialising every row with iterrows().
    for data in stock_list["code"]:
        p.apply_async(
            check,
            args=(
                check_done_stock_queue,
                signal_queue,
                bull_queue,
                bear_queue,
                signal_period,
                observe_period,
                data,
                rate
                ),
            callback=p_update,
            # Without an error_callback a failing task disappears silently.
            # Bind the code as a default so each lambda reports its own stock.
            error_callback=lambda exc, code=data: print(
                f"check({code}) failed: {exc!r}"
            ),
        )

    print("Waiting for all subprocesses done...")
    p.close()
    p.join()
    end = time.time()
    print("All Daybar subprocesses done. Tasks runs %0.2f seconds." % (end - start))

Main Process 20320..
Stock:4946
建立了一个 14 容量的进程池
Waiting for all subprocesses done...
