In [1]:
MAIN_SEED = 493

BATCH_SIZE = 32
# SEQ_LENGTH = 60

MODEL_DIM = 56
NUM_HEADS = 8
NUM_LAYERS = 3
FFN_DIM = 88 # [256, 512]
LR = 7.456901799768986e-06
EPOCH = 27

In [2]:
%pip install -U typing_extensions

Note: you may need to restart the kernel to use updated packages.


In [3]:
import math
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler

def set_seed(seed_value=42):
    # Set seed for reproducibility
    random.seed(seed_value)  # Python's random module
    np.random.seed(seed_value)  # NumPy
    torch.manual_seed(seed_value)  # PyTorch
    torch.cuda.manual_seed(seed_value)  # GPU (if available)
    torch.cuda.manual_seed_all(seed_value)  # For multi-GPU
    torch.backends.cudnn.deterministic = True  # Ensures deterministic algorithms
    torch.backends.cudnn.benchmark = False  # Disable benchmark mode for reproducibility

# Set seed for reproducibility
set_seed(MAIN_SEED)

In [4]:
class TimeSeriesDataset(Dataset):
    def __init__(self, data, labels, seq_length):
        """
        Args:
            data: NumPy array of shape (N, T, 19), where N is the number of sequences
            labels: NumPy array of shape (N, 19) for classification
            seq_length: Length of sequences (T)
        """
        self.data = data
        self.labels = labels
        self.seq_length = seq_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.float32)

def preprocess_data(raw_data, raw_labels, seq_length, batch_size, shuffle):
    """
    Args:
        raw_data: NumPy array of shape (total_timesteps, 19)
        raw_labels: NumPy array of shape (total_timesteps, 19)
        seq_length: The length of sequences (T)
    
    Returns:
        PyTorch DataLoader for training
    """

    # Create overlapping sequences
    data_sequences = []
    label_sequences = []
    for i in range(len(raw_data) - seq_length):
        data_sequences.append(raw_data[i:i+seq_length].astype(np.float32))
        label_sequences.append(raw_labels[i + seq_length].astype(np.float32))  # Predict based on last time step

    data_sequences = np.array(data_sequences)  # Shape (N, T, 19)
    label_sequences = np.array(label_sequences)  # Shape (N, 19)

    dataset = TimeSeriesDataset(data_sequences, label_sequences, seq_length)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    return dataloader

def create_lstm_dataset(df, sequence_length=12, step=20):
    X, y = [], []

    for i in range((sequence_length - 1) * step, len(df)):  # Ensure enough past data for X
        x_i = []
        for j in range(sequence_length):
            index = i - (sequence_length - 1 - j) * step  # Compute the index for each row in X_i
            if index < 0:
                break  # Stop if we run out of data
            x_i.append(df[index, :19])  # Select return rates of stock1, stock2, stock3

        if len(x_i) == sequence_length:  # Only append fully constructed sequences
            X.append(np.array(x_i))
            y.append(df[i, 19:])  # Take y from the same index as the last row of X_i

    X = np.array(X)
    y = np.array(y)

    return torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)

In [5]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

sheet_url = "https://docs.google.com/spreadsheets/d/1vXUzWW-6toJxGv8DL5I9RTlq-vcrjCGQcUuFQQtjJbY/export?format=csv&gid=0" # train_data_daily_monthly_3_2
df = pd.read_csv(sheet_url)
    
df_ret_original = df.iloc[:, 1:20]
df_bm_original  = df.iloc[:, 61:80]

df_ret = df.iloc[:, 1:20]
df_out = df.iloc[:, 21:40]
df_m = df.iloc[:, 41:60]
df_bm = df.iloc[:, 61:80]
df_b = df.iloc[:, 81:100]

df_selected_list = [df_bm.copy()] # which (combination of) indicator to use
df_selected = pd.concat(df_selected_list, axis=1) 
INPUT_DIM = df_selected.shape[1]

train_split_idx = math.floor(df_selected.shape[0] * 0.6)
val_split_idx = math.floor(df_selected.shape[0] * 0.8)

