In [2]:
import torch
import os
import numpy as np
import pandas as pd

from pathlib import Path
from typing import Dict, List

# Project directories are resolved relative to the notebook's working
# directory: its parent is the source root and 'data' lives beneath that.
src_path = Path.cwd().parent
data_path = src_path.joinpath('data')

In [3]:
# Collect the per-ticker price files of the KDD17 dataset and pick one of
# them as a sample for interactive exploration.
data_path = src_path / 'data'
# glob() yields entries in an arbitrary, filesystem-dependent order; sorting
# makes the "first" file the same on every run and machine.
ps = sorted((data_path / 'kdd17/price_long_50').glob('*'))
if not ps:
    # Fail with an explicit message instead of a bare IndexError on ps[0].
    raise FileNotFoundError(f"no price files found under {data_path / 'kdd17/price_long_50'}")
p = ps[0]

In [4]:
# Load the sample price file, parse the ISO-formatted 'Date' column and order
# the rows chronologically with a fresh 0..n-1 index.
# df_date = pd.read_csv(data_path / 'trading_dates.csv', header=None)
df = (
    pd.read_csv(p)
    .assign(Date=lambda frame: pd.to_datetime(frame['Date'], format='%Y-%m-%d'))
    .sort_values('Date')
    .reset_index(drop=True)
)
df
# The statement below would drop the 8th column of the original DataFrame in
# place, but df only has 7 columns, so running it raises an IndexError.
# df.drop(columns=df.columns[7], inplace=True)

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2007-01-03,17.219999,17.780001,17.100000,17.510000,52241700,16.016469
1,2007-01-04,17.549999,17.870001,17.299999,17.680000,33559800,16.171968
2,2007-01-05,17.620001,17.760000,17.440001,17.639999,36154800,16.135379
3,2007-01-08,17.629999,17.930000,17.450001,17.860001,31018100,16.336616
4,2007-01-09,17.930000,17.980000,17.650000,17.820000,31417000,16.300027
...,...,...,...,...,...,...,...
2513,2016-12-23,38.790001,39.020000,38.630001,38.779999,8143600,38.628711
2514,2016-12-27,38.770000,39.040001,38.759998,38.820000,5073700,38.668556
2515,2016-12-28,38.869999,38.919998,38.630001,38.680000,8264100,38.529103
2516,2016-12-29,38.610001,38.840000,38.610001,38.689999,5711800,38.539062


In [5]:
# df_date = pd.read_csv(data_path / 'trading_dates.csv', header=None)

In [6]:
import pytorch_lightning as pl
class DTMLDataset(torch.utils.data.Dataset):
    """Dataset subclass that only records where a dataset's price files live.

    NOTE(review): a second class with the same name is defined right after
    this one in the same cell, so this definition is shadowed as soon as the
    cell has finished executing.

    Args:
        data_path: Root data directory (``str`` or ``Path``).
        ds_config: Dataset configuration mapping; only its ``'path'`` entry
            (the location relative to ``data_path``) is read here.
    """

    def __init__(self, data_path, ds_config):
        # Coerce to Path so a plain string root also works with the `/` operator.
        self.data_dir = Path(data_path) / ds_config['path']

