In [1]:
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# Source folder with the raw per-stock CSV files, and destination folder for the processed copies.
input_folder = r"D:\data_origin"
output_folder = r"D:\data_v2"
# Create the destination up front; exist_ok lets the cell be re-run safely.
os.makedirs(output_folder, exist_ok=True)

# Bare file names (not full paths) of every input-folder entry whose name ends in ".csv".
csv_files = list(filter(lambda name: name.endswith(".csv"), os.listdir(input_folder)))


# Raw columns every input file must contain (headers as they appear in the source CSVs).
_REQUIRED_COLUMNS = ["股票代码", "股票名称", "交易日期", "开盘价", "最高价", "最低价", "收盘价", "前收盘价",
                     "成交量", "成交额", "流通市值", "总市值"]

# Columns written to each output file, in this order.
_FINAL_COLUMNS = ['code',
                  'date',
                  'return1',
                  'open',
                  'close',
                  'high',
                  'low',
                  'volume',
                  'VWAP',
                  'turn',
                  'free_turn',
                  'close/free_turn',
                  'open/turn',
                  'volume/low',
                  'VWAP/high',
                  'low/high',
                  'VWAP/close']


def _compute_features(raw):
    """Build the output feature table from an already-filtered raw frame.

    ``raw`` must contain every column in ``_REQUIRED_COLUMNS``; it is not
    modified. Returns a new DataFrame with exactly ``_FINAL_COLUMNS``.
    Any +/-inf produced by division by zero is replaced with NaN, so it is
    written as an empty cell.
    """
    close = raw["收盘价"]
    prev_close = raw["前收盘价"]
    volume = raw["成交量"]

    out = pd.DataFrame({
        "code": raw["股票代码"],
        "date": raw["交易日期"],
        "open": raw["开盘价"],
        "close": close,
        "high": raw["最高价"],
        "low": raw["最低价"],
        "volume": volume,
    })

    # Daily return relative to the previous close.
    out["return1"] = (close - prev_close) / prev_close
    # VWAP = traded amount / traded volume.
    out["VWAP"] = raw["成交额"] / volume
    # Turnover. Share counts are approximated as market cap / close price:
    # total market cap for `turn`, circulating market cap for `free_turn`.
    # NOTE(review): only a true fraction if 成交量 is in shares (not lots) — confirm units.
    out["turn"] = volume / (raw["总市值"] / close)
    out["free_turn"] = volume / (raw["流通市值"] / close)

    # Ratio features used by AlphaNet_v2.
    out["close/free_turn"] = out["close"] / out["free_turn"]
    out["open/turn"] = out["open"] / out["turn"]
    out["volume/low"] = out["volume"] / out["low"]
    out["VWAP/high"] = out["VWAP"] / out["high"]
    out["low/high"] = out["low"] / out["high"]
    out["VWAP/close"] = out["VWAP"] / out["close"]

    # Use NaN explicitly: replace(..., None) means "forward-fill" on older pandas
    # and produces object-dtype None on newer ones. NaN rows are kept, not dropped.
    out = out.replace([float("inf"), -float("inf")], float("nan"))
    return out[_FINAL_COLUMNS]


def process_stock_data(file, input_dir=None, output_dir=None):
    """Clean one per-stock CSV file and write its feature table.

    Reads ``file`` (GBK encoded, first line skipped) and keeps rows dated
    2011-01-31 through 2024-12-31 inclusive. It then writes ``_FINAL_COLUMNS``
    to a file of the same name in the output folder. Nothing is written when:
    - a required column is missing;
    - 40 or fewer rows remain in the date range;
    - the stock name contains "ST" or "PT" on any remaining row.

    Args:
        file: File name (not a path) inside the input folder.
        input_dir: Folder to read from; defaults to module-level ``input_folder``.
        output_dir: Folder to write to; defaults to module-level ``output_folder``.

    Any exception raised while processing is caught and printed, so one bad
    file does not abort a batch run. Always returns None.
    """
    src_dir = input_folder if input_dir is None else input_dir
    dst_dir = output_folder if output_dir is None else output_dir
    file_path = os.path.join(src_dir, file)
    output_path = os.path.join(dst_dir, file)

    try:
        # The first line of each file is skipped; the column headers are on line 2.
        df = pd.read_csv(file_path, encoding='GBK', skiprows=1)

        if not all(col in df.columns for col in _REQUIRED_COLUMNS):
            print(f"文件 {file} 缺少必要列，跳过处理")
            return

        # Unparseable dates become NaT, which fails both comparisons and is dropped.
        df["交易日期"] = pd.to_datetime(df["交易日期"], errors="coerce")
        in_range = (df["交易日期"] >= "2011-01-31") & (df["交易日期"] <= "2024-12-31")
        # Explicit copy: later code must work on an independent frame, not a view of `df`.
        df = df.loc[in_range, _REQUIRED_COLUMNS].copy()

        # Too little history in the window (40 rows or fewer): skip silently.
        if len(df) <= 40:
            return

        # Skip the whole stock if it was flagged ST or PT at any point in the window.
        if df["股票名称"].str.contains("ST|PT", na=False).any():
            return

        _compute_features(df).to_csv(output_path, index=False, encoding="GBK")

    except Exception as e:
        # Best-effort batch processing: report and move on to the next file.
        print(f"处理 {file} 时出错: {e}")


# Process all CSV files in parallel with a thread pool.
# Use at most 8 workers and never more than there are files, but at least 1:
# ThreadPoolExecutor raises ValueError for max_workers=0 (empty input folder).
# NOTE(review): pandas parsing is partly GIL-bound, so threads may give limited
# speedup; a process pool would need process_stock_data importable from a module
# (notebook-defined functions generally fail under the Windows "spawn" start method).
num_threads = max(1, min(8, len(csv_files)))
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    # Drain the result iterator so an exception escaping a worker is re-raised
    # here instead of being silently discarded.
    for _ in executor.map(process_stock_data, csv_files):
        pass

print("所有数据处理完成，已保存至:", output_folder)

所有数据处理完成，已保存至: D:\data_v2