# Split the dataset
X_train, X_val, X_test = (
    df_selected.iloc[:train_split_idx, :],  # Train set (0% -> 60%)
    df_selected.iloc[train_split_idx:val_split_idx, :],  # Validation set (60% -> 80%)
    df_selected.iloc[val_split_idx:, :]  # Test set (80% -> 100%)
)

df_y_train, df_y_val, df_y_test = (
    df_out.iloc[:train_split_idx, :],
    df_out.iloc[train_split_idx:val_split_idx, :],
    df_out.iloc[val_split_idx:, :]
)


scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

df_selected_aug_train = np.hstack([X_train_scaled, df_y_train])
X_train, y_train = create_lstm_dataset(df_selected_aug_train)

df_selected_aug_val = np.hstack([X_val_scaled, df_y_val])
X_val, y_val = create_lstm_dataset(df_selected_aug_val)

df_selected_aug_test = np.hstack([X_test_scaled, df_y_test])
X_test, y_test = create_lstm_dataset(df_selected_aug_test)


print("\nData shape after split")
print("X_train:", X_train.shape)
print("y_train:", y_train.shape)
print("X_val:", X_val.shape)
print("y_val:", y_val.shape)
print("X_test:", X_test.shape)
print("y_test:", y_test.shape)

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print("\n Dataset:")
print("train:", len(train_dataloader))
print("val:", len(val_dataloader))
print("test:", len(test_dataloader))

print("\nData shape after split")
print("X_train:", X_train.shape)
print("y_train:", y_train.shape)
print("X_val:", X_val.shape)
print("y_val:", y_val.shape)
print("X_test:", X_test.shape)
print("y_test:", y_test.shape)

print("\n Dataset:")
print("train:", len(train_dataloader))
print("val:", len(val_dataloader))
print("test:", len(test_dataloader))


Data shape after split
X_train: torch.Size([2533, 12, 19])
y_train: torch.Size([2533, 19])
X_val: torch.Size([698, 12, 19])
y_val: torch.Size([698, 19])
X_test: torch.Size([698, 12, 19])
y_test: torch.Size([698, 19])

 Dataset:
train: 80
val: 22
test: 22

Data shape after split
X_train: torch.Size([2533, 12, 19])
y_train: torch.Size([2533, 19])
X_val: torch.Size([698, 12, 19])
y_val: torch.Size([698, 19])
X_test: torch.Size([698, 12, 19])
y_test: torch.Size([698, 19])

 Dataset:
train: 80
val: 22
test: 22


In [6]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # Shape: (1, max_len, d_model)

        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, num_classes, ffn_dim=128, dropout=0.1):
        super().__init__()
        self.embedding = nn.Linear(input_dim, model_dim)
        self.positional_encoding = PositionalEncoding(model_dim, dropout)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_dim,
            nhead=num_heads,
            dim_feedforward=ffn_dim,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc = nn.Linear(model_dim, num_classes)  # Output num_classes for classification
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = self.embedding(x)  # Shape: (batch_size, seq_length, model_dim)
        x = self.positional_encoding(x)  # Add positional encoding
        x = self.transformer_encoder(x)  # Transformer encoding
        x = self.fc(x[:, -1, :])  # Take last timestep's output for classification
        out = self.softmax(x)  # Apply softmax to the output
        return out

In [7]:
model = TransformerClassifier(input_dim=INPUT_DIM,    # input값의 dim (1 or 2 or 3)
                              model_dim=MODEL_DIM,    # 모델에서 사용되는 dim
                              num_heads=NUM_HEADS,    # 헤드 개수
                              num_layers=NUM_LAYERS,  # 
                              num_classes=19,         # output 벡터의 차원
                              ffn_dim=FFN_DIM,        # 
                              dropout=0.1             # dropout rate
                              )

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=LR)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {count_parameters(model)}")

# Training loop
train_losses = []
val_losses = []

