In [None]:
import torch
import torch.nn as nn


# Define the basic block with skip connection
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(residual)
        out = self.relu(out)
        return out

# Define the ResNet model
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(5, 64, kernel_size=3, stride=1, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return torch.tanh(x)

# Create the ResNet-50 model
def resnet50(num_classes):
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes)

In [None]:
import torch.optim as optim
import copy

def train(dataloader_train, dataloader_valid, transform = ''):
    num_class = y_train.shape[1]
    # Instantiate the model
    model = resnet50(num_class).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.0001, weight_decay=0.01)

    num_epochs = 100
    min_val_loss = 1000
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        for batch_x, batch_y in dataloader_train:
            optimizer.zero_grad()
            outputs = model(batch_x)

            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
        model.eval()
        val_loss = []

        with torch.no_grad():
            for batch_x_val, batch_y_val in dataloader_valid:
                outputs_val = model(batch_x_val)
                loss_val = criterion(outputs_val, batch_y_val)
                val_loss.append(loss_val.item())
            if sum(val_loss) < min_val_loss:
                min_val_loss = sum(val_loss)
                best_model = model
                torch.save(model, f'resnet/best_model.pt')

            if epoch % 50 == 0:
                torch.save(model, f'resnet/checkpoint_{epoch}.pt')

            # Print statistics
            print(f'Epoch [{epoch+1}/{num_epochs}]',
                f'Training Loss: {loss.item():.10f}',
                f'Valid Loss: {sum(val_loss)/64:.10f}')

train(dataloader_train, dataloader_valid)

In [None]:
def load_model(mode, device = 'cpu'):
    import torch
    model = torch.load(f'/content/drive/MyDrive/chen_resnet/result/{device}/{mode}_best_model.pt')
    return model
if y_mode == 'dodc':
    dodc_model = load_model('dodc')
elif y_mode == 'do':
    do_model = load_model('do')
    dc_model = load_model('dc')
elif y_mode == '(o-c)':
    dc_model = load_model('dc')
    oc_model = load_model('(o-c)')

In [None]:
import gc
def test(mode, device = 'cpu'):
    gc.collect()
    if mode == 'do':
        model = do_model
        dataloader = dataloader_test_do
    elif mode == 'dc':
        model = dc_model
        dataloader = dataloader_test_dc
    elif mode == '(o-c)':
        model = oc_model
        dataloader = dataloader_test_oc
    elif mode == 'dodc':
        model = dodc_model
        dataloader = dataloader_test

    model.eval()
    s_pred = []
    s_true = []
    for x, y in dataloader:
        y_pred = model(x)
        s_pred.append(y_pred.detach())
        s_true.append(y)
    y_pred_tensor = torch.concat(s_pred)
    y_test_tensor = torch.concat(s_true)
    accuracy = (torch.sign(y_pred_tensor) == torch.sign(y_test_tensor)).sum() / len(y_test_tensor)
    return y_pred_tensor, accuracy

if y_mode == 'dodc':
    y_pred, acc = test(y_mode, device)
elif y_mode == 'do':
    y_pred_do, acc_do = test('do', device)
    y_pred_dc, acc_dc = test('dc', device)
elif y_mode == '(o-c)':
    y_pred_dc, acc_dc = test('dc', device)
    y_pred_oc, acc_oc = test('(o-c)', device)
    print(acc_oc, acc_dc)

In [None]:
# Derive y_pred and y_train_pred of shape(N, 2) and numpy type

if y_mode == 'dodc':
    y_pred_numpy = y_pred.cpu().numpy()

    # predict with train set
    y_train_pred = dodc_model(torch.tensor(X[-100:], dtype = torch.float32))
    y_train_numpy = y_train_pred.detach().cpu().numpy()

elif y_mode == 'dc':
    y_pred_tensor = torch.cat((y_pred_do, y_pred_dc), dim = 1)
    y_pred_numpy = y_pred_tensor.cpu().numpy()

    # predict with train set
    y_trainpred_do = do_model(torch.tensor(X_do[-100:], dtype = torch.float32))
    y_trainpred_dc = dc_model(torch.tensor(X_dc[-100:], dtype = torch.float32))
    y_train_pred = torch.cat((y_trainpred_do, y_trainpred_dc), dim = 1)
    y_train_numpy = y_train_pred.detach().cpu().numpy()

elif y_mode == '(o-c)':
    y_pred_tensor = torch.cat((y_pred_oc, y_pred_dc), dim = 1)
    y_pred_numpy = y_pred_tensor.cpu().numpy()

    # predict with train set
    y_trainpred_oc = oc_model(torch.tensor(X_oc[-100:], dtype = torch.float32))
    y_trainpred_dc = dc_model(torch.tensor(X_dc[-100:], dtype = torch.float32))
    y_train_pred = torch.cat((y_trainpred_oc, y_trainpred_dc), dim = 1)
    y_train_numpy = y_train_pred.detach().cpu().numpy()

In [None]:
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Scaling
prediction = pd.DataFrame(y_pred_numpy)
scaler = StandardScaler()
scaler.fit(y_train_numpy)
prediction = pd.DataFrame(scaler.transform(prediction))

# Get the predicted price of O and C and Prediction merge with complete data
if y_mode == 'do' or y_mode == 'dodc':
    prediction.columns = ['pred_do_1', 'pred_dc_1']
    prediction['Date'] = date

    true_and_pred = pd.merge(df.reset_index(), prediction, on = 'Date', how = 'left')
    true_and_pred['pred_o'] = (true_and_pred['Open'] * (1 + true_and_pred['pred_do_1'])).shift(1)
    true_and_pred['pred_c'] = (true_and_pred['Close'] * (1 + true_and_pred['pred_dc_1'])).shift(1)
    true_and_pred['pred_oc'] = true_and_pred['pred_c'] - true_and_pred['pred_o']
    true_and_pred['true_oc'] = true_and_pred['Close'] - true_and_pred['Open']

if y_mode == '(o-c)':
    prediction.columns = ['pred_oc_1', 'pred_dc_1']
    prediction['Date'] = date

    true_and_pred = pd.merge(df.reset_index(), prediction, on = 'Date', how = 'left')
    true_and_pred['pred_o'] = (true_and_pred['Close'] * (1 + true_and_pred['pred_oc_1'])).shift(1)
    true_and_pred['pred_c'] = (true_and_pred['Close'] * (1 + true_and_pred['pred_dc_1'])).shift(1)
    true_and_pred['pred_oc'] = true_and_pred['pred_c'] - true_and_pred['pred_o']
    true_and_pred['true_oc'] = true_and_pred['Close'] - true_and_pred['Open']


# Backtest
asset_list = []
df_backtest = true_and_pred[['Open', 'Close', 'true_oc', 'pred_oc']].dropna()
asset = 1
for index, (o, c, true, pred) in df_backtest.iterrows():
    if pred > 0:
        returns = true/o
        asset *= (1 + returns)
    asset_list.append(asset)

print(asset)
plt.plot(asset_list, label = 'resnet')
plt.plot(df_backtest.reset_index()['Close']/df_backtest['Close'].iloc[0], label = 'buy hold')
plt.legend()
# plt.show()