### 获取所有股票代码

In [1]:
import os
import json

# Define the updated function to modify the filenames as required
def get_ticker_with_prefix(path):
    """Return the sorted ticker codes of every ``.csv`` entry in ``path``.

    A ticker is built from all decimal digits found in the file name, so a
    file named ``000001.SZ.csv`` yields ``"000001"``. Despite the function
    name, no exchange prefix is added to the returned codes.

    Args:
        path: Directory whose entries are listed (not recursive).

    Returns:
        A list of digit-only strings in ascending order, one per matching
        entry (duplicates are kept; a name without digits yields ``""``).

    Raises:
        OSError: If ``path`` cannot be listed.
    """
    return sorted(
        ''.join(filter(str.isdigit, name))
        for name in os.listdir(path)
        if name.endswith('.csv')
    )

# Input folder holding the per-stock CSV files, and the JSON file that will
# receive the ticker list. Adjust both to your own layout.
path_to_folder = 'data/raw-data/2024'
path_to_json_file = 'data/tickers.json'

# Collect the sorted ticker codes found in the input folder.
tickers_with_prefix = get_ticker_with_prefix(path_to_folder)

# Serialise the list as indented JSON text.
json_content = json.dumps(tickers_with_prefix, indent=4)

# Store the JSON text on disk.
with open(path_to_json_file, 'w') as json_file:
    json_file.write(json_content)

### 获取后复权因子

In [None]:
### 从 akshare 获取后复权因子数据，并保存在 CSV 文件中

import akshare as ak
import json
import os

# Load the ticker list. The ticker-extraction cell writes it to
# 'data/tickers.json' (and the other cells read that same name), so the same
# path is used here.
path_to_json_file = 'data/tickers.json'
with open(path_to_json_file, 'r') as json_file:
    tickers = json.load(json_file)

# Make sure the output folder exists; otherwise every to_csv call below would
# fail and be reported as a retrieval error.
factor_dir = "data/backward_adjust_factor/"
os.makedirs(factor_dir, exist_ok=True)

# Download the backward-adjustment factors for every ticker without a CSV yet.
# NOTE(review): files are saved as '<ticker>.csv' (no exchange prefix), while
# process_hfq_data in this file looks for 'sh<ticker>.csv' / 'sz<ticker>.csv'
# -- confirm which naming is intended.
for ticker in tickers:
    filename = f"{ticker}.csv"
    target_path = factor_dir + filename

    # Already downloaded on an earlier run: nothing to do.
    if os.path.exists(target_path):
        print(f"File {filename} already exists. Skipping.")
        continue

    # Codes starting with 0 or 3 get the 'sz' prefix, all others get 'sh'.
    # A separate name keeps the loop variable (and the file name) unprefixed.
    symbol = ('sz' if ticker.startswith(('0', '3')) else 'sh') + ticker

    try:
        # Query the API and store the result as CSV.
        df = ak.stock_zh_a_daily(symbol=symbol, adjust="hfq-factor")
        df.to_csv(target_path, index=False)
        print(f"{symbol} data saved to {filename}")
    except Exception as e:
        # Best effort: report the failure and carry on with the next ticker.
        print(f"Error retrieving data for {symbol}: {e}")

### 按股票合并历年数据<多线程>

In [1]:
import os
import pandas as pd
import json
import concurrent.futures
from tqdm import tqdm

# Path configuration: ticker list, root of the per-year raw data, and the
# folder that receives one merged CSV per stock.
json_file = 'data/tickers.json'
data_dir = 'data/raw-data/'
merged_dir = 'data/by_stock_merged/'
# exist_ok replaces the separate check-then-create step.
os.makedirs(merged_dir, exist_ok=True)

# Load the list of ticker codes to merge.
with open(json_file, 'r') as f:
    tickers = json.load(f)