for epoch in range(EPOCH):
    model.train()
    total_train_loss = 0

    for inputs, targets in train_dataloader:
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)  # (batch_size, num_classes)

        # Compute loss (MSE between logits and one-hot target)
        loss = criterion(outputs, targets)  # targets are one-hot encoded
        total_train_loss += loss.item()

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    avg_train_loss = total_train_loss / len(train_dataloader)
    train_losses.append(avg_train_loss)

    # Validation loop
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for inputs, targets in val_dataloader:
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            total_val_loss += loss.item()

    avg_val_loss = total_val_loss / len(val_dataloader)
    val_losses.append(avg_val_loss)

    print(f"Epoch {epoch + 1}/{EPOCH}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

torch.save(model.state_dict(), "transformer_model.pth")

Total trainable parameters: 71179
Epoch 1/27, Train Loss: 0.3068, Val Loss: 0.3065
Epoch 2/27, Train Loss: 0.3063, Val Loss: 0.3064
Epoch 3/27, Train Loss: 0.3061, Val Loss: 0.3063
Epoch 4/27, Train Loss: 0.3058, Val Loss: 0.3062
Epoch 5/27, Train Loss: 0.3056, Val Loss: 0.3061
Epoch 6/27, Train Loss: 0.3054, Val Loss: 0.3061
Epoch 7/27, Train Loss: 0.3053, Val Loss: 0.3060
Epoch 8/27, Train Loss: 0.3052, Val Loss: 0.3060
Epoch 9/27, Train Loss: 0.3051, Val Loss: 0.3060
Epoch 10/27, Train Loss: 0.3050, Val Loss: 0.3059
Epoch 11/27, Train Loss: 0.3048, Val Loss: 0.3059
Epoch 12/27, Train Loss: 0.3047, Val Loss: 0.3059
Epoch 13/27, Train Loss: 0.3047, Val Loss: 0.3059
Epoch 14/27, Train Loss: 0.3046, Val Loss: 0.3059
Epoch 15/27, Train Loss: 0.3046, Val Loss: 0.3059
Epoch 16/27, Train Loss: 0.3045, Val Loss: 0.3058
Epoch 17/27, Train Loss: 0.3045, Val Loss: 0.3058
Epoch 18/27, Train Loss: 0.3044, Val Loss: 0.3058
Epoch 19/27, Train Loss: 0.3043, Val Loss: 0.3058
Epoch 20/27, Train Loss: 

In [8]:
def transform_array(arr):
    result = np.zeros_like(arr)  # Initialize an array filled with 0s

    # Get indices of 4 largest and 4 smallest values in each row
    sorted_indices = np.argsort(arr, axis=1)  # Sort indices along each row

    lowest_indices = sorted_indices[:, :4]   # First 4 indices (smallest values)
    highest_indices = sorted_indices[:, -4:] # Last 4 indices (largest values)

    # Assign -1 to lowest values
    np.put_along_axis(result, lowest_indices, -1, axis=1)
    # Assign 1 to highest values
    np.put_along_axis(result, highest_indices, 1, axis=1)

    return result

"""
model = TransformerClassifier(input_dim=3, model_dim=16, num_heads=4, num_layers=4, ffn_dim=16)  # Initialize model
model.load_state_dict(torch.load("transformer_model.pth"))  # Load saved weights
model.eval()  # Set to evaluation mode
"""

model.eval()

output_list = []
with torch.no_grad():
    for inputs, targets in test_dataloader:
        outputs = model(inputs)  # Get logits
        output_list.append(outputs.numpy())

y_pred = np.vstack(output_list)
y_pos_pred = transform_array(y_pred)

In [9]:
print("\n\nPanel A : Mar. 2022 - Feb. 2025")

# Calculate H4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx:, :]

signals = pd.DataFrame(y_pos_pred).copy()
signals.replace(-1, 0, inplace=True)

signals.columns = df_ret_org_test.columns

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4.csv")



# Calculate L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx:, :]

signals = pd.DataFrame(y_pos_pred).copy()
signals.replace(1, 0, inplace=True)
signals.replace(-1, 1, inplace=True)

signals.columns = df_ret_org_test.columns

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_l4.csv")



# Calculate H4-L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx:, :]

signals = pd.DataFrame(y_pos_pred).copy()

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4-L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4_l4.csv")



Panel A : Mar. 2022 - Feb. 2025
H4 strategy
(Monthly)  Avg. Ret. = 0.02721417727507163
(Annual) Avg. Ret. = 0.32657012730085955

L4 strategy
(Monthly)  Avg. Ret. = -0.014617385403295128
(Annual) Avg. Ret. = -0.17540862483954153

H4-L4 strategy
(Monthly)  Avg. Ret. = 0.04183156267836676
(Annual) Avg. Ret. = 0.5019787521404011



In [10]:
print("\n\nPanel B : Mar. 2022 - Feb. 2023")

# Calculate H4 return 
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx:val_split_idx+232, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[:232, :]
signals.replace(-1, 0, inplace=True)

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4.csv")



# Calculate L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx:val_split_idx+232, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[:232, :]
signals.replace(1, 0, inplace=True)
signals.replace(-1, 1, inplace=True)

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_l4.csv")



# Calculate H4-L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx:val_split_idx+232, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[:232, :]

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4-L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

print("(Annual)  Sharpe =", np.sqrt(12) * strategy_returns["Portfolio"].mean()/strategy_returns["Portfolio"].std())

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4_l4.csv")



print("\n\nPanel C : Mar. 2023 - Jan. 2024")

# Calculate H4 return 
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx+232:val_split_idx+464, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[232:464, :]
signals.replace(-1, 0, inplace=True)

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4.csv")



# Calculate L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx+232:val_split_idx+464, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[232:464, :]
signals.replace(1, 0, inplace=True)
signals.replace(-1, 1, inplace=True)

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_l4.csv")



# Calculate H4-L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx+232:val_split_idx+464, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[232:464, :]

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4-L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

print("(Annual)  Sharpe =", np.sqrt(12) * strategy_returns["Portfolio"].mean()/strategy_returns["Portfolio"].std())

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4_l4.csv")



print("\n\nPanel D : Feb. 2024 - Feb. 2025")

# Calculate H4 return 
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx+464:, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[464:, :]
signals.replace(-1, 0, inplace=True)

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4.csv")



# Calculate L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx+464:, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[464:, :]
signals.replace(1, 0, inplace=True)
signals.replace(-1, 1, inplace=True)

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_l4.csv")



# Calculate H4-L4 return
df_ret_org_test = df_ret_original.iloc[220:,:].iloc[val_split_idx+464:, :]

signals = pd.DataFrame(y_pos_pred).copy().iloc[464:, :]

signals.columns = df_ret_org_test.columns
# print(signals.sum())

signals.index = df_ret_org_test.index

strategy_returns = (signals.shift(20) * df_ret_org_test)

strategy_returns["Portfolio_sum"] = strategy_returns.sum(axis=1)  # Aggregate portfolio returns
strategy_returns["Portfolio"] = strategy_returns["Portfolio_sum"] / 4  # Aggregate portfolio returns
strategy_returns["Cumulative Returns"] = (1 + strategy_returns["Portfolio"]).cumprod()

print("H4-L4 strategy")
print("(Monthly)  Avg. Ret. =", strategy_returns["Portfolio"].mean())
print("(Annual) Avg. Ret. =", 12 * strategy_returns["Portfolio"].mean())
print()

print("(Annual)  Sharpe =", np.sqrt(12) * strategy_returns["Portfolio"].mean()/strategy_returns["Portfolio"].std())

# Export
df_result = pd.concat([df_ret_org_test, signals, strategy_returns], axis=1)

base_col_names = df_ret_org_test.columns.tolist()
df_result.columns = base_col_names + \
                    [col[:-4] + "_signal_pred" for col in base_col_names] + \
                    [col[:-4] + "_strat_ret" for col in base_col_names] + \
                    ["Portfolio_sum", "Portfolio", "Cumulative Returns"]

df_result.to_csv("result_test_h4_l4.csv")



Panel B : Mar. 2022 - Feb. 2023
H4 strategy
(Monthly)  Avg. Ret. = 0.011103964396551725
(Annual) Avg. Ret. = 0.1332475727586207

L4 strategy
(Monthly)  Avg. Ret. = -0.023539174429956896
(Annual) Avg. Ret. = -0.28247009315948274

H4-L4 strategy
(Monthly)  Avg. Ret. = 0.03464313882650862
(Annual) Avg. Ret. = 0.41571766591810344

(Annual)  Sharpe = 1.8448880017800633


Panel C : Mar. 2023 - Jan. 2024
H4 strategy
(Monthly)  Avg. Ret. = 0.024177178348060345
(Annual) Avg. Ret. = 0.2901261401767241

L4 strategy
(Monthly)  Avg. Ret. = -0.01738136977047414
(Annual) Avg. Ret. = -0.2085764372456897

H4-L4 strategy
(Monthly)  Avg. Ret. = 0.04155854811853448
(Annual) Avg. Ret. = 0.49870257742241375

(Annual)  Sharpe = 2.825101390025434


Panel D : Feb. 2024 - Feb. 2025
H4 strategy
(Monthly)  Avg. Ret. = 0.033978480136752136
(Annual) Avg. Ret. = 0.4077417616410256

L4 strategy
(Monthly)  Avg. Ret. = 0.0069706876784188035
(Annual) Avg. Ret. = 0.08364825214102564

H4-L4 strategy
(Monthly)  Avg. Ret.

In [14]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import torch
import shap
import numpy as np
from tqdm import tqdm

# === 모델 정의 (동일하게 유지) ===
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, num_classes, ffn_dim=128, dropout=0.1):
        super().__init__()
        self.embedding = nn.Linear(input_dim, model_dim)
        self.positional_encoding = PositionalEncoding(model_dim, dropout)
        encoder_layer = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, dim_feedforward=ffn_dim, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(model_dim, num_classes)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        x = self.fc(x[:, -1, :])
        return self.softmax(x)

