當前路徑

In [5]:
import os

# Look up the directory the interpreter is currently running in and show it.
current_working_directory = os.getcwd()

print("Current working directory:", current_working_directory)

Current working directory: d:\Python\StockToolsPro\AutoGluon\DataProcess


從資料夾篩選檔案大小>40KB的檔案

In [None]:
import os


def filterFilesSize(directory, size):
    """Return the names of regular files in *directory* larger than *size*.

    Args:
        directory: Folder to scan. Only its direct entries are examined;
            sub-folders are not descended into.
        size: Threshold in bytes. A file is reported only when its size is
            strictly greater than this value.

    Returns:
        A list of bare entry names (not full paths), in the order produced
        by ``os.listdir``.
    """
    OverSizeFiles = []
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        # Sub-directories (and dangling links) also report a size, but they
        # are not data files, so they must never be returned.
        if os.path.isfile(path) and os.path.getsize(path) > size:
            OverSizeFiles.append(name)
    return OverSizeFiles


# Script entry point: print the names of the files in the stock-data folder
# whose size is above the threshold.
if __name__ == '__main__':
    targetDir = r"D:\Temp\StockData\US_STOCK_DATA\StockData"
    size = 40 * 1024    # threshold in bytes (40 KB)
    print(filterFilesSize(targetDir, size))


導入美股歷史數據來給出交易訊號，並把結果輸出在tradeSignals資料夾裡面  
AutoGluon訓練集

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import os
import shutil
import random
from sklearn.utils import resample

# 忽略pandas警告
warnings.filterwarnings('ignore')


# 檢查是否在當前索引處檢測到局部高點
def rw_top(data: np.array, curr_index: int, order: int) -> bool:
    """Tell whether a local high is confirmed at ``curr_index``.

    The candidate is the sample ``order`` positions before ``curr_index``.
    It counts as a local high when none of the ``order`` samples on either
    side of it is strictly greater (ties do not disqualify it).

    Args:
        data: Indexable sequence of values.
        curr_index: Index at which the confirmation is evaluated.
        order: Number of samples inspected on each side of the candidate.

    Returns:
        True if the candidate is a local high, otherwise False. Always
        False while ``curr_index < 2 * order + 1``.
    """
    if curr_index < order * 2 + 1:
        return False

    pivot = curr_index - order
    peak = data[pivot]
    # A single strictly larger neighbour on either side rules the peak out.
    return not any(
        data[pivot + step] > peak or data[pivot - step] > peak
        for step in range(1, order + 1)
    )


# 檢查是否在當前索引處檢測到局部低點
def rw_bottom(data: np.array, curr_index: int, order: int) -> bool:
    """Tell whether a local low is confirmed at ``curr_index``.

    The candidate is the sample ``order`` positions before ``curr_index``.
    It counts as a local low when none of the ``order`` samples on either
    side of it is strictly smaller (ties do not disqualify it).

    Args:
        data: Indexable sequence of values.
        curr_index: Index at which the confirmation is evaluated.
        order: Number of samples inspected on each side of the candidate.

    Returns:
        True if the candidate is a local low, otherwise False. Always
        False while ``curr_index < 2 * order + 1``.
    """
    if curr_index < order * 2 + 1:
        return False

    pivot = curr_index - order
    trough = data[pivot]
    # A single strictly smaller neighbour on either side rules the low out.
    return not any(
        data[pivot + step] < trough or data[pivot - step] < trough
        for step in range(1, order + 1)
    )


def rw_extremes(data: np.array, order: int):
    """Collect rolling-window local highs and lows over ``data``.

    Every index is tested with ``rw_top`` and ``rw_bottom``. Each hit is
    recorded as a three-item list:
    ``[confirmation index, extreme index, extreme value]``, where the
    extreme index is ``order`` positions before the confirmation index.

    Args:
        data: Indexable sequence of values.
        order: Window half-width passed on to ``rw_top`` / ``rw_bottom``.

    Returns:
        A ``(tops, bottoms)`` tuple of lists of such records, in ascending
        order of confirmation index.
    """
    peaks, troughs = [], []
    for confirm_idx in range(len(data)):
        extreme_idx = confirm_idx - order

        if rw_top(data, confirm_idx, order):
            peaks.append([confirm_idx, extreme_idx, data[extreme_idx]])

        if rw_bottom(data, confirm_idx, order):
            troughs.append([confirm_idx, extreme_idx, data[extreme_idx]])

    return peaks, troughs