def merge_data_for_ticker(ticker):
    """Merge the per-year CSV files of one stock into a single CSV.

    Every sub-directory of the module-level ``data_dir`` is searched for
    ``<ticker>.SZ.csv`` or ``<ticker>.SH.csv`` (the first one found is used).
    The frames are concatenated, sorted by ``trade_time`` and written to
    ``<merged_dir>/<ticker>.csv``. Nothing is written when no input file
    exists, and the stock is skipped when the output file is already present.

    Args:
        ticker: Stock code used in the input and output file names.

    Returns:
        None.
    """
    output_file = os.path.join(merged_dir, f"{ticker}.csv")
    if os.path.isfile(output_file):
        return  # already merged on an earlier run

    yearly_frames = []  # one DataFrame per year in which the stock has data
    # Sorted so the concatenation order does not depend on the OS listing order.
    for year in sorted(os.listdir(data_dir)):
        year_dir = os.path.join(data_dir, year)
        if not os.path.isdir(year_dir):
            continue
        # The exchange suffix is not known here, so both are tried.
        for suffix in ('.SZ', '.SH'):
            file_path = os.path.join(year_dir, f"{ticker}{suffix}.csv")
            if os.path.isfile(file_path):
                yearly_frames.append(pd.read_csv(file_path))
                break

    if not yearly_frames:
        return

    merged_data = pd.concat(yearly_frames, ignore_index=True)
    # A stable sort keeps rows with equal trade_time in their input order.
    merged_data.sort_values(by='trade_time', kind='mergesort', inplace=True)

    # Write to a temporary name and rename afterwards: an interrupted run then
    # never leaves a truncated '<ticker>.csv' that the existence check above
    # would treat as finished.
    tmp_file = output_file + '.tmp'
    try:
        merged_data.to_csv(tmp_file, index=False)
        os.replace(tmp_file, output_file)
    finally:
        if os.path.exists(tmp_file):
            os.remove(tmp_file)

def main():
    """Merge every ticker in the module-level ``tickers`` list with a thread pool.

    A progress bar advances once per finished ticker. A failure in one ticker
    is reported and does not stop the remaining ones.
    """
    # Leave one core free; os.cpu_count() may return None, hence the fallback.
    max_workers = max(1, (os.cpu_count() or 1) - 1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(merge_data_for_ticker, ticker): ticker for ticker in tickers}
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Merging Stocks"):
            try:
                # result() re-raises whatever the worker raised; without this
                # call such errors would be dropped silently.
                future.result()
            except Exception as e:
                # tqdm.write prints without corrupting the progress bar.
                tqdm.write(f"Error merging {futures[future]}: {e}")

if __name__ == "__main__":
    main()


Merging Stocks:  32%|███▏      | 1623/5100 [1:56:44<19:40:49, 20.38s/it]

### 单只股票数据检查<多线程>

In [1]:
import pandas as pd
from pathlib import Path
import concurrent.futures
import os

def check_data_points_and_nan(file_path, expected_points=240):
    """Check one merged stock CSV for incomplete days and missing values.

    The file must contain a ``trade_time`` column. Rows are grouped by the
    calendar date of ``trade_time``; a date is reported when its row count
    differs from ``expected_points``. Independently, every row holding at
    least one NaN is collected.

    Args:
        file_path: Path of the CSV file; its stem is used as the ticker.
        expected_points: Required number of rows per date. Defaults to 240,
            the value that was previously hard-coded.

    Returns:
        ``None`` when no problem is found, otherwise a tuple
        ``(ticker, invalid_dates, nan_rows)`` where ``invalid_dates`` is a
        list of dates and ``nan_rows`` is a DataFrame of the rows with NaN.
    """
    df = pd.read_csv(file_path)
    df['trade_time'] = pd.to_datetime(df['trade_time'])
    df['date'] = df['trade_time'].dt.date

    rows_per_day = df.groupby('date').size()
    invalid_dates = rows_per_day[rows_per_day != expected_points].index.tolist()
    nan_rows = df.isna().any(axis=1)

    if not invalid_dates and not nan_rows.any():
        return None

    # The file name without extension doubles as the ticker code.
    return Path(file_path).stem, invalid_dates, df[nan_rows]