# === 모델 불러오기 ===
INPUT_DIM = 19
MODEL_DIM = 56
NUM_HEADS = 8
NUM_LAYERS = 3
FFN_DIM = 88
NUM_CLASSES = 19

device = torch.device("cpu")
model = TransformerClassifier(INPUT_DIM, MODEL_DIM, NUM_HEADS, NUM_LAYERS, NUM_CLASSES, FFN_DIM, dropout=0.1)
model.load_state_dict(torch.load("transformer_model.pth", map_location=device))
model.to(device)
model.eval()

# === 데이터 불러오기 ===
X_test = torch.load("X_test.pt")  # shape = (T, 12, 19)
X_test = X_test.to(device)
T, SEQ_LEN, INPUT_DIM = X_test.shape
print("X_test shape:", X_test.shape)

# === SHAP Tensor 초기화 ===
shap_tensor = np.zeros((T, NUM_CLASSES, INPUT_DIM))  # (시점, 예측자산, 인풋자산)

# === SHAP 계산 ===
for t in tqdm(range(T), desc="SHAP 계산 중"):
    x_input = X_test[t:t+1].numpy()  # (1, 12, 19)

    for pred_asset in range(NUM_CLASSES):
        # (1) 예측 함수 정의
        
        def predict_fn(x):
            x_tensor = torch.tensor(x.reshape(-1, 12, 19), dtype=torch.float32)  # (1, 228) → (1, 12, 19)
            return model(x_tensor).detach().numpy()[:, pred_asset]

        # (2) 마스커 정의: (1, 12, 19) → (features = 12*19)
        masker = shap.maskers.Independent(data=x_input.reshape(1, -1))  # 2D로 평탄화
        explainer = shap.Explainer(predict_fn, masker, algorithm="permutation")

        # (3) 입력도 flatten해야 맞음
        shap_vals = explainer(x_input.reshape(1, -1))

        # (4) 다시 12x19로 reshape 후 평균 shap 계산
        shap_matrix = shap_vals.values.reshape(12, 19)
        shap_tensor[t, pred_asset, :] = shap_matrix.mean(axis=0)