class DTMLDataset(pl.LightningDataModule):
    """Lightning data module for the KDD17 / ACL18 stock-movement datasets.

    Args:
        data_path: Root directory that contains the dataset folders
            (``str`` or ``Path``).
        window_size: Sliding-window length; recorded in the dataset config
            under ``'window_size'``.
        dtype: Dataset key, either ``'kdd17'`` or ``'acl18'``.

    Raises:
        KeyError: If ``dtype`` is not one of the known dataset keys.
    """

    def __init__(self, data_path, window_size, dtype=''):
        # Initialise the base class before attaching any attributes.
        super().__init__()

        # ref: https://arxiv.org/abs/1810.09936
        ds_info = {
            # train: (Jan-01-2007 to Jan-01-2015)
            # val: (Jan-01-2015 to Jan-01-2016)
            # test: (Jan-01-2016 to Jan-01-2017)
            'kdd17': {
                'path': 'kdd17/price_long_50',
                'date_path': 'kdd17/trading_dates.csv',
                'train_date': '2015-01-01',
                'val_date': '2016-01-01',
                'test_date': '2017-01-01',
                'window_size': window_size
            },
            # train: (Jan-01-2014 to Aug-01-2015)
            # val: (Aug-01-2015 to Oct-01-2015)
            # test: (Oct-01-2015 to Jan-01-2016)
            'acl18': {
                'path': 'stocknet-dataset/price/raw',
                'date_path': 'stocknet-dataset/price/trading_dates.csv',
                'train_date': '2015-08-01',
                'val_date': '2015-10-01',
                'test_date': '2016-01-01',
                'window_size': window_size
            }
        }
        if dtype not in ds_info:
            # Same exception type as the plain dict lookup, with a clearer message.
            raise KeyError(f"unknown dataset type {dtype!r}; expected one of {sorted(ds_info)}")
        ds_config = ds_info[dtype]

        # Keep the resolved configuration on the instance; load_dataset() reads
        # self.data_dir, which was previously never assigned.
        self.window_size = window_size
        self.ds_config = ds_config
        self.data_dir = Path(data_path) / ds_config['path']

    def load_dataset(self, config: dict):
        """Load every ticker file and split each one chronologically.

        Args:
            config: Dataset configuration providing the ``'train_date'``,
                ``'val_date'`` and ``'test_date'`` boundaries. Each boundary
                is the exclusive upper end of its split.

        Returns:
            A ``(train_data, val_data, test_data)`` tuple of lists holding one
            DataFrame per ticker file.
        """
        # List all plain files in the data directory; sorted so the order of
        # the returned lists does not depend on the filesystem.
        tick_files = sorted(p for p in self.data_dir.glob('*') if not p.is_dir())

        train_data = []
        val_data = []
        test_data = []
        for p in tick_files:
            # Load one ticker and compute its features / labels.
            df = self.load_single_tick(p)
            # train / val / test split on the 'date' column
            dates = df['date']
            train_mask = dates < config['train_date']
            val_mask = (dates >= config['train_date']) & (dates < config['val_date'])
            test_mask = (dates >= config['val_date']) & (dates < config['test_date'])
            train_data.append(df.loc[train_mask, :])
            val_data.append(df.loc[val_mask, :])
            test_data.append(df.loc[test_mask, :])

        return train_data, val_data, test_data

    def load_single_tick(self, p: Path | str):
        """Load one ticker's price CSV and derive labelled percentage features.

        The CSV must provide 'Date', 'Open', 'High', 'Low', 'Close' and
        'Adj Close' columns.

        Args:
            p: Path of the CSV file.

        Returns:
            DataFrame with the columns ``date``, ``open``, ``high``, ``low``,
            ``close``, ``adj_close``, ``zd5`` ... ``zd30`` and ``label``
            (1.0 = rise, 0.0 = fall). Rows without a full feature history and
            rows whose move lies between the two thresholds are removed.
        """
        def longterm_trend(x: pd.Series, k: int):
            # k-day rolling sum divided by k times the current value, minus
            # one, expressed in percent.
            return (x.rolling(k).sum().div(k * x) - 1) * 100

        df = pd.read_csv(p)
        df['Date'] = pd.to_datetime(df['Date'])
        df = df.sort_values('Date').reset_index(drop=True)
        # Drop stray index columns such as 'Unnamed: 0'. The previous check
        # (`'Unnamed' in df.columns`) only matched a column named exactly
        # 'Unnamed' and then removed whatever column sat at position 7.
        df = df[[c for c in df.columns if not str(c).startswith('Unnamed')]]
        if 'Original_Open' in df.columns:
            df = df.rename(columns={'Original_Open': 'Open', 'Open': 'Adj Open'})

        # Open, High, Low: each divided by the same day's Close, minus one,
        # times 100 (percentage offset from the close).
        z1 = (df.loc[:, ['Open', 'High', 'Low']].div(df['Close'], axis=0) - 1).rename(
            columns={'Open': 'open', 'High': 'high', 'Low': 'low'}) * 100
        # Close: day-over-day percentage change, e.g. 100 -> 105 gives 5.
        z2 = df[['Close']].pct_change().rename(columns={'Close': 'close'}) * 100
        # Adj Close: day-over-day percentage change.
        z3 = df[['Adj Close']].pct_change().rename(columns={'Adj Close': 'adj_close'}) * 100

        # Long-term trend features for several look-back windows.
        z4 = []
        for k in [5, 10, 15, 20, 25, 30]:
            z4.append(df[['Adj Close']].apply(longterm_trend, k=k).rename(columns={'Adj Close': f'zd{k}'}))

        df_pct = pd.concat([df['Date'], z1, z2, z3] + z4, axis=1).rename(columns={'Date': 'date'})
        # Columns with the largest number of missing values.
        null_counts = df_pct.isnull().sum()
        cols_max = df_pct.columns[null_counts == null_counts.max()]
        # Keep only rows that are complete in those columns. Reducing with
        # any(axis=1) yields a 1-D mask even when several columns tie for the
        # maximum; copy() so the label assignments below write to an
        # independent frame.
        incomplete = df_pct[cols_max].isnull().any(axis=1)
        df_pct = df_pct.loc[~incomplete, :].copy()

        # from https://arxiv.org/abs/1810.09936
        # Examples with movement percent >= 0.55% and <= -0.5% are
        # identified as positive and negative examples, respectively

        # Binary labels: +1 for a rise, -1 for a fall, 0 for moves in between.
        df_pct['label'] = 0
        df_pct.loc[(df_pct['close'] >= 0.55), 'label'] = 1
        df_pct.loc[(df_pct['close'] <= -0.5), 'label'] = -1

        # only select rise / fall: map {-1, 0, 1} to {0.0, 0.5, 1.0} and drop
        # the 0.5 rows, then rebuild a contiguous index.
        df_pct['label'] = (df_pct['label'] + 1) / 2
        df_pct = df_pct.loc[df_pct['label'] != 0.5, :].reset_index(drop=True)

        return df_pct

