In [None]:
## 从导出的表格中获取同花顺最近半年涨停超过N次的非ST股票数据
## 条件：非st股票 半年内涨停超过6次
import xlrd
# Open the exported workbook and take its first sheet.
data = xlrd.open_workbook('/data/stock_new/half_year_good_stocks.xlsx')
table = data.sheets()[0]
# Column 0 carries the stock codes; the first row is skipped
# (presumably a header row -- confirm against the export).
code_list = table.col_values(0)[1:]
# Keep only the text before the first '.', i.e. drop any dotted suffix.
codes = [code.partition('.')[0] for code in code_list]
print(codes)

In [None]:
## 获取东方财富5分钟成交量,该9.35的数据未计算开盘竞价的价格变动与成交量
# 5分钟价格：[open, close, high, low]
import requests
import json 
from pandas import DataFrame
import pandas as pd
import numpy as np
import time
import threading
import sys 
sys.path.append('/Users/flee/myspace/utils') 
import fmail

# Number of days of 5-minute bars whose closes form the "recent low" window
# in isTargetStock (window length = DAY_NUM * DAILY_DIVISION_COUNT bars).
DAY_NUM = 5
# Number of 5-minute bars counted per day (48 * 5 min = 240 min).
DAILY_DIVISION_COUNT = 48
# NOTE(review): not referenced anywhere in the visible code; meaning unconfirmed.
LAST_INDEX = 1439
# NOTE(review): the two column indices below are not referenced in the visible
# code, and isTargetStock reads the low from field 4 and the volume from
# field 6 of each bar -- confirm whether these constants are stale.
LOW_PRICE_COLUMN_INDEX = 2
VOL_COLUMN_INDEX = 3


def isSuitableTime(lastDateStr):
    """Tell whether a bar timestamp is recent enough to act on.

    Args:
        lastDateStr: Timestamp text in "%Y-%m-%d %H:%M" format; it is
            interpreted as local time.

    Returns:
        True when the timestamp is less than 900 seconds (15 minutes) older
        than the current time, compared at whole-second resolution.
        Timestamps in the future also return True.

    Raises:
        ValueError: If lastDateStr does not match the expected format.
    """
    max_age_seconds = 900  # accept only data from the last fifteen minutes
    bar_epoch = int(time.mktime(time.strptime(lastDateStr, "%Y-%m-%d %H:%M")))
    age_seconds = int(time.time()) - bar_epoch
    return age_seconds < max_age_seconds

def getSuffixCode(code):
    """Return the stock code with a one-digit suffix appended.

    Codes whose first character is '6' get the suffix '1'; every other code
    gets '2'. (Presumably the suffix selects the exchange in the Eastmoney
    id scheme -- TODO confirm.)

    Args:
        code: Non-empty stock code string.

    Returns:
        The code followed by '1' or '2'.

    Raises:
        IndexError: If code is an empty string.
    """
    suffix = '1' if code[0] == '6' else '2'
    return code + suffix
    
def getMean(vols):
    """Return the arithmetic mean of the given sequence of numbers."""
    values = np.array(vols)
    return values.mean()

def getMin(closes):
    """Return the smallest value in the given non-empty sequence of numbers."""
    values = np.array(closes)
    return values.min()

def isLowPriceDoubleVol(low, vol, min_close, mean_vol):
    """Check for a bar that undercuts the reference low on heavy volume.

    Args:
        low: Low price of the bar being tested.
        vol: Volume of the bar being tested.
        min_close: Reference closing-price low to compare against.
        mean_vol: Reference average volume.

    Returns:
        True only when low is strictly below min_close and vol is strictly
        greater than three times mean_vol; otherwise False.
    """
    # Price condition first; the volume comparison is skipped when it fails.
    if not low < min_close:
        return False
    return bool(vol > 3 * mean_vol)
    
# Target pattern: the bar's low undercuts the lowest 5-minute close of the
# last DAY_NUM days while volume spikes.
# The two most recent 5-minute bars are both checked, mainly as a defence
# against Eastmoney publishing its data late.
def isTargetStock(code_id, timeout=10):
    """Fetch 5-minute bars for one stock and test the low-price/volume signal.

    The response body is unwrapped by taking the text after the last '(' and
    before the next ')', then parsed as JSON; its 'data' entry is a list of
    comma-separated bar strings. Field 0 is used as the bar time, field 2 as
    the close and field 4 as the low; field 6 (divided by 1000) is used as
    the volume -- NOTE(review): confirm field 6 against the feed's layout.

    Args:
        code_id: Stock code without the suffix added by getSuffixCode.
        timeout: Seconds passed to requests.get as its timeout, so that a
            stalled connection cannot block the calling thread forever.

    Returns:
        False when the newest bar fails isSuitableTime. Otherwise True when
        either of the two newest bars satisfies isLowPriceDoubleVol against
        the minimum close of the last DAY_NUM * DAILY_DIVISION_COUNT bars and
        the mean volume of all returned bars; False if neither does.

    Raises:
        Network errors from requests (including timeouts) and parsing errors
        (ValueError, KeyError, IndexError) are not handled here and propagate
        to the caller.
    """
    easy_url = 'http://pdfm.eastmoney.com/EM_UBG_PDTI_Fast/api/js?rtntype=5&token=4f1862fc3b5e77c150a2b985b12db0fd&type=m5k&authorityType=fa&id='
    final_url = easy_url + getSuffixCode(code_id)
    response = requests.get(final_url, timeout=timeout)
    data_str = response.text.split('(')[-1].split(')')[0]
    data = json.loads(data_str)['data']
    # Skip stocks whose newest bar is not fresh enough to act on.
    if not isSuitableTime(data[-1].split(',')[0]):
        return False
    close_list = []
    low_list = []
    vol_list = []
    for stock_tick in data:
        tick_array = stock_tick.split(',')
        close_list.append(float(tick_array[2]))
        low_list.append(float(tick_array[4]))
        vol_list.append(int(tick_array[6]) / 1000)
    # Volume baseline uses every returned bar; the price baseline uses only
    # the most recent DAY_NUM days of closes.
    mean_vol = getMean(vol_list)
    min_close = getMin(close_list[-DAY_NUM * DAILY_DIVISION_COUNT:])
    # Newest bar first, then the one before it; stops at the first hit.
    return any(
        isLowPriceDoubleVol(low_list[idx], vol_list[idx], min_close, mean_vol)
        for idx in (-1, -2)
    )

