In [1]:
from types import SimpleNamespace 
from datetime import datetime, timedelta
import akshare as ak
import pandas as pd
import multiprocessing
# import threading
import threading
import queue
import psutil
import time
import talib
from concurrent.futures  import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import numpy as np
import logging
import sys
import traceback


In [2]:
# 创建Logger
logger = logging.getLogger() 
logger.setLevel(logging.DEBUG) 

# 创建一个处理器用于写入文件（只记录ERROR及以上级别的日志到单独文件）
error_handler = logging.FileHandler('error.log') 
error_handler.setLevel(logging.ERROR) 

# 创建一个处理器用于写入所有日志的文件
all_handler = logging.FileHandler('all.log') 
all_handler.setLevel(logging.DEBUG) 

# 创建一个控制台处理器
console_handler = logging.StreamHandler(sys.stdout) 
console_handler.setLevel(logging.INFO) 

# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
error_handler.setFormatter(formatter) 
all_handler.setFormatter(formatter) 
console_handler.setFormatter(formatter) 

# 添加到Logger
logger.addHandler(error_handler) 
logger.addHandler(all_handler) 
logger.addHandler(console_handler) 

In [3]:
args = SimpleNamespace(
    datedelta = 50,
    start_epochs = 8
    )
today = datetime.now()
start_day = today - timedelta(days=args.datedelta)
test_day = today - timedelta(days=1)

# 格式化日期为YYYYMMDD格式
formatted_today = today.strftime('%Y%m%d')
formatted_test_day = test_day.strftime('%Y%m%d')
formatted_start_day = start_day.strftime('%Y%m%d')

with open('today_suggestions.txt', 'w', encoding='utf-8') as today_suggestions,open('history_suggestions.txt', 'a', encoding='utf-8') as history_suggestions:
    today_suggestions.write(formatted_today + "!!!!!!!!~~~~~~~~~~~~~~\n")
    history_suggestions.write(formatted_today + "!!!!!!!!!!!~~~~~~~~~~~~\n")
# df = pd.read_csv('data.csv',  dtype={0: str})
df = pd.read_csv('mainboard_stocks.csv', dtype={'code': str})

In [4]:
sd = ak.stock_zh_a_hist(symbol="000001", period="daily", start_date=formatted_start_day, end_date=formatted_today, adjust="")
print(sd)

KeyboardInterrupt: 

In [None]:
stock_codes = list(set(df['code']))
print(len(df) == len(stock_codes))
df.head()

True


Unnamed: 0,code,name
0,1,平安银行
1,2,万 科Ａ
2,4,*ST国华
3,6,深振业Ａ
4,7,全新好


In [None]:
total_len = len(df)
physical_cpus = psutil.cpu_count(logical=False)
logical_cpus = psutil.cpu_count(logical=True)

log_phy_ratio = int(logical_cpus/physical_cpus)
# phy_cpu_length = int(total_len/physical_cpus)


In [None]:
print(physical_cpus,logical_cpus)

16 32


In [None]:
def stock_data_getter(stock_codes, formatted_start_day, formatted_today):
    res_ls = []
    for stock_code in stock_codes:
        time.sleep(3) #avoid abandon from remote
        try:
            stock_zh_a_hist_df = ak.stock_zh_a_hist(symbol=stock_code, period="daily", start_date=formatted_start_day, end_date=formatted_today, adjust="")
            if stock_zh_a_hist_df.empty:
                print("wrong code:",stock_code)
            else:
                res_ls.append((stock_code, stock_zh_a_hist_df))
        except Exception as e:
            logging.error(f"Error  in stock_data_getter for {stock_code}: {e}")
            logging.error(traceback.format_exc())
            return
    print("stock code:",stock_codes[-1],"data collection finished:", time.time())
    return res_ls

