In [1]:
import numpy as np
import numba as nb
import torch as th
import SharedArray as sa
import os
import nutils
import common as cm
import cupy as cp
from tqdm import tqdm
import os

In [2]:
# Codes to process, as listed by the project's `common` module.
codes = cm.SELECTED_CODES


def get_snapshot(code):
    """Attach to the three shared arrays registered for ``code``.

    The arrays are looked up through ``sa.attach`` under the names
    ``snapshot_<code>``, ``label_<code>`` and ``timestamp_<code>``.

    Args:
        code: Identifier interpolated into the shared-array names.

    Returns:
        A 3-tuple ``(snapshot, label, timestamp)``, in that order.
    """
    snapshot = sa.attach(f"snapshot_{code}")
    label = sa.attach(f"label_{code}")
    timestamp = sa.attach(f"timestamp_{code}")
    return snapshot, label, timestamp

In [3]:
# Names of the snapshot columns that get differenced along the row axis
# before standardisation (see the functions that iterate COLS_DIFF_INDEX).
COLS_DIFF = [
    # Aggregate volumes
    "TotalBidVolume", "TotalAskVolume",
    # Bid prices, levels 1-10
    "BidPrice1", "BidPrice2", "BidPrice3", "BidPrice4", "BidPrice5",
    "BidPrice6", "BidPrice7", "BidPrice8", "BidPrice9", "BidPrice10",
    # Ask prices, levels 1-10
    "AskPrice1", "AskPrice2", "AskPrice3", "AskPrice4", "AskPrice5",
    "AskPrice6", "AskPrice7", "AskPrice8", "AskPrice9", "AskPrice10",
    # Bid volumes, levels 1-10
    "BidVolume1", "BidVolume2", "BidVolume3", "BidVolume4", "BidVolume5",
    "BidVolume6", "BidVolume7", "BidVolume8", "BidVolume9", "BidVolume10",
    # Ask volumes, levels 1-10
    "AskVolume1", "AskVolume2", "AskVolume3", "AskVolume4", "AskVolume5",
    "AskVolume6", "AskVolume7", "AskVolume8", "AskVolume9", "AskVolume10",
]

# Column name -> position within cm.COLS_SNAPSHOTS.
COLS_SNAPSHOTS_INDEX = {
    name: position for position, name in enumerate(cm.COLS_SNAPSHOTS)
}

# Positions of the COLS_DIFF columns, in COLS_DIFF order. A name missing from
# cm.COLS_SNAPSHOTS raises KeyError here.
COLS_DIFF_INDEX = [COLS_SNAPSHOTS_INDEX[name] for name in COLS_DIFF]




def standardize_by_day_on_gpu(x, ts, device_id=0):
    """First-difference the COLS_DIFF columns, then z-score every column per day.

    The work is done on the GPU on copies of the inputs; ``x`` and ``ts``
    themselves are not modified.

    Steps:
      1. Each column in ``COLS_DIFF_INDEX`` is replaced by its first
         difference along the row axis; the first row of those columns is set
         to 0. The difference is taken over the whole array, so the first row
         of a day is differenced against the last row of the preceding day.
      2. Rows are grouped by the distinct values of ``ts``; within each group
         every column has its mean subtracted and is divided by its standard
         deviation (``cp.std`` default, i.e. population std).

    A column that is constant within a group has zero standard deviation; its
    divisor is replaced by 1 so the column becomes all zeros instead of
    NaN/inf.

    Args:
        x: 2-D array-like, rows by columns.
        ts: 1-D array-like with one entry per row of ``x``; rows with equal
            values are standardised together (presumably a per-day key —
            confirm against whatever publishes ``timestamp_<code>``).
        device_id: CUDA device ordinal to run on.

    Returns:
        numpy.ndarray with the same shape and dtype as ``x``.
    """
    with cp.cuda.Device(device_id):
        # Copy the host arrays onto the selected device.
        x_cp = cp.array(x)
        ts_cp = cp.array(ts)

        # Difference one column at a time so the temporaries stay one column wide.
        for col_idx in COLS_DIFF_INDEX:
            x_cp[1:, col_idx] = cp.diff(x_cp[:, col_idx], axis=0)
        # Row 0 has no predecessor. The slice form also tolerates empty input.
        x_cp[:1, COLS_DIFF_INDEX] = 0

        # Distinct day keys present in the data.
        unique_days = cp.unique(ts_cp)

        # Output buffer; same shape and dtype as the input.
        x_standardized = cp.zeros_like(x_cp)

        for day in unique_days:
            mask = ts_cp == day
            x_day = x_cp[mask]

            # Per-column statistics for this day only.
            mean = cp.mean(x_day, axis=0)
            std = cp.std(x_day, axis=0)
            # Guard against division by zero for columns constant within the day.
            std[std == 0] = 1

            x_standardized[mask] = (x_day - mean) / std

        # Move the result back to host memory as a numpy array.
        return cp.asnumpy(x_standardized)

# Standardise every selected code day by day, then write one .npy file per code.
result = {}
for code in tqdm(codes):
    x, y, ts = get_snapshot(code)
    # Keywords are required here. Positionally, nan_to_num(x, 0, 0, 0) binds to
    # copy=0, nan=0, posinf=0: it overwrites the attached array in place and
    # leaves -inf mapped to the most negative finite float instead of 0.
    x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0)
    # standardize_by_day_on_gpu already returns a host-side numpy array, so no
    # further GPU-to-CPU conversion is needed.
    result[code] = standardize_by_day_on_gpu(x, ts)