In [7]:
def load_single_tick(p: Path | str):
    def longterm_trend(x: pd.Series, k:int):
        return (x.rolling(k).sum().div(k*x) - 1) * 100

    df = pd.read_csv(p)
    df['Date'] = pd.to_datetime(df['Date'])#, format='%m/%d/%Y')
    df = df.sort_values('Date').reset_index(drop=True)
    if 'Unnamed' in df.columns:
        df.drop(columns=df.columns[7], inplace=True)
    if 'Original_Open' in df.columns:
        df.rename(columns={'Open': 'Adj Open', 'Original_Open': 'Open'}, inplace=True)
    # Open, High, Low
    z1 = (df.loc[:, ['Open', 'High', 'Low']].div(df['Close'], axis=0) - 1).rename(
        columns={'Open': 'open', 'High': 'high', 'Low': 'low'}) * 100
    # Close
    z2 = df[['Close']].pct_change().rename(columns={'Close': 'close'}) * 100
    # Adj Close
    z3 = df[['Adj Close']].pct_change().rename(columns={'Adj Close': 'adj_close'}) * 100

    z4 = []
    for k in [5, 10, 15, 20, 25, 30]:
        z4.append(df[['Adj Close']].apply(longterm_trend, k=k).rename(columns={'Adj Close': f'zd{k}'}))

    df_pct = pd.concat([df['Date'], z1, z2, z3] + z4, axis=1).rename(columns={'Date': 'date'})
    cols_max = df_pct.columns[df_pct.isnull().sum() == df_pct.isnull().sum().max()]
    df_pct = df_pct.loc[~df_pct[cols_max].isnull().values, :]

    # from https://arxiv.org/abs/1810.09936
    # Examples with movement percent ≥ 0.55% and ≤ −0.5% are 
    # identified as positive and negative examples, respectively
    
    df_pct['label'] = 0
    df_pct.loc[(df_pct['close'] >= 0.55), 'label'] = 1
    df_pct.loc[(df_pct['close'] <= -0.5), 'label'] = -1

    # only select rise / fall
    df_pct['label'] = (df_pct['label'] + 1) / 2
    df_pct = df_pct.loc[df_pct['label'] != 0.5, :].reset_index(drop=True)

    return df_pct

In [8]:
# Active experiment settings: look-back window length and dataset key.
window_size = 5
dtype = 'kdd17'

# Per-dataset file locations and chronological split boundaries.
ds_info = dict(
    # train: (Jan-01-2007 to Jan-01-2015)
    # val: (Jan-01-2015 to Jan-01-2016)
    # test: (Jan-01-2016 to Jan-01-2017)
    kdd17=dict(
        path='kdd17/price_long_50',
        date_path='kdd17/trading_dates.csv',
        train_date='2015-01-01',
        val_date='2016-01-01',
        test_date='2017-01-01',
        window_size=window_size,
    ),
    # train: (Jan-01-2014 to Aug-01-2015)
    # val: (Aug-01-2015 to Oct-01-2015)
    # test: (Oct-01-2015 to Jan-01-2016)
    acl18=dict(
        path='stocknet-dataset/price/raw',
        date_path='stocknet-dataset/price/trading_dates.csv',
        train_date='2015-08-01',
        val_date='2015-10-01',
        test_date='2016-01-01',
        window_size=window_size,
    ),
)
# Configuration of the dataset selected above.
ds_config = ds_info[dtype]