def fetchResult(codes):
    """Screen each stock code with isTargetStock and collect the hits.

    Screening is best effort: a code whose check raises an ordinary exception
    is recorded as failed and the loop moves on. Only Exception subclasses
    are absorbed, so KeyboardInterrupt and SystemExit still propagate and the
    run can be interrupted.

    Args:
        codes: Sized iterable of stock code strings.

    Returns:
        List of the codes for which isTargetStock returned a truthy value,
        in input order.

    Side effects:
        Prints the elapsed time and, if any check failed, the failed codes.
    """
    result_codes = []
    failed_codes = []
    start = time.time()
    for code in codes:
        try:
            if isTargetStock(code):
                result_codes.append(code)
        except Exception:
            # One bad stock must not abort the batch; report it at the end.
            failed_codes.append(code)
    elapsed = time.time() - start
    print('计算' + str(len(codes)) + '个股票, 共花费' + str(elapsed) + '秒')
    if failed_codes:
        print('失败股票codes=')
        print(failed_codes)
    return result_codes


# Guards every update of the shared `result` list below.
lock = threading.Lock()
# Accumulates the matching codes found by all MultiThreadingCal workers during
# one cal_main run; cal_main resets it to an empty list when the run ends.
result = []
class MultiThreadingCal(threading.Thread):
    """Worker thread that screens one slice of stock codes."""

    def __init__(self, codes):
        """Remember the codes this worker is responsible for."""
        super().__init__()
        self.codes = codes

    def run(self):
        """Screen this worker's codes and merge the hits into `result`."""
        global result
        matched = fetchResult(self.codes)
        # The rebinding of the shared list happens under the module lock.
        with lock:
            result = result + matched
            
def listSplit(listTemp, n):
    """Lazily split a sequence into n interleaved slices.

    Slice number k holds the elements at positions k, k + n, k + 2n, ...

    Args:
        listTemp: Sequence supporting extended slicing.
        n: Number of slices to produce.

    Yields:
        The n slices in order; slices may be empty when the sequence has
        fewer than n elements.
    """
    yield from (listTemp[offset::n] for offset in range(n))
        
def cal_main():
    """Run one full screening pass over the module-level `codes` list.

    Steps:
        1. Split `codes` into 5 interleaved slices and screen each slice in
           its own MultiThreadingCal thread.
        2. Wait for all workers, then print the accumulated `result`.
        3. If anything matched, hand the list to fmail.mail on a daemon
           thread and wait at most 2 seconds for it.
        4. Reset the shared `result` list and print timing information.
    """
    global result
    start = time.time()
    localtime = time.localtime(start)
    strTime = time.strftime("%Y-%m-%d %H:%M:%S", localtime)
    print('开始时间为 ' + strTime)
    threads = []
    # Start one worker thread per slice of codes.
    codes_array = listSplit(codes, 5)
    for single_codes in codes_array:
        subThread = MultiThreadingCal(single_codes)
        subThread.start()
        threads.append(subThread)
    # Wait for every worker to finish.
    for worker in threads:
        worker.join()
    print(result)

    # Send the mail from a daemon thread as a defence against the send
    # hanging: this function waits at most 2 seconds for it (the downside is
    # that a stuck daemon thread may never finish). The daemon flag is set
    # through the constructor because Thread.setDaemon() is deprecated.
    if result:
        topic = '【股票】低吸买点'
        text = '以下股票出现低吸买点\n' + str(result)
        mail_thread = threading.Thread(
            target=fmail.mail, args=(topic, text), daemon=True
        )
        mail_thread.start()
        mail_thread.join(2)
    # Clear the shared accumulator so the next run starts empty.
    with lock:
        result = []
    end = time.time()
    elapsed = end - start
    localtime = time.localtime(end)
    strTime = time.strftime("%Y-%m-%d %H:%M:%S", localtime)
    print('结束时间为  ' + strTime)
    print('计算' + str(len(codes)) + '个股票, 共花费' + str(elapsed) + '秒')
    print('Exiting cal_main Thread')
# Printed once the statements above in this cell have executed.
print('done')

In [None]:
import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler

def timedTask():
    """Print the current UTC time as 'YYYY-MM-DD HH:MM:SS.mmm'.

    Uses a timezone-aware "now" because datetime.utcnow() is deprecated; the
    format string contains no zone field, so the printed text is unchanged.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    print(now_utc.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
    
# Create a scheduler that runs its jobs in the background.
scheduler = BackgroundScheduler()  
# Register the job. A variant restricted by day of week and hour, kept for
# reference (not active):
#     scheduler.add_job(cal_main, 'cron', day_of_week = '0-4', hour = '9-11,13-15', minute = '*/5')
# Active schedule: cron trigger with minute='*/5', at every hour of every day.
scheduler.add_job(cal_main, 'cron', minute = '*/5')
# Start the scheduler.
scheduler.start()
print('start')