def DateColName(df):
    """Return the name of the date column present in ``df``.

    The candidates are tried in this order: ``Date``, ``Datetime``,
    ``date``, ``datetime``. The first one found among ``df.columns`` is
    returned; ``None`` is returned when none of them is present.
    """
    candidates = ('Date', 'Datetime', 'date', 'datetime')
    present = [name for name in candidates if name in df.columns]
    return present[0] if present else None


def StockSignalPlot(data, tops, bottoms, stock):
    """Plot the closing price and mark the detected extremes.

    Args:
        data: DataFrame with a ``Close`` column; its index supplies the
            x-axis positions.
        tops: Records whose item 1 is a positional index into ``data`` and
            item 2 the value to plot; drawn as green dots.
        bottoms: Records of the same layout; drawn as red dots.
        stock: Label used in the figure title.
    """
    data['Close'].plot()
    x_axis = data.index

    # Highs first (green), then lows (red) -- same drawing order as before.
    for records, colour in ((tops, 'green'), (bottoms, 'red')):
        for record in records:
            plt.plot(x_axis[record[1]], record[2], marker='o', color=colour)

    plt.title(f'{stock} closing price extreme')
    plt.legend(['Close'])
    plt.show()


def filterFilesSize(directory, size):
    """Return the names of regular files in *directory* larger than *size*.

    Args:
        directory: Folder to scan. Only its direct entries are examined;
            sub-folders are not descended into.
        size: Threshold in bytes. A file is reported only when its size is
            strictly greater than this value.

    Returns:
        A list of bare entry names (not full paths), in the order produced
        by ``os.listdir``.
    """
    OverSizeFiles = []
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        # Sub-directories (and dangling links) also report a size, but they
        # are not data files, so they must never be returned.
        if os.path.isfile(path) and os.path.getsize(path) > size:
            OverSizeFiles.append(name)
    return OverSizeFiles


def tradeSignalsGen(stockDataPath, outSignalsPath, stockList, order=20):
    """Label local extremes of each stock's close as Buy/Sell and save a CSV.

    For every file name in ``stockList`` the CSV is read from
    ``stockDataPath``, its date column is parsed and made timezone-naive,
    the rows are cleaned with ``clean_data``, and the rolling-window
    extremes of ``Close`` are computed. Rows at local lows get
    ``Buy_Signal = 'Buy'``; rows at local highs get ``Sell_Signal = 'Sell'``;
    all other cells of those two columns stay ``None``. The result is
    written under the same file name in ``outSignalsPath``.

    Args:
        stockDataPath: Folder holding the input CSV files.
        outSignalsPath: Folder that receives the labelled CSV files.
        stockList: Iterable of CSV file names to process.
        order: Window half-width handed to ``rw_extremes`` (default 20).

    Raises:
        KeyError: If a file has none of the date columns recognised by
            ``DateColName``.
    """
    for stock in stockList:
        data = pd.read_csv(os.path.join(stockDataPath, stock))

        DateName = DateColName(data)
        if DateName is None:
            # Fail with a readable message instead of an opaque KeyError(None).
            raise KeyError(f"No date column found in {stock}")

        data[DateName] = pd.to_datetime(data[DateName]).dt.tz_localize(None)
        data = clean_data(data)

        # One pass yields both highs and lows; no need to scan twice.
        tops, bottoms = rw_extremes(data['Close'].to_numpy(), order)

        # Mark the buy / sell rows. Records hold positional indices, so they
        # are translated through data.index (rows were removed by cleaning).
        data['Buy_Signal'] = None
        data['Sell_Signal'] = None
        for top in tops:
            data.at[data.index[top[1]], 'Sell_Signal'] = 'Sell'
        for bottom in bottoms:
            data.at[data.index[bottom[1]], 'Buy_Signal'] = 'Buy'

        # Write the labelled data (the DataFrame index is written as well,
        # which is the pandas default).
        output_path = os.path.join(outSignalsPath, stock)
        data.to_csv(output_path)

    print("Done")