In [None]:
def run_analysis(stock_codes_ls, process_lock):
    res_ls = []
    for stock_code, stock_zh_df in stock_codes_ls:
        try:
            open = stock_zh_df["开盘"]
            close = stock_zh_df["收盘"]
            high = stock_zh_df["最高"]
            low = stock_zh_df["最低"]
            volume = stock_zh_df["成交量"]
            turnover = stock_zh_df["换手率"]

            ma5 = talib.SMA(close[-5:], timeperiod=5)
            ma10 = talib.SMA(close[-10:], timeperiod=10)
            vol_ma5 = talib.SMA(volume[-5:], timeperiod=5)

            macd, macdsignal, macdhist = talib.MACDEXT(close, fastperiod=12, fastmatype=0, slowperiod=20, slowmatype=0, signalperiod=9, signalmatype=0)

            rsi = talib.RSI(close, timeperiod=14)  # RSI相对强弱指标
            cci = talib.CCI(high, low, close, timeperiod=20)  # 顺势指标
            # money_flow = (2*close - low - high) / (high - low) * volume  # 简易资金流
            money_flow = talib.MFI(high, low, close, volume, timeperiod=9)
            money_flow_max= money_flow.shift(1).rolling(4).max()
            turnover_mean= money_flow.shift(1).rolling(5).mean()
            today_candlestick = abs(close.iloc[-1] - open.iloc[-1])
            yesterday_candlestick = abs(close.iloc[-2] - open.iloc[-2])

            if (close.iloc[-1] > ma5.iloc[-1] and ma5.iloc[1] > ma10.iloc[-1]) and \
                    volume.iloc[-1] > vol_ma5.iloc[-1] * 1.8 and turnover.iloc[-1] > turnover_mean * 1.5 and \
                    (macd.iloc[-1] > macdsignal.iloc[-1] and macd.iloc[-2] < macdsignal.iloc[-2] and macdhist.iloc[-1] > abs(macdhist.iloc[-2])):
                res_ls.append(f"""stock_code: {stock_code} result: 趋势启动+量价齐升型 \n """)
            elif (rsi.iloc[-1] <30) and (cci.iloc[-1] < -150) and \
                today_candlestick > 0.5 * yesterday_candlestick and turnover.iloc[-1] > turnover_mean * 1.2 and \
                money_flow.iloc[-1] > money_flow_max:
                res_ls.append(f"""stock_code: {stock_code} result: 超跌反转+资金异动型 \n """)

        except Exception as e:
            logging.error(f"Error  in run_analysis for {stock_code}: {e}")
            logging.error(traceback.format_exc())
            return

        with process_lock:
            with open('today_suggestions.txt', 'a', encoding='utf-8') as today_suggestions,open('history_suggestions.txt', 'a', encoding='utf-8') as history_suggestions:
                for result in res_ls:
                    today_suggestions.write(result)
                    history_suggestions.write(result)
        return  

In [None]:
def check_parameter(stock_codes_ls):
    for stock_code, stock_zh_df in stock_codes_ls:
        try:
            open = stock_zh_df["开盘"]
            close = stock_zh_df["收盘"]
            high = stock_zh_df["最高"]
            low = stock_zh_df["最低"]
            volume = stock_zh_df["成交量"]
            turnover = stock_zh_df["换手率"]

            ma5 = talib.SMA(close[-5:], timeperiod=5)
            ma10 = talib.SMA(close[-10:], timeperiod=10)
            vol_ma5 = talib.SMA(volume[-5:], timeperiod=5)

            macd, macdsignal, macdhist = talib.MACDEXT(close, fastperiod=12, fastmatype=0, slowperiod=20, slowmatype=0, signalperiod=9, signalmatype=0)

            rsi = talib.RSI(close, timeperiod=14)  # RSI相对强弱指标
            cci = talib.CCI(high, low, close, timeperiod=20)  # 顺势指标
            # money_flow = (2*close - low - high) / (high - low) * volume  # 简易资金流
            money_flow = talib.MFI(high, low, close, volume, timeperiod=9)
            money_flow_max= money_flow.shift(3).rolling(4).max()
            turnover_mean= money_flow.shift(3).rolling(5).mean()
            today_candlestick = (close.iloc[-3] - open.iloc[-3])
            yesterday_candlestick = (close.iloc[-4] - open.iloc[-4])

            if (close.iloc[-3] > ma5.iloc[-3] and ma5.iloc[3] > ma10.iloc[-3]) and \
                    volume.iloc[-3] > vol_ma5.iloc[-3] * 3.8 and turnover.iloc[-3] > turnover_mean * 1.5 and \
                    (macd.iloc[-3] > macdsignal.iloc[-3] and macd.iloc[-4] < macdsignal.iloc[-4] and macdhist.iloc[-3] > abs(macdhist.iloc[-4])):
                if close.iloc[-1] > close.iloc[-2]:
                    print(f"""stock_code: {stock_code} result: 趋势启动+量价齐升型 \n """)
            elif (rsi.iloc[-3] <30) and (cci.iloc[-3] < -150) and \
                today_candlestick > 0.5 * yesterday_candlestick and turnover.iloc[-3] > turnover_mean * 1.2 and \
                money_flow.iloc[-3] > money_flow_max:
                if close.iloc[-1] > close.iloc[-2]:
                    print(f"""stock_code: {stock_code} result: 超跌反转+资金异动型 \n """)
        except Exception as e:
            logging.error(f"Error  in check_parameter for {stock_code}: {e}")
            logging.error(traceback.format_exc())