In [9]:
# Read the header-less trading-dates file and keep its first column as a
# {row index: value} dictionary.
index2date = (
    pd.read_csv(data_path / ds_config['date_path'], header=None)
    .to_dict()[0]
)

In [10]:
# Directory holding the selected dataset's price files, and every entry in it
# that is not itself a directory.
data_dir = data_path.joinpath(ds_config['path'])
tick_files = list(filter(lambda entry: not entry.is_dir(), data_dir.glob('*')))

In [11]:
# Load and preprocess every ticker file into a list of feature DataFrames.
# NOTE(review): the loop variable `p` stays bound to the last file after the
# loop finishes, and the following cell calls `load_single_tick(p)` with it,
# so this has to remain a plain for-loop (a comprehension would not rebind `p`).
data = []
for p in tick_files:
    data.append(load_single_tick(p))

In [12]:

# Chronological train / validation / test split for a single ticker.
# NOTE(review): `p` is whatever file the loading loop in the previous cell
# left bound (its last iteration).
df = load_single_tick(p)
# Each '*_date' in the config is the exclusive upper bound of its split.
train_idx = df['date'] < ds_config['train_date']
val_idx = (ds_config['train_date'] <= df['date']) & (df['date'] < ds_config['val_date'])
test_idx = (ds_config['val_date'] <= df['date']) & (df['date'] < ds_config['test_date'])

print(train_idx.sum(), val_idx.sum(), test_idx.sum())

train_data = df.loc[train_idx, :]
val_data = df.loc[val_idx, :]
# Store the test rows under `test_data`. The original assigned them to
# `test_idx`, which replaced the boolean mask with a DataFrame and left
# `test_data` undefined.
test_data = df.loc[test_idx, :]

983 128 130


In [13]:
# Build sliding-window row indices together with the index of each window's target.
def sliding_window(T: int, window_size: int, time_lag: int):
    """Return index arrays for windowed inputs and their targets.

    Args:
        T: Total number of time steps available.
        window_size: Number of consecutive steps in each input window.
        time_lag: Distance between the last step of a window and its target.

    Returns:
        ``(X_idx, y_idx)`` where ``X_idx`` has shape
        ``(T - window_size - time_lag + 1, window_size)`` and row ``i`` is
        ``[i, i + 1, ..., i + window_size - 1]``, and ``y_idx`` holds the
        matching target positions ``window_size + time_lag - 1, ..., T - 1``.
    """
    n_windows = T - window_size - time_lag + 1
    # Column vector of window start positions, broadcast against the row
    # vector of within-window offsets.
    window_starts = np.arange(n_windows)[:, np.newaxis]
    offsets = np.arange(window_size)[np.newaxis, :]
    X_idx = offsets + window_starts

    first_target = time_lag + window_size - 1
    y_idx = np.arange(first_target, T)

    return X_idx, y_idx

In [14]:
# Number of training rows, plus the window parameters used for the demo below.
# NOTE(review): this rebinds `window_size` to 10; the value stored earlier in
# `ds_config['window_size']` is not updated by this assignment.
T = df.loc[train_idx, :].shape[0]
window_size, time_lag = 10, 1

In [15]:
# Index matrix for the input windows and index vector for their targets.
X_idx, y_idx = sliding_window(T=T, window_size=window_size, time_lag=time_lag)

In [16]:
# Target row index of every window; the first target is time_lag + window_size - 1.
y_idx