# === 저장 ===
np.save("shap_tensor_momentum.npy", shap_tensor)
print("SHAP tensor 저장 완료: shap_tensor_momentum.npy")

X_test shape: torch.Size([698, 12, 19])


SHAP 계산 중: 100%|██████████| 698/698 [00:44<00:00, 15.73it/s]

SHAP tensor 저장 완료: shap_tensor_momentum.npy





In [15]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

# === 1. SHAP tensor 로드 ===
shap_tensor = np.load("shap_tensor_momentum.npy")  # shape: (T=698, 19, 19)
T, N_ASSETS, _ = shap_tensor.shape

# === 2. 자산 이름 정의 (예시: 논문 기준) ===
asset_names = ['CL', 'NG', 'HO', 'RB', 'XB', 'C', 'S', 'W', 'BO', 'SM', 
               'SB', 'KC', 'CT', 'LB', 'OJ', 'GC', 'SI', 'HG', 'PL']

# === 3. 전체 평균 및 표준편차 계산 ===
shap_mean = shap_tensor.mean(axis=0)  # shape: (19, 19)
shap_std = shap_tensor.std(axis=0)    # shape: (19, 19)

# === 4. Boxplot (전체 기간) ===
os.makedirs("figs", exist_ok=True)
for i in range(N_ASSETS):  # 예측 대상 자산
    plt.figure(figsize=(12, 4))
    sns.boxplot(data=shap_tensor[:, i, :])
    plt.xticks(ticks=np.arange(N_ASSETS), labels=asset_names, rotation=45)
    plt.title(f"Boxplot of SHAP values (Predicted: {asset_names[i]})")
    plt.ylabel("SHAP value")
    plt.tight_layout()
    plt.savefig(f"figs/boxplot_pred_{asset_names[i]}.png")
    plt.close()