In [None]:
aspls = np.array_split(stock_codes, physical_cpus)
# 然后每个子数组是numpy数组，可以转成列表
chunked_list = [arr.tolist() for arr in aspls]

for i in chunked_list:
    print(i[-1])

process_lock = multiprocessing.Lock()
stock_data = []

with ThreadPoolExecutor(max_workers=physical_cpus) as executor:
    futures = [executor.submit(stock_data_getter, stock_codes_ls, formatted_start_day, formatted_today)
        for stock_codes_ls in chunked_list]
    
    for future in as_completed(futures):
        try:
            result = future.result() 
            if result is not None:  # 可选过滤空结果
                stock_data.append(result) 
        except Exception as e:
            print("Error in got results thread:", e)

603558
600518
002968
000813
002200
002456
600272
002400
600734
000581
600082
600105
600774
603103
002661
000007
Error in got results thread: HTTPSConnectionPool(host='push2his.eastmoney.com', port=443): Max retries exceeded with url: /api/qt/stock/kline/get?fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61%2Cf116&ut=7eea3edcaed734bea9cbfc24409ed989&klt=101&fqt=0&secid=1.600995&beg=20250718&end=20250906 (Caused by ProxyError('Unable to connect to proxy', RemoteDisconnected('Remote end closed connection without response')))
Error in got results thread: HTTPSConnectionPool(host='push2his.eastmoney.com', port=443): Max retries exceeded with url: /api/qt/stock/kline/get?fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61%2Cf116&ut=7eea3edcaed734bea9cbfc24409ed989&klt=101&fqt=0&secid=0.002501&beg=20250718&end=20250906 (Caused by ProxyError('Unable to connect to proxy', RemoteDisc

In [None]:
print(stock_data.head(2))

AttributeError: 'list' object has no attribute 'head'

In [None]:
with ProcessPoolExecutor(max_workers=physical_cpus) as executor:
    futures = [executor.submit(check_parameter, sd_ls)
        for sd_ls in stock_data]
    for future in as_completed(futures):
        try:
            future.result() 
        except Exception as e:
            print("Error in process:", e)

Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was running or pending.
Error in process: A process in the process pool was terminated abruptly while the future was run

In [None]:
# with ProcessPoolExecutor(max_workers=physical_cpus) as executor:
#     futures = [executor.submit(run_analysis, args=(stock_codes_ls, process_lock))
#         for stock_codes_ls in chunked_list]
#     for future in as_completed(futures):
#         try:
#             future.result() 
#         except Exception as e:
#             print("Error in thread:", e)


In [None]:
# collect_and_analyze_data(["000001","000002"], log_phy_ratio, formatted_start_day, formatted_today, lock, run_analysis)

In [None]:
# df_shanghai = ak.index_zh_a_hist( 
#     symbol="000001",      # 上证指数代码（固定为000001）
#     period="daily",       # 数据周期：daily（日线）、weekly（周线）、monthly（月线）
#     start_date="20200101", # 起始日期（格式：YYYYMMDD）
#     end_date="20250904",   # 结束日期（默认为当前日期）
# )

# # 查看前5行数据 
# print(df_shanghai.head()) 

ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

In [None]:
# def get_mainboard_stocks_ak():
#     """使用akshare获取主板股票列表"""
#     # 获取所有上市公司的基本信息
#     stock_info = ak.stock_info_a_code_name()
    
#     # 筛选主板股票
#     mainboard_stocks = stock_info[stock_info['code'].str.startswith(('600', '601', '603', '605', '000', '002'))]
    
#     return mainboard_stocks

# # 获取主板股票
# mainboard_stocks_ak = get_mainboard_stocks_ak()
# print(mainboard_stocks_ak.head())
# mainboard_stocks_ak.to_csv('mainboard_stocks.csv', index=False, encoding='utf-8-sig')

In [None]:
# # date="20200331"; choice of {"XXXX0331", "XXXX0630", "XXXX0930", "XXXX1231"}; 从 20081231 开始
# stock_yjyg_em_df = ak.stock_yjyg_em(date="20250630")
# stock_yjyg_em_df_sorted_desc = stock_yjyg_em_df.sort_values(by=stock_yjyg_em_df.columns[6], ascending=False) #'业绩变动幅度'
# # print(stock_yjyg_em_df_sorted_desc.head(10))
# print(stock_yjyg_em_df_sorted_desc.iloc[:, [1,6]].head(10))