array([ 10,  11,  12,  13,  14,  15,  16,  17,  18,  19,  20,  21,  22,
        23,  24,  25,  26,  27,  28,  29,  30,  31,  32,  33,  34,  35,
        36,  37,  38,  39,  40,  41,  42,  43,  44,  45,  46,  47,  48,
        49,  50,  51,  52,  53,  54,  55,  56,  57,  58,  59,  60,  61,
        62,  63,  64,  65,  66,  67,  68,  69,  70,  71,  72,  73,  74,
        75,  76,  77,  78,  79,  80,  81,  82,  83,  84,  85,  86,  87,
        88,  89,  90,  91,  92,  93,  94,  95,  96,  97,  98,  99, 100,
       101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113,
       114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126,
       127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139,
       140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152,
       153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165,
       166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178,
       179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 19

In [17]:
# Index the training rows with the (n_windows, window_size) index matrix and
# inspect the shape of the resulting 3-D array.
train_data.values[X_idx].shape

(973, 10, 13)

In [18]:
# Show the windowed training array itself.
# NOTE(review): all columns of train_data are included here, so the windows
# appear to contain the 'date' and 'label' columns as well - confirm they are
# removed before feeding a model.
train_data.values[X_idx]

array([[[Timestamp('2007-02-26 00:00:00'), 0.34140594843556915,
         0.8535009046088948, ..., 0.6989398151337145, 0.798153850818295,
         0.0],
        [Timestamp('2007-02-27 00:00:00'), 4.995915102040804,
         5.681637551020402, ..., 5.651590711911569, 5.8606793879533825,
         0.0],
        [Timestamp('2007-02-28 00:00:00'), -1.559303778254717,
         0.44101116897115045, ..., 1.788624339969358,
         2.0627882734009617, 1.0],
        ...,
        [Timestamp('2007-03-15 00:00:00'), -0.5653351639476556,
         0.22613309643031432, ..., 2.520433713331016,
         2.9268298329088394, 1.0],
        [Timestamp('2007-03-19 00:00:00'), -0.41948854469182617,
         0.27428525330752684, ..., 2.0542115225434143,
         2.4292786471169503, 1.0],
        [Timestamp('2007-03-21 00:00:00'), -0.9847506509649051,
         0.06353399084393363, ..., 0.23507080556035564,
         0.5966762060060082, 1.0]],

       [[Timestamp('2007-02-27 00:00:00'), 4.995915102040804,
       

In [17]:
import torch
import torch.nn as nn
import pytorch_lightning as pl

class TimeAxisAttention(nn.Module):
    """LSTM encoder followed by attention over the time axis.

    The final hidden state of the top LSTM layer is used as the query against
    every time step's output; the attention-weighted sum of the outputs is
    layer-normalised and returned as the context vector.

    Args:
        input_size: Number of input features per time step (L).
        hidden_size: LSTM hidden size (H).
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size: int, hidden_size: int, num_layers: int):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=False)
        self.lnorm = nn.LayerNorm(hidden_size)

    def forward(self, x: torch.Tensor, rt_attn=False):
        """Compute the context vector of each sequence.

        Args:
            x: Input of shape (D, W, L) - batch-first sequences.
            rt_attn: When true, also return the attention weights.

        Returns:
            ``(context, attn)`` where ``context`` has shape (D, H) and
            ``attn`` has shape (D, W), or is ``None`` when ``rt_attn`` is false.
        """
        # o: (D, W, H) outputs of the top layer / h: (num_layers, D, H)
        o, (h, _) = self.lstm(x)
        # Use only the top layer's final hidden state as the query. Permuting
        # the whole `h` gives (D, H, num_layers), which only works for a
        # single-layer LSTM and breaks the second bmm otherwise.
        query = h[-1].unsqueeze(-1)  # (D, H, 1)
        # Batched matrix product: one attention score per time step.
        score = torch.bmm(o, query)  # (D, W, H) x (D, H, 1) -> (D, W, 1)
        tx_attn = torch.softmax(score, 1).squeeze(-1)  # (D, W)
        context = torch.bmm(tx_attn.unsqueeze(1), o).squeeze(1)  # (D, 1, W) x (D, W, H) -> (D, H)
        normed_context = self.lnorm(context)
        if rt_attn:
            return normed_context, tx_attn
        return normed_context, None
            
class DataAxisAttention(nn.Module):
    """Self-attention across the data axis followed by a feed-forward block.

    Args:
        hidden_size: Feature size of each input vector (H).
        n_heads: Number of attention heads.
        drop_rate: Dropout probability applied to both sub-blocks.
    """

    def __init__(self, hidden_size, n_heads, drop_rate=0.1):
        super().__init__()
        inner_size = 4 * hidden_size
        self.multi_attn = nn.MultiheadAttention(hidden_size, n_heads, batch_first=True)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_size, inner_size),
            nn.ReLU(),
            nn.Linear(inner_size, hidden_size)
        )
        self.lnorm1 = nn.LayerNorm(hidden_size)
        self.lnorm2 = nn.LayerNorm(hidden_size)
        self.drop_out = nn.Dropout(drop_rate)

    def forward(self, hm: torch.tensor, rt_attn=False):
        """Apply the attention and feed-forward sub-blocks.

        Args:
            hm: Input contexts; the accompanying shape notes describe it as (D, H).
            rt_attn: When true, also return the attention weights.

        Returns:
            ``(hp, dx_attn)``; ``dx_attn`` is ``None`` when ``rt_attn`` is false.
        """
        # Attention sub-block: self-attention, dropout, residual, layer norm.
        # attn_out: (D, H), dx_attn: (D, D)
        attn_out, dx_attn = self.multi_attn(hm, hm, hm)
        hm_hat = self.lnorm1(hm + self.drop_out(attn_out))

        # Feed-forward sub-block on the sum of the input and attended contexts.
        mixed = hm + hm_hat
        # hp: (D, H)
        hp = torch.tanh(mixed + self.mlp(mixed))
        hp = self.lnorm2(hm_hat + self.drop_out(hp))

        return (hp, dx_attn) if rt_attn else (hp, None)

class DTML(nn.Module):
    """Combines time-axis attention, index-context mixing and data-axis attention.

    Args:
        input_size: Number of features per time step (L).
        hidden_size: Hidden size shared by all sub-modules (H).
        num_layers: Number of LSTM layers in the time-axis encoder.
        n_heads: Number of heads of the data-axis attention.
        beta: Weight of the index context when it is mixed into each stock's context.
        drop_rate: Dropout probability of the data-axis attention block.
    """

    def __init__(self, input_size, hidden_size, num_layers, n_heads, beta=0.1, drop_rate=0.1):
        # `super.__init__()` (without the call parentheses on `super`) raises a
        # TypeError; the base nn.Module must be initialised through super().
        super().__init__()
        self.beta = beta
        self.txattention = TimeAxisAttention(input_size, hidden_size, num_layers)
        self.dxattention = DataAxisAttention(hidden_size, n_heads, drop_rate)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, stocks, index, rt_attn=False):
        """Run the model for a single time stamp.

        Args:
            stocks: (W, D, L) tensor of stock features.
            index: (W, 1, L) tensor of index features.
            rt_attn: When true, attention weights are included in the result.

        Here W is the number of observations, D the number of stocks and L the
        number of features.

        Returns:
            Dict with ``'output'`` (D, 1), the attention weights
            ``'tx_attn_stocks'``, ``'tx_attn_index'``, ``'dx_attn_stocks'``
            (``None`` unless ``rt_attn``) and ``'effect'`` (D, D).
        """
        # Time-Axis Attention (the same encoder is used for stocks and index)
        # c_stocks: (D, H) / tx_attn_stocks: (D, W)
        c_stocks, tx_attn_stocks = self.txattention(stocks.transpose(1, 0), rt_attn=rt_attn)
        # c_index: (1, H) / tx_attn_index: (1, W)
        c_index, tx_attn_index = self.txattention(index.transpose(1, 0), rt_attn=rt_attn)

        # Context Aggregation
        # Multi-level Context
        # hm: (D, H)
        hm = c_stocks + self.beta * c_index
        # The Effect of Global Contexts
        # effect: (D, D) - the expansion of hm @ hm^T:
        #   c_i.c_j + beta * idx.(c_i + c_j) + beta^2 * idx.idx
        # The original omitted the beta * idx.c_i term, so the matrix was not
        # symmetric and did not equal hm @ hm^T.
        cross = self.beta * c_index.mm(c_stocks.transpose(0, 1))  # (1, D)
        effect = c_stocks.mm(c_stocks.transpose(0, 1)) + \
            cross + cross.transpose(0, 1) + \
            self.beta**2 * torch.mm(c_index, c_index.transpose(0, 1))

        # Data-Axis Attention
        # hp: (D, H) / dx_attn: (D, D)
        hp, dx_attn_stocks = self.dxattention(hm, rt_attn=rt_attn)
        # output: (D, 1)
        output = self.linear(hp)

        return {
            'output': output,
            'tx_attn_stocks': tx_attn_stocks,
            'tx_attn_index': tx_attn_index,
            'dx_attn_stocks': dx_attn_stocks,
            'effect': effect
        }