save_path = "/mnt/disk1/snapshot_diff_norm/"
os.makedirs(save_path, exist_ok=True)
for code, standardized in result.items():
    np.save(os.path.join(save_path, f"{code}.npy"), standardized)

In [4]:
import cupy as cp


def standardize_by_n_days_on_gpu(x, ts, diff=1, n=20, device_id=0):
    """Lag-difference the COLS_DIFF columns, then z-score each day against a
    trailing window of days.

    The work is done on the GPU on copies of the inputs; ``x`` and ``ts``
    themselves are not modified.

    Steps:
      1. Each column in ``COLS_DIFF_INDEX`` is replaced by the change over
         ``diff`` rows, ``x[t] - x[t - diff]``; the first ``diff`` rows of
         those columns are set to 0. The difference is taken over the whole
         array, so it crosses day boundaries.
      2. For every distinct value ``d`` of ``ts`` (ascending), column means
         and standard deviations (``cp.std`` default, i.e. population std)
         are computed over all rows belonging to ``d`` and the up to
         ``n - 1`` distinct values before it. Only the rows of ``d`` itself
         are standardised with them.
      3. Zero standard deviations are replaced by 1, and any NaN/inf left in
         the result is set to 0.

    NOTE(review): this used to call ``cp.diff(col, n=diff)``. The ``n``
    argument of ``cp.diff`` is the ORDER of the finite difference (the
    difference applied ``diff`` times), not a lag. The two agree for
    ``diff=1`` only. The lag form below is what the parameter name and the
    ``snapshot_diff{diff}_...`` output naming suggest was intended; arrays
    produced earlier with ``diff > 1`` contain ``diff``-th-order differences
    and should be regenerated if the lag meaning is confirmed.

    Args:
        x: 2-D array-like, rows by columns.
        ts: 1-D array-like with one entry per row of ``x``; rows with equal
            values form one "day" (presumably a per-day key — confirm against
            whatever publishes ``timestamp_<code>``).
        diff: Lag, in rows, of the difference. 0 disables differencing.
        n: Number of distinct ``ts`` values (current one included) in the
            statistics window.
        device_id: CUDA device ordinal to run on.

    Returns:
        numpy.ndarray with the same shape and dtype as ``x``.

    Raises:
        ValueError: If ``diff`` is negative.
    """
    if diff < 0:
        raise ValueError(f"diff must be non-negative, got {diff}")

    with cp.cuda.Device(device_id):
        # Copy the host arrays onto the selected device.
        x_cp = cp.array(x)
        ts_cp = cp.array(ts)

        if diff > 0:
            # One column at a time keeps the temporaries one column wide. The
            # right-hand side is fully evaluated into a new array before the
            # assignment, so overlapping source/target slices are safe.
            for col_idx in COLS_DIFF_INDEX:
                x_cp[diff:, col_idx] = x_cp[diff:, col_idx] - x_cp[:-diff, col_idx]
            # The first `diff` rows have no row `diff` steps back.
            x_cp[:diff, COLS_DIFF_INDEX] = 0

        # cp.unique returns the distinct values already sorted ascending.
        unique_days = cp.unique(ts_cp)

        # Output buffer; same shape and dtype as the input.
        x_standardized = cp.zeros_like(x_cp)

        for idx, day in enumerate(unique_days):
            # Trailing window: this day plus up to n - 1 earlier days.
            start_idx = max(0, idx - n + 1)
            past_days = unique_days[start_idx : idx + 1]

            # All rows that fall inside the window.
            mask = cp.isin(ts_cp, past_days)
            x_days = x_cp[mask]

            # Window statistics per column.
            mean = cp.mean(x_days, axis=0)
            std = cp.std(x_days, axis=0)
            # Guard against division by zero for columns constant in the window.
            std[std == 0] = 1

            # Standardise only the current day's rows.
            mask_today = ts_cp == day
            x_today = x_cp[mask_today]
            x_standardized[mask_today] = (x_today - mean) / std

        # Replace any remaining NaN / +-inf with 0.
        x_standardized[~cp.isfinite(x_standardized)] = 0

        # Move the result back to host memory as a numpy array.
        return cp.asnumpy(x_standardized)

In [6]:
# Window length, in days. Used for BOTH the computation and the output folder
# name so the two cannot drift apart.
n = 20
for diff in [6, 7, 8, 9, 10]:
    # Fresh dict per diff value, so an output folder can only ever receive
    # arrays computed with that diff value.
    result = {}
    for code in tqdm(codes):
        x, y, ts = get_snapshot(code)
        # Zero out NaN and +-inf without touching the attached array
        # (np.where allocates a new array).
        x = np.where(np.isfinite(x), x, 0)
        # standardize_by_n_days_on_gpu already returns a host-side numpy array.
        result[code] = standardize_by_n_days_on_gpu(x, ts, diff=diff, n=n)
    save_path = f"/mnt/disk2/snapshot_diff{diff}_norm{n}/"
    os.makedirs(save_path, exist_ok=True)
    for code, standardized in result.items():
        np.save(os.path.join(save_path, f"{code}.npy"), standardized)

100%|██████████| 100/100 [10:53<00:00,  6.53s/it]
100%|██████████| 100/100 [10:18<00:00,  6.19s/it]
100%|██████████| 100/100 [10:22<00:00,  6.23s/it]
100%|██████████| 100/100 [10:43<00:00,  6.44s/it]
100%|██████████| 100/100 [10:45<00:00,  6.46s/it]