def main():
    """Run the per-file check over every CSV in the merged-data folder.

    For each file that ``check_data_points_and_nan`` flags, the ticker and its
    invalid dates are printed, followed by an extra line when rows with
    missing values were found.
    """
    folder_path = 'data/by_stock_merged/'  # change to your own data folder
    # Sorted so the report order is the same on every run.
    file_paths = sorted(Path(folder_path).glob('*.csv'))

    # Leave one core free; os.cpu_count() may return None, hence the fallback.
    max_workers = max(1, (os.cpu_count() or 1) - 1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(check_data_points_and_nan, file_paths)

        for result in results:
            if result is None:
                continue
            ticker, invalid_dates, nan_data = result
            print(f'Ticker: {ticker}, Invalid Dates: {invalid_dates}')
            if not nan_data.empty:
                print(f'Non-numeric data found in Ticker: {ticker}')

if __name__ == "__main__":
    main()

Ticker: 000638, Invalid Dates: []
Non-numeric data found in Ticker: 000638


### 异常数据处理

In [None]:
import pandas as pd
import os

# Merged files that need manual cleaning.
files = [
    '000638.csv'
]
file_folder = 'data/by_stock_merged/'
return_folder = 'data/by_stock_return_rate/'

for file in files:
    file_path = file_folder + file
    df = pd.read_csv(file_path)

    # Keep only rows that have a trade_time and no missing value in the
    # columns from 'open' through 'amount'. The previous filter used
    # isna().all(), which kept exactly the all-NaN rows instead of dropping them.
    has_all_values = df.loc[:, 'open':'amount'].notna().all(axis=1)
    df = df[has_all_values & df['trade_time'].notna()]
    # Drop the row stamped 2023-11-30 13:00:00.
    df = df[df['trade_time'] != '2023-11-30 13:00:00']
    # Keep the first row of every duplicated trade_time.
    df = df.drop_duplicates(subset='trade_time')

    df.to_csv(file_path, index=False)

    # Remove the matching file in by_stock_return_rate, if there is one, so
    # it is not left over from the uncleaned data.
    return_file_path = return_folder + file
    if os.path.exists(return_file_path):
        os.remove(return_file_path)



### 获取历年流动性<多线程>

In [None]:
import os
import akshare as ak
import json
import pandas as pd
import numpy as np
import concurrent.futures
from tqdm import tqdm

def process_stock_data(year, ticker):
    """Compute one stock's turnover statistics for one calendar year.

    Daily history from Jan 1 to Dec 31 of ``year`` is requested from akshare
    (unadjusted), and the '换手率' and '成交额' columns are used.

    Args:
        year: Calendar year, used to build the ``YYYY0101``/``YYYY1231`` range.
        ticker: Stock code passed to akshare as ``symbol``.

    Returns:
        ``[ticker, daily_average_turnover, circulating_market_cap,
        trading_days]`` or ``None`` when no rows are returned or any error
        occurs. ``daily_average_turnover`` is the mean of '换手率' divided by
        100; ``circulating_market_cap`` is the last row's '成交额' divided by
        the last row's '换手率', times 100; ``trading_days`` is the row count.
    """
    start_date = f"{year}0101"
    end_date = f"{year}1231"

    try:
        # One request is enough: both columns come from the same table.
        history = ak.stock_zh_a_hist(symbol=ticker, start_date=start_date, end_date=end_date, adjust='')
        if history.empty:
            return None

        turnover = history['换手率']
        amount = history['成交额']

        trading_days = len(turnover)
        daily_average_turnover = turnover.mean() / 100
        # NOTE(review): a zero '换手率' on the last row makes this inf/NaN.
        circulating_market_cap = amount.iloc[-1] / turnover.iloc[-1] * 100

        return [ticker, daily_average_turnover, circulating_market_cap, trading_days]

    except Exception:
        # Deliberate best effort: any failure for this ticker/year is treated
        # as "no data" so the caller can carry on with the others.
        return None

def main():
    """Compute yearly turnover statistics for all tickers and save them.

    Tickers are read from 'data/tickers.json'. For every year from 2000 to
    2023, ``process_stock_data`` runs for each ticker in a thread pool; the
    non-empty results are sorted by turnover ratio (descending) and written
    to 'data/turnover' as both CSV and pickle. Returns early, after printing
    a message, when the ticker file cannot be read or parsed.
    """
    try:
        with open('data/tickers.json') as f:
            tickers = json.load(f)
    except (OSError, ValueError) as e:
        # OSError: file missing/unreadable; ValueError: invalid JSON content.
        print(f"Error reading tickers file: {e}")
        return

    years = range(2000, 2024)
    # Leave one core free; os.cpu_count() may return None, hence the fallback.
    max_workers = max(1, (os.cpu_count() or 1) - 1)

    turnover_dir = 'data/turnover'
    os.makedirs(turnover_dir, exist_ok=True)

    for year in years:
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # One progress bar per year, advanced once per finished ticker.
            futures = {executor.submit(process_stock_data, year, ticker): ticker for ticker in tickers}
            for future in tqdm(concurrent.futures.as_completed(futures), total=len(tickers), desc=f"Processing {year}"):
                result = future.result()
                if result:
                    results.append(result)

        if results:
            df = pd.DataFrame(results, columns=['tickers', 'turnover_ratio', 'circulating_market_cap', 'trading_days'])
            df.set_index('tickers', inplace=True)
            df.sort_values(by='turnover_ratio', ascending=False, inplace=True)
            filename = os.path.join(turnover_dir, f'turnovers_{year}.csv')
            df.to_csv(filename)
            df.to_pickle(os.path.join(turnover_dir, f'turnovers_{year}.pkl'))
            print(f"Data for {year} saved to {filename}")

if __name__ == "__main__":
    main()

### 按股票计算收益率并进行后复权处理

In [1]:
import pandas as pd
import os
import glob
import datetime

# 定义后复权处理函数
def process_hfq_data(stock_data, hfq_data_path, ticker, period, period_dict):
    """Compute bar-to-bar returns and apply the factor file's adjustments.

    The factor file is looked up as ``sh<ticker>.csv`` and then
    ``sz<ticker>.csv`` inside ``hfq_data_path``; it must provide the columns
    ``date`` and ``hfq_one_point``. Returns are ``close.pct_change()`` with
    the first value set to 0. For every factor row dated after 2000-01-01 the
    matching bars get ``(return_rate + 1) * hfq_one_point - 1``:

    * ``period == '1d'``: all bars whose date equals the factor date;
    * otherwise: the bar stamped 09:30 plus ``period_dict[period]`` minutes
      on the factor date.

    Args:
        stock_data: DataFrame with ``trade_time`` and ``close`` columns. It is
            modified in place (``trade_time`` is converted to datetime and a
            ``return_rate`` column is added).
        hfq_data_path: Folder containing the factor CSV files.
        ticker: Stock code without exchange prefix.
        period: Period label, ``'1d'`` or a key of ``period_dict``.
        period_dict: Mapping of period label to bar length in minutes.

    Returns:
        A DataFrame with the columns ``trade_time`` and ``return_rate``, or
        ``None`` (after printing a message) when no factor file exists.

    Raises:
        KeyError: If ``period`` is neither ``'1d'`` nor a key of ``period_dict``.
    """
    hfq_data = None
    for prefix in ('sh', 'sz'):
        file_path = os.path.join(hfq_data_path, f'{prefix}{ticker}.csv')
        if os.path.exists(file_path):
            hfq_data = pd.read_csv(file_path)
            hfq_data['date'] = pd.to_datetime(hfq_data['date'])
            break

    if hfq_data is None:
        print(f'No hfq_one_point data file found for ticker {ticker}. Skipping...')
        return None

    stock_data['trade_time'] = pd.to_datetime(stock_data['trade_time'])
    stock_data['return_rate'] = stock_data['close'].pct_change().fillna(0)

    # Loop-invariant work is done once here instead of once per factor row.
    is_daily = period == '1d'
    if is_daily:
        trade_dates = stock_data['trade_time'].dt.date
    else:
        extra_hours, extra_minutes = divmod(30 + period_dict[period], 60)
        first_bar_offset = datetime.timedelta(hours=9 + extra_hours, minutes=extra_minutes)

    for _, row in hfq_data[hfq_data['date'] > '2000-01-01'].iterrows():
        if is_daily:
            mask = trade_dates == row['date'].date()
        else:
            mask = stock_data['trade_time'] == row['date'] + first_bar_offset

        if mask.any():
            stock_data.loc[mask, 'return_rate'] = (stock_data.loc[mask, 'return_rate'] + 1) * row['hfq_one_point'] - 1

    # Only the timestamp and the adjusted return are handed back.
    return stock_data[['trade_time', 'return_rate']]



# 主函数
def main(input_folder, output_folder, hfq_data_path, period, period_dict):
    """Create one return-rate CSV per stock CSV found in ``input_folder``.

    The output folder is created when missing. A stock whose output file is
    already present is reported and skipped; for the others the input CSV is
    read, passed to ``process_hfq_data`` and the result, unless it is
    ``None``, is written under the same file name in ``output_folder``.

    Args:
        input_folder: Folder with the merged per-stock CSV files.
        output_folder: Folder receiving the return-rate CSV files.
        hfq_data_path: Folder with the adjustment-factor CSV files.
        period: Period label forwarded to ``process_hfq_data``.
        period_dict: Period-to-minutes mapping forwarded to ``process_hfq_data``.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    csv_pattern = os.path.join(input_folder, '*.csv')
    for file_path in glob.glob(csv_pattern):
        base_name = os.path.basename(file_path)
        ticker = base_name.split('.')[0]
        output_file_path = os.path.join(output_folder, base_name)

        if not os.path.exists(output_file_path):
            stock_data = pd.read_csv(file_path)
            processed_data = process_hfq_data(stock_data, hfq_data_path, ticker, period, period_dict)
            if processed_data is not None:
                processed_data.to_csv(output_file_path, index=False)
        else:
            # Finished on an earlier run: leave the existing output alone.
            print(f"Output for {ticker} already exists. Skipping...")

# Folders used by the return-rate step and the bar period to process.
hfq_data_path = 'data/backward_adjust_factor'
input_folder = 'data/by_stock_merged'
output_folder = 'data/by_stock_return_rate'
period = '1m'

# Bar length in minutes for each period label (add further periods as needed).
period_dict = {'1m': 1, '5m': 5}

if __name__ == "__main__":
    main(input_folder, output_folder, hfq_data_path, period, period_dict)


No hfq_one_point data file found for ticker 002396. Skipping...


KeyError: "['return_rate'] not in index"