# === 5. Heatmap (전체 평균 SHAP) ===
plt.figure(figsize=(10, 8))
sns.heatmap(shap_mean, xticklabels=asset_names, yticklabels=asset_names, annot=True, fmt=".3f", cmap="coolwarm")
plt.title("Heatmap of SHAP Mean Values (All Periods)")
plt.xlabel("Input Asset")
plt.ylabel("Predicted Asset")
plt.tight_layout()
plt.savefig("figs/heatmap_shap_mean_all.png")
plt.close()

# === 6. 구간 정의 (Panel B, C, D) ===
# Panel B: t = 0~231, Panel C: 232~463, Panel D: 464~
panel_ranges = {
    "full": slice(None),
    "panel_b": slice(0, 232),
    "panel_c": slice(232, 464),
    "panel_d": slice(464, None)
}

# === 7. Boxplot & Heatmap 저장 (각 구간별) ===
for panel_name, panel_slice in panel_ranges.items():
    panel_tensor = shap_tensor[panel_slice, :, :]
    panel_mean = panel_tensor.mean(axis=0)

    # Heatmap
    plt.figure(figsize=(10, 8))
    sns.heatmap(panel_mean, xticklabels=asset_names, yticklabels=asset_names, annot=True, fmt=".3f", cmap="coolwarm")
    plt.title(f"Heatmap of SHAP Mean Values ({panel_name.upper()})")
    plt.xlabel("Input Asset")
    plt.ylabel("Predicted Asset")
    plt.tight_layout()
    plt.savefig(f"figs/heatmap_shap_mean_{panel_name}.png")
    plt.close()

    # Boxplot (1장씩 19개 저장)
    for i in range(N_ASSETS):
        plt.figure(figsize=(12, 4))
        sns.boxplot(data=panel_tensor[:, i, :])
        plt.xticks(ticks=np.arange(N_ASSETS), labels=asset_names, rotation=45)
        plt.title(f"SHAP Boxplot ({panel_name.upper()}): Predicting {asset_names[i]}")
        plt.ylabel("SHAP value")
        plt.tight_layout()
        plt.savefig(f"figs/boxplot_{panel_name}_pred_{asset_names[i]}.png")
        plt.close()

In [1]:
import numpy as np

# Load SHAP tensor
shap_tensor = np.load("shap_tensor_momentum.npy")  # (698, 19, 19)
T, N, _ = shap_tensor.shape

# 전체 평균 및 표준편차 계산
shap_mean = shap_tensor.mean(axis=0)  # (19, 19)
shap_std = shap_tensor.std(axis=0)    # (19, 19)

# 자산 이름 지정
asset_names = ["CL", "NG", "HO", "RB", "XB", "C", "S", "W", "KW", 
               "SB", "LB", "SM", "BO", "FC", "LH", "LC", "OJ", "CC", "JO"]

# 예측 자산 CL 기준으로 input feature SHAP value 평균 상위 5개 출력
pred_asset_idx = asset_names.index("CL")
mean_row = shap_mean[pred_asset_idx]
top5_idx = mean_row.argsort()[::-1][:5]