def processData(df, apply_downsampling=False):
    """Collapse the Buy/Sell marker columns into one numeric ``Signal`` column.

    ``Signal`` is 1 where ``Buy_Signal`` is non-null, otherwise 2 where
    ``Sell_Signal`` is non-null, otherwise 0 (hold). The two marker columns
    are dropped and the rows are returned sorted by the date column. The
    input frame itself is left unmodified.

    Args:
        df: DataFrame with ``Buy_Signal`` and ``Sell_Signal`` columns and a
            date column named ``Date``, ``Datetime``, ``date`` or
            ``datetime``.
        apply_downsampling: When True, the hold rows are randomly reduced
            (without replacement, fixed seed 123) to
            ``max(1, min(#buy, #sell) // 2)`` rows, capped at the number of
            hold rows actually available. Buy and sell rows are all kept.

    Returns:
        A new DataFrame sorted by the date column.

    Raises:
        KeyError: If no recognised date column exists.
    """
    is_buy = df['Buy_Signal'].notna()
    is_sell = df['Sell_Signal'].notna()
    # Buy takes precedence over Sell when both markers are set on a row.
    signal = np.select([is_buy, is_sell], [1, 2], default=0).astype('int64')

    df = df.drop(['Buy_Signal', 'Sell_Signal'], axis=1).assign(Signal=signal)

    if apply_downsampling:
        df_hold = df[df['Signal'] == 0]
        df_buy = df[df['Signal'] == 1]
        df_sell = df[df['Signal'] == 2]

        n_samples_buy_sell = min(len(df_buy), len(df_sell))
        # Sampling without replacement cannot return more rows than exist,
        # so the target is capped at the number of hold rows.
        n_samples_hold = min(len(df_hold), max(1, n_samples_buy_sell // 2))

        if n_samples_hold > 0:
            df_hold = resample(df_hold, replace=False,
                               n_samples=n_samples_hold, random_state=123)
        df = pd.concat([df_buy, df_sell, df_hold])

    # Sort by whichever recognised date column the file uses; fall back to
    # 'Date' so a frame without any date column still raises KeyError.
    date_col = next(
        (name for name in ('Date', 'Datetime', 'date', 'datetime')
         if name in df.columns),
        'Date',
    )
    return df.sort_values(by=date_col)


def TrainDataSplit(source_folder, target_root, size_limit_mb=600):
    """Move the files under ``source_folder`` into size-capped sub-folders.

    All files found recursively under ``source_folder`` are shuffled and
    moved into ``target_root/subfolder_1``, ``subfolder_2``, ... Each
    sub-folder is filled greedily until no remaining file fits under the
    limit. Every file is moved: a file that is larger than the limit on its
    own is placed alone in a sub-folder instead of being retried forever.

    Args:
        source_folder: Folder whose files are moved (walked recursively).
        target_root: Folder in which the ``subfolder_<n>`` folders are
            created.
        size_limit_mb: Maximum total size of one sub-folder, in megabytes.

    Returns:
        None.
    """
    # Convert the limit from megabytes to bytes.
    size_limit_bytes = size_limit_mb * 1024 * 1024

    # Gather every file together with its size.
    files = []
    for dirpath, _dirnames, filenames in os.walk(source_folder):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            files.append((filepath, os.path.getsize(filepath)))

    # Randomise which files end up together.
    random.shuffle(files)

    subfolder_index = 1
    while files:
        current_folder = os.path.join(target_root, f'subfolder_{subfolder_index}')
        os.makedirs(current_folder, exist_ok=True)
        current_size = 0
        moved_count = 0
        leftover = []

        for filepath, filesize in files:
            fits = current_size + filesize <= size_limit_bytes
            # The first file of a folder is always accepted, so a file
            # bigger than the limit gets a folder to itself and the outer
            # loop is guaranteed to make progress.
            if fits or moved_count == 0:
                shutil.move(filepath, current_folder)
                current_size += filesize
                moved_count += 1
            else:
                leftover.append((filepath, filesize))

        # Whatever did not fit is carried over to the next sub-folder.
        files = leftover
        subfolder_index += 1


def trainDataPreproc():
    """Turn every trade-signal CSV into a training CSV, then split the output.

    Each file in the trade-signals folder is read, passed through
    ``processData`` with down-sampling enabled, and written (without the
    index) under the same name into the training-data folder, which is
    created if it does not exist. Finally ``TrainDataSplit`` distributes the
    training files into the target folder using a limit of 600, the value
    passed as its ``size_limit_mb`` argument.
    """
    # Fixed locations of the pipeline's folders.
    signals_dir = r"D:\Temp\StockData\US_STOCK_DATA\tradeSignals"
    train_dir = r"D:\Temp\StockData\US_STOCK_DATA\TrainData"
    target_dir = r"D:\Temp\StockData\US_STOCK_DATA\TargetData"
    split_limit = 600

    if not os.path.exists(train_dir):
        os.makedirs(train_dir, exist_ok=False)

    for csv_name in os.listdir(signals_dir):
        frame = pd.read_csv(os.path.join(signals_dir, csv_name))
        frame = processData(frame, apply_downsampling=True)

        # Save the processed rows as a new CSV of the same name.
        frame.to_csv(os.path.join(train_dir, csv_name), index=False)

    TrainDataSplit(train_dir, target_dir, split_limit)


def US_StockSignal():
    """Entry point of the US-stock pipeline.

    Ensures the trade-signals folder exists, lists the raw stock files that
    are larger than 40 KB, and then runs ``trainDataPreproc``. The call to
    ``tradeSignalsGen`` is currently disabled, so the file list is computed
    but not consumed.
    """
    # Fixed locations of the pipeline's folders.
    raw_dir = r"D:\Temp\StockData\US_STOCK_DATA\StockData"
    signals_dir = r"D:\Temp\StockData\US_STOCK_DATA\tradeSignals"
    min_bytes = 40 * 1024  # 40KB

    if not os.path.exists(signals_dir):
        os.makedirs(signals_dir, exist_ok=False)

    # Kept although unused: the call still scans raw_dir and therefore still
    # fails early when that folder is missing.
    candidates = filterFilesSize(raw_dir, min_bytes)
    # tradeSignalsGen(raw_dir, signals_dir, candidates)
    trainDataPreproc()


# except Exception as e:
#     print(e)


def clean_data(df):
    """Remove unusable rows and bookkeeping columns from a price DataFrame.

    Steps, in order:
      1. Drop rows whose ``Open`` is 0.
      2. Among the remaining rows, drop every zero-``Volume`` row that
         belongs to a run of three or more consecutive zero-volume rows.
         Shorter zero-volume runs are kept.
      3. Drop the ``Dividends``, ``Stock Splits``, ``Symbol`` and
         ``Capital Gains`` columns where present.

    Args:
        df: DataFrame with at least ``Open`` and ``Volume`` columns.

    Returns:
        A new DataFrame; the input is not modified and the surviving rows
        keep their original index labels.
    """
    df = df[df['Open'] != 0]

    # 1 for zero-volume rows, 0 otherwise.
    zero_volume = (df['Volume'] == 0).astype(int)

    # A new run starts wherever the flag differs from the previous row
    # (the first row always starts a run because its diff is NaN).
    run_id = zero_volume.diff().ne(0).cumsum()

    # Number of zero-volume rows in each row's run (0 for traded runs).
    zeros_in_run = zero_volume.groupby(run_id).transform('sum')

    # Keep traded rows, and zero-volume rows from runs shorter than three.
    df = df[(zero_volume == 0) | (zeros_in_run < 3)]

    # These columns are not present in every file, so missing ones are
    # skipped rather than raising.
    return df.drop(
        columns=['Dividends', 'Stock Splits', 'Symbol', 'Capital Gains'],
        errors='ignore',
    )


# Script entry point: run the US-stock pipeline when executed directly.
if __name__ == "__main__":
    US_StockSignal()