print(f"\n[전체 기간] CL 예측에 가장 영향을 준 input feature 상위 5:")
for rank, idx in enumerate(top5_idx, 1):
    print(f"{rank}. {asset_names[idx]}: 평균 SHAP = {mean_row[idx]:.6f}, 표준편차 = {shap_std[pred_asset_idx][idx]:.6f}")

# 전체 SHAP 평균 행렬 요약 출력
print("\n전체 기간 평균 SHAP 행렬 (소수점 6자리 요약):")
print(np.round(shap_mean, 6))

# 전체 SHAP 표준편차 행렬 요약 출력
print("\n전체 기간 SHAP 표준편차 행렬 (소수점 6자리 요약):")
print(np.round(shap_std, 6))


[전체 기간] CL 예측에 가장 영향을 준 input feature 상위 5:
1. JO: 평균 SHAP = 0.000000, 표준편차 = 0.000000
2. KW: 평균 SHAP = 0.000000, 표준편차 = 0.000000
3. NG: 평균 SHAP = 0.000000, 표준편차 = 0.000000
4. HO: 평균 SHAP = 0.000000, 표준편차 = 0.000000
5. RB: 평균 SHAP = 0.000000, 표준편차 = 0.000000

전체 기간 평균 SHAP 행렬 (소수점 6자리 요약):
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

# Load
shap_tensor = np.load("shap_tensor_momentum.npy")  # (T, 19, 19)
T = shap_tensor.shape[0]
asset_names = ["CL", "NG", "HO", "RB", "XB", "C", "S", "W", "KW", 
               "SB", "LB", "SM", "BO", "FC", "LH", "LC", "OJ", "CC", "JO"]

# Create folder
os.makedirs("shap_output", exist_ok=True)

# === 전체 구간 평균/표준편차 ===
shap_mean = shap_tensor.mean(axis=0)  # (19, 19)
shap_std = shap_tensor.std(axis=0)

# 🔷 전체 구간 Heatmap 저장
plt.figure(figsize=(10, 8))
sns.heatmap(shap_mean, xticklabels=asset_names, yticklabels=asset_names, cmap="YlGnBu", annot=False)
plt.title("Heatmap: SHAP Mean (All Period)")
plt.tight_layout()
plt.savefig("shap_output/heatmap_all_period.png")
plt.close()

# 🔷 전체 구간 Boxplot 저장
shap_reshaped = shap_tensor.reshape(-1, 19)
plt.figure(figsize=(14, 6))
sns.boxplot(data=shap_reshaped)
plt.title("Boxplot: SHAP Value (All Period)")
plt.xticks(ticks=np.arange(19), labels=asset_names, rotation=45)
plt.ylabel("SHAP value")
plt.tight_layout()
plt.savefig("shap_output/boxplot_all_period.png")
plt.close()

# === 구간별 (Full / Sub1 / Sub2 / Sub3) ===
periods = {
    "full": (0, T),
    "sub1": (0, 232),
    "sub2": (232, 464),
    "sub3": (464, 698)
}

for label, (start, end) in periods.items():
    period_data = shap_tensor[start:end]
    period_mean = period_data.mean(axis=0)
    
    # Heatmap
    plt.figure(figsize=(10, 8))
    sns.heatmap(period_mean, xticklabels=asset_names, yticklabels=asset_names, cmap="YlOrBr", annot=False)
    plt.title(f"Heatmap: SHAP Mean ({label})")
    plt.tight_layout()
    plt.savefig(f"shap_output/heatmap_{label}.png")
    plt.close()

    # Boxplot
    period_flat = period_data.reshape(-1, 19)
    plt.figure(figsize=(14, 6))
    sns.boxplot(data=period_flat)
    plt.title(f"Boxplot: SHAP Value ({label})")
    plt.xticks(ticks=np.arange(19), labels=asset_names, rotation=45)
    plt.ylabel("SHAP value")
    plt.tight_layout()
    plt.savefig(f"shap_output/boxplot_{label}.png")
    plt.close()

print("✅ 모든 heatmap 및 boxplot 저장 완료 → ./shap_output 폴더 확인")

✅ 모든 heatmap 및 boxplot 저장 완료 → ./shap_output 폴더 확인
