### Import Libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import os, time, datetime
from collections import OrderedDict
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

In [None]:
from PIL import Image
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

### Data

In [None]:
# Load the price history; the columns used later in this file are
# 'Open', 'High', 'Low', 'Close', 'Volume' and 'Datetime'.
data = pd.read_csv('../input/BTC_OHLC.csv')

# Moving Average
# Rolling means of the close over 5 and 20 rows; with the default
# min_periods the first window-1 rows of each column are NaN.
data['MA5'] = data['Close'].rolling(window = 5).mean()
data['MA20'] = data['Close'].rolling(window = 20).mean()

# Label
# Forward one-step return: next row's close relative to this row's close.
# shift(-1) leaves the last row without a successor, so its Return is NaN.
data.loc[:, 'Return'] = data['Close'].shift(-1) / data['Close'] - 1

def label(x):
    """Map a return to a binary class: 1 if strictly positive, otherwise 0.

    Zero, negative and NaN inputs all yield 0, because only ``x > 0``
    selects the positive class.
    """
    return 1 if x > 0 else 0
    
# Binary target per row: 1 when the forward return is positive, else 0.
data['label'] = data['Return'].apply(lambda x: label(x))

# Drop NA Rows
# Removes every row with a NaN in any column, which includes the leading
# rows without a full MA20 window and the last row (no forward return).
data.dropna(inplace = True)

### Charts Construction (Matrix):
1. image (OHLC)
2. image_vol (OHLC + Volume)
3. image_vol_ma (OHLC + Volume + MA)

In [None]:
def image(data):
    """Render an OHLC bar chart as a 64-row grayscale pixel matrix.

    Every row of ``data`` occupies three pixel columns: an open tick on the
    left, the high-low bar in the middle and a close tick on the right.
    Prices are scaled linearly so that the lowest ``Low`` of the window maps
    to the bottom pixel row and the highest ``High`` to the top pixel row.

    Args:
        data: DataFrame with 'Open', 'High', 'Low' and 'Close' columns.

    Returns:
        ``np.uint8`` array of shape ``(64, 3 * len(data))`` with 0 for the
        background and 255 for drawn pixels. An empty ``data`` yields an
        empty ``(64, 0)`` matrix.
    """
    num_days = len(data)
    image_height, image_width = 64, 3 * num_days

    # Source (price) and target (pixel height) ranges for the linear scaling.
    price_range = (data['Low'].min(), data['High'].max())
    pixel_range = (0, image_height - 1)

    normalized_open = np.interp(data['Open'], price_range, pixel_range)
    normalized_high = np.interp(data['High'], price_range, pixel_range)
    normalized_low = np.interp(data['Low'], price_range, pixel_range)
    normalized_close = np.interp(data['Close'], price_range, pixel_range)

    chart_matrix = np.zeros((image_height, image_width), dtype=np.uint8)

    for idx, (o, h, l, c) in enumerate(zip(normalized_open, normalized_high, normalized_low, normalized_close)):
        # Middle column of this day's 3-pixel slot.
        x_position = 3 * idx + 1

        # Row 0 is the top of the matrix, so heights are flipped vertically.
        open_pixel = image_height - int(o) - 1
        high_pixel = image_height - int(h) - 1
        low_pixel = image_height - int(l) - 1
        close_pixel = image_height - int(c) - 1

        # Open Tick
        chart_matrix[open_pixel, x_position - 1] = 255

        # Close Tick
        chart_matrix[close_pixel, x_position + 1] = 255

        # High-Low Body (inclusive of both end rows)
        chart_matrix[high_pixel:low_pixel + 1, x_position] = 255

    return chart_matrix

def image_vol(data):
    """Render an OHLC bar chart with volume bars as a 64-row pixel matrix.

    Prices are scaled linearly into pixel heights 12..63 (the upper part of
    the image); volumes are scaled into heights 0..11 and drawn in the rows
    below the price area. Every row of ``data`` occupies three pixel columns:
    open tick, high-low bar (sharing its column with the volume bar) and
    close tick.

    Args:
        data: DataFrame with 'Open', 'High', 'Low', 'Close' and 'Volume'
            columns.

    Returns:
        ``np.uint8`` array of shape ``(64, 3 * len(data))`` with 0 for the
        background and 255 for drawn pixels. An empty ``data`` yields an
        empty ``(64, 0)`` matrix.
    """
    num_days = len(data)
    image_height, image_width = 64, 3 * num_days

    # Source and target ranges for the two linear scalings.
    price_range = (data['Low'].min(), data['High'].max())
    price_pixels = (12, image_height - 1)
    vol_range = (data['Volume'].min(), data['Volume'].max())
    vol_pixels = (0, 11)

    normalized_open = np.interp(data['Open'], price_range, price_pixels)
    normalized_high = np.interp(data['High'], price_range, price_pixels)
    normalized_low = np.interp(data['Low'], price_range, price_pixels)
    normalized_close = np.interp(data['Close'], price_range, price_pixels)
    normalized_vol = np.interp(data['Volume'], vol_range, vol_pixels)

    chart_matrix = np.zeros((image_height, image_width), dtype=np.uint8)

    for idx, (o, h, l, c, v) in enumerate(zip(normalized_open, normalized_high, normalized_low, normalized_close, normalized_vol)):
        # Middle column of this day's 3-pixel slot.
        x_position = 3 * idx + 1

        # Row 0 is the top of the matrix, so heights are flipped vertically.
        open_pixel = image_height - int(o) - 1
        high_pixel = image_height - int(h) - 1
        low_pixel = image_height - int(l) - 1
        close_pixel = image_height - int(c) - 1

        # Open Tick
        chart_matrix[open_pixel, x_position - 1] = 255

        # Close Tick
        chart_matrix[close_pixel, x_position + 1] = 255

        # High-Low Body (inclusive of both end rows)
        chart_matrix[high_pixel:low_pixel + 1, x_position] = 255

        # Volume: int(v) pixels ending just above the bottom row.
        # NOTE(review): the slice stops at image_height - 1, so the bottom row
        # is never drawn and the smallest volume draws nothing — confirm this
        # is intended before changing it (it alters the model's inputs).
        chart_matrix[image_height - int(v) - 1:image_height - 1, x_position] = 255

    return chart_matrix

def image_vol_ma(data):
    """Render an OHLC chart with volume bars and a 20-period moving average.

    Prices and the 'MA20' series share one linear scaling into pixel heights
    12..63, whose range spans both the Low/High extremes and the MA20
    extremes of the window. Volumes are scaled into heights 0..11 and drawn
    in the rows below the price area. Every row of ``data`` occupies three
    pixel columns; the moving average is drawn across all three.

    Args:
        data: DataFrame with 'Open', 'High', 'Low', 'Close', 'Volume' and
            'MA20' columns.

    Returns:
        ``np.uint8`` array of shape ``(64, 3 * len(data))`` with 0 for the
        background and 255 for drawn pixels. An empty ``data`` yields an
        empty ``(64, 0)`` matrix.
    """
    num_days = len(data)
    image_height, image_width = 64, 3 * num_days

    # The price range must also contain the moving average so that the MA
    # line never falls outside the price area.
    mini = min(data['Low'].min(), data['MA20'].min())
    maxi = max(data['MA20'].max(), data['High'].max())
    price_range = (mini, maxi)
    price_pixels = (12, image_height - 1)
    vol_range = (data['Volume'].min(), data['Volume'].max())
    vol_pixels = (0, 11)

    normalized_open = np.interp(data['Open'], price_range, price_pixels)
    normalized_high = np.interp(data['High'], price_range, price_pixels)
    normalized_low = np.interp(data['Low'], price_range, price_pixels)
    normalized_close = np.interp(data['Close'], price_range, price_pixels)
    normalized_ma = np.interp(data['MA20'], price_range, price_pixels)
    normalized_vol = np.interp(data['Volume'], vol_range, vol_pixels)

    chart_matrix = np.zeros((image_height, image_width), dtype=np.uint8)

    for idx, (o, h, l, c, v, ma) in enumerate(zip(normalized_open, normalized_high, normalized_low, normalized_close, normalized_vol, normalized_ma)):
        # Middle column of this day's 3-pixel slot.
        x_position = 3 * idx + 1

        # Row 0 is the top of the matrix, so heights are flipped vertically.
        open_pixel = image_height - int(o) - 1
        high_pixel = image_height - int(h) - 1
        low_pixel = image_height - int(l) - 1
        close_pixel = image_height - int(c) - 1

        # Open Tick
        chart_matrix[open_pixel, x_position - 1] = 255

        # Close Tick
        chart_matrix[close_pixel, x_position + 1] = 255

        # High-Low Body (inclusive of both end rows)
        chart_matrix[high_pixel:low_pixel + 1, x_position] = 255

        # Volume: int(v) pixels ending just above the bottom row.
        # NOTE(review): the slice stops at image_height - 1, so the bottom row
        # is never drawn and the smallest volume draws nothing — confirm this
        # is intended before changing it (it alters the model's inputs).
        chart_matrix[image_height - int(v) - 1:image_height - 1, x_position] = 255

        # MA: one pixel row across all three columns of the slot.
        chart_matrix[image_height - int(ma) - 1, x_position - 1:x_position + 2] = 255

    return chart_matrix

### train-val-test split

In [None]:
# Build one sample per row once a full 20-row window is available:
# the chart covers rows i-19..i and the target is the 'label' of row i
# (whether the close following row i is higher than row i's close).
# NOTE(review): `label` is rebound to a list here, which hides the label()
# helper defined earlier in this file; re-run that cell before calling
# label() again.
images, label = [], []
for i in range(19, len(data)):
    images.append(image_vol_ma(data.loc[data.index[i-19:i+1], :]))
    label.append(data.loc[data.index[i], 'label'])

In [None]:
# The first 2515 samples form the development set, which is shuffled and
# split 80/20 into training and validation parts with a fixed seed.
# NOTE(review): consecutive samples share 19 of their 20 rows, so a random
# split places near-identical charts on both sides — confirm this is intended.
train_images, train_label = images[:2515], label[:2515]
X_train, X_val, y_train, y_val = train_test_split(train_images, train_label, test_size=0.2, random_state=1)
# Samples 2515..3610 (1096 samples) are held out, in order, as the test set.
# NOTE(review): the hard-coded cut-offs presumably match specific dates in the
# CSV — verify against the data before reusing with another file.
X_test, y_test = images[2515:3611], label[2515:3611]

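The test set covers the closes from 2021-01-01 to 2023-12-31: for each of these days the model predicts the direction of the close, using the chart that ends at the previous day's close (2020-12-31 to 2023-12-30), which is when the position is taken.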

### Modelling

In [None]:
class CNN20d(nn.Module):
    """Three-block CNN classifying 20-day chart images into two classes.

    Input:  ``[N, 64, 60]`` (a channel axis is added inside ``forward``).
    Output: ``[N, 2]`` raw class scores (logits).

    The network returns logits rather than probabilities because
    ``nn.CrossEntropyLoss`` applies log-softmax internally; feeding it
    already-normalised probabilities squashes the loss and its gradients.
    Apply ``self.Softmax`` (or ``torch.softmax``) to the output when
    probabilities are needed.
    """

    def init_weights(self, m):
        """Xavier-initialise weights of Linear/Conv2d layers; set biases to 0.01."""
        if isinstance(m, (nn.Linear, nn.Conv2d)):
            torch.nn.init.xavier_uniform_(m.weight)
            # Layers built with bias=False have no bias tensor to fill.
            if m.bias is not None:
                m.bias.data.fill_(0.01)

    def __init__(self):
        super(CNN20d, self).__init__()
        self.conv1 = nn.Sequential(OrderedDict([
            ('Conv', nn.Conv2d(1, 64, (5, 3), padding=(3, 1), stride=(3, 1), dilation=(2, 1))), # output size: [N, 64, 21, 60]
            ('BN', nn.BatchNorm2d(64, affine=True)),
            ('ReLU', nn.ReLU()),
            ('Max-Pool', nn.MaxPool2d((2,1))) # output size: [N, 64, 10, 60]
        ]))
        self.conv1 = self.conv1.apply(self.init_weights)

        self.conv2 = nn.Sequential(OrderedDict([
            ('Conv', nn.Conv2d(64, 128, (5, 3), padding=(3, 1), stride=(1, 1), dilation=(1, 1))), # output size: [N, 128, 12, 60]
            ('BN', nn.BatchNorm2d(128, affine=True)),
            ('ReLU', nn.ReLU()),
            ('Max-Pool', nn.MaxPool2d((2,1))) # output size: [N, 128, 6, 60]
        ]))
        self.conv2 = self.conv2.apply(self.init_weights)

        self.conv3 = nn.Sequential(OrderedDict([
            ('Conv', nn.Conv2d(128, 256, (5, 3), padding=(2, 1), stride=(1, 1), dilation=(1, 1))), # output size: [N, 256, 6, 60]
            ('BN', nn.BatchNorm2d(256, affine=True)),
            ('ReLU', nn.ReLU()),
            ('Max-Pool', nn.MaxPool2d((2,1))) # output size: [N, 256, 3, 60]
        ]))
        self.conv3 = self.conv3.apply(self.init_weights)

        self.DropOut = nn.Dropout(p=0.5)
        # 256 channels * 3 rows * 60 columns = 46080 flattened features.
        self.FC = nn.Linear(46080, 2)
        self.init_weights(self.FC)
        # Kept as an attribute for callers that want probabilities; it is
        # deliberately NOT applied in forward().
        self.Softmax = nn.Softmax(dim=1)

    def forward(self, x): # input: [N, 64, 60]
        """Return class logits of shape ``[N, 2]`` for a batch of chart images."""
        x = x.unsqueeze(1).to(torch.float32)   # output size: [N, 1, 64, 60]
        x = self.conv1(x) # output size: [N, 64, 10, 60]
        x = self.conv2(x) # output size: [N, 128, 6, 60]
        x = self.conv3(x) # output size: [N, 256, 3, 60]
        x = self.DropOut(x.view(x.shape[0], -1))
        x = self.FC(x) # output size: [N, 2] (logits)

        return x

In [None]:
# Create the model
net = CNN20d()

# Print the model summary (optional)
print(net)

In [None]:
class MyDataset(Dataset):
    """In-memory dataset pairing each image with its label.

    Both inputs are converted with ``torch.Tensor`` at construction time;
    the images are copied first so the caller's array is left untouched.
    Indexing returns an ``(image, label)`` tuple.
    """

    def __init__(self, img, label):
        # Number of samples is taken from the image container.
        self.len = len(img)
        self.img = torch.Tensor(img.copy())
        self.label = torch.Tensor(label)

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        sample = self.img[idx]
        target = self.label[idx]
        return sample, target

In [None]:
# Stack the per-sample chart matrices and labels into arrays and wrap them
# as tensor-backed datasets for training and validation.
train_dataset = MyDataset(np.array(X_train), np.array(y_train))
val_dataset = MyDataset(np.array(X_val), np.array(y_val))

In [None]:
# Training batches are reshuffled every epoch; validation keeps a fixed order.
# pin_memory=True requests page-locked host memory for the batches.
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True, pin_memory=True)
val_dataloader = DataLoader(val_dataset, batch_size=256, shuffle=False, pin_memory=True)

In [None]:
def train_loop(dataloader, net, loss_fn, optimizer):
    """Train ``net`` for one pass over ``dataloader``.

    Batches are moved to the module-level ``device``. The progress bar shows
    the size-weighted running mean of the loss and the accuracy of the
    current batch.

    Returns:
        The sample-weighted mean training loss over the whole pass.
    """
    net.train()
    mean_loss = 0.0
    seen = 0

    with tqdm(dataloader) as progress:
        for inputs, targets in progress:
            inputs = inputs.to(device)
            targets = targets.to(device).long()
            outputs = net(inputs)
            loss = loss_fn(outputs, targets)

            # Standard optimisation step: clear, back-propagate, update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Fold this batch into the running mean, weighted by batch size.
            n_batch = len(inputs)
            mean_loss = (n_batch * loss.item() + mean_loss * seen) / (n_batch + seen)
            seen += n_batch

            hits = (outputs.argmax(1) == targets).sum().item()
            train_acc = hits / len(outputs)

            progress.set_postfix({'Train Loss':mean_loss, 'Train Accuracy:':train_acc})

    return mean_loss

In [None]:
def val_loop(dataloader, net, loss_fn):
    """Evaluate ``net`` on ``dataloader`` without updating its weights.

    Batches are moved to the module-level ``device``. The progress bar shows
    the size-weighted running mean of the loss and the accuracy of the
    current batch.

    Returns:
        The sample-weighted mean validation loss over the whole pass
        (0.0 if the dataloader yields no batches).
    """
    running_loss = 0.0
    current = 0
    net.eval()

    with torch.no_grad():
        with tqdm(dataloader) as t:
            for X, y in t:
                X = X.to(device)
                y = y.to(device).long()
                y_pred = net(X)
                batch_loss = loss_fn(y_pred, y).item()

                # Size-weighted running mean: the new batch is weighted by its
                # own size and the previous mean by the samples seen so far.
                running_loss = (len(X) * batch_loss + running_loss * current) / (len(X) + current)
                current += len(X)
                val_acc = (y_pred.argmax(1) == y).sum().item() / len(y_pred)
                t.set_postfix({'Val Loss':running_loss, 'Val Accuracy:':val_acc})

    return running_loss

In [None]:
# Wrap the model so batches can be split across the visible GPUs.
# NOTE(review): the model is not explicitly moved to a device in this cell —
# confirm placement when more than one GPU is visible.
net = nn.DataParallel(net)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-5)

# Bookkeeping for the training loop: first epoch index, best validation loss
# seen so far, the epoch at which it occurred, and the number of epochs
# without improvement after which training stops early.
start_epoch = 0
min_val_loss = 1e9
last_min_ind = -1
early_stopping_epoch = 50

# TensorBoard writer (default log directory).
tb = SummaryWriter()

In [None]:
# Create directory to save models
start_time = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
save_dir = f'run/{start_time}'
os.makedirs(save_dir, exist_ok=True)

device = 'cuda'
# os.mkdir('../pt'+os.sep+start_time)
epochs = 50
for t in range(start_epoch, epochs):
    print(f"Epoch {t}:")
    time.sleep(0.2)
    train_loss = train_loop(train_dataloader, net, loss_fn, optimizer)
    val_loss = val_loop(val_dataloader, net, loss_fn)
    tb.add_histogram("train_loss", train_loss, t)
    torch.save(net, os.path.join(save_dir, 'epoch_{}.pt'.format(t))) 
    if val_loss < min_val_loss:
        last_min_ind = t
        min_val_loss = val_loss
    elif t - last_min_ind >= early_stopping_epoch:
        break

print('Done!')
print('Best epoch: {}, val_loss: {}'.format(last_min_ind, min_val_loss))

In [None]:
# Reload the checkpoint written at the epoch with the lowest validation loss.
net_path = f'run/{start_time}/epoch_{last_min_ind}.pt'
# NOTE(review): the checkpoint is a whole pickled module (written with
# torch.save(net, ...)); recent PyTorch releases may need weights_only=False
# to load it — confirm against the installed version.
final_net = torch.load(net_path)

In [None]:
# Wrap the held-out test samples; order is preserved (shuffle=False) so the
# predictions stay aligned with the chronological test period.
dataset = MyDataset(np.array(X_test), np.array(y_test))
test_dataloader = DataLoader(dataset, batch_size=2048, shuffle=False)

def eval_loop(dataloader, net, loss_fn):
    """Run ``net`` over ``dataloader`` and collect its outputs and targets.

    Batches are moved to the module-level ``device`` and no gradients are
    recorded. The progress bar shows the size-weighted running mean loss.

    Returns:
        A tuple ``(loss, predict, target)`` where ``loss`` is the
        sample-weighted mean loss over all batches, ``predict`` is the
        concatenation of the network outputs and ``target`` the
        concatenation of the labels, both in dataloader order.
    """
    running_loss = 0.0
    current = 0
    net.eval()
    target = []
    predict = []
    with torch.no_grad():
        with tqdm(dataloader) as t:
            for X, y in t:
                X = X.to(device)
                y = y.to(device)
                y_pred = net(X)
                target.append(y.detach())
                predict.append(y_pred.detach())
                loss = loss_fn(y_pred, y.long())

                # Size-weighted running mean of the per-batch losses.
                running_loss = (len(X) * loss.item() + running_loss * current) / (len(X) + current)
                current += len(X)
                t.set_postfix({'running_loss':running_loss})

    # Return the loss that was actually accumulated.
    return running_loss, torch.cat(predict), torch.cat(target)

# Evaluate the reloaded best checkpoint on the test set.
test_loss, y_pred, y_target = eval_loop(test_dataloader, final_net, loss_fn)

# Normalise the outputs per row and take the most likely class. argmax is
# unchanged by softmax, so predicted_classes is the same whether y_pred
# already holds probabilities or raw scores.
predict_prob = torch.nn.Softmax(dim=1)(y_pred)
predicted_classes = predict_prob.argmax(dim=1).cpu().numpy()

In [None]:
# Close prices for the back-test window, re-indexed from 0.
# NOTE(review): this compares 'Datetime' with plain strings; if the column
# holds strings that include a time of day, the upper bound may exclude
# 2023-12-31 — confirm the column's format.
y_true = data[(data['Datetime'] >= '2020-12-31') & (data['Datetime'] <= '2023-12-31')]['Close'].reset_index(drop=True)

In [None]:
# Raw signal: class 1 -> long (+1), class 0 -> short (-1).
position1 = np.where(predicted_classes == 0, -1, predicted_classes)
# Smoothed position: when the signal flips against the current position the
# strategy first goes flat (0) for one step, and only takes the new side on
# a following step; from flat it adopts the signal directly.
curr = position1[0]
position2 = [curr]
for i in range(1, len(position1)):
    if curr == 0 or curr == position1[i]:
        curr = position1[i]
    else:
        curr = 0
    position2.append(curr)

In [None]:
# Compound the strategy value starting from 1: each step multiplies by
# (1 + position held over the step * simple return of the close over the step).
val = 1
portf = [1]
for i in range(1, len(y_true)):
    # position2[i-1] is the position decided at the previous close.
    val = val * (1 + position2[i-1] * (y_true[i] - y_true[i-1]) / y_true[i-1])
    portf.append(val)

In [None]:
# Benchmark: value of holding the asset, normalised to 1 at the first close.
buy_and_hold = y_true / y_true[0]
# Daily x-axis values.
# NOTE(review): plotting requires len(dates) == len(y_true) == len(portf);
# the range is hard-coded, so confirm it matches the selected rows.
dates = pd.date_range(start='2021-01-01', end='2024-01-01', freq='D')

# Create a new figure
plt.figure(figsize=(10, 6))

# Plot the data
plt.plot(dates, buy_and_hold, color='red', label='Buy and Hold')
plt.plot(dates, portf, color='black', label='CNN')

# Set major formatter and locator for the x-axis (one tick every 3 months, shown as MM/YYYY)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%m/%Y'))
plt.gca().xaxis.set_major_locator(mdates.MonthLocator(interval=3))

# Add a legend
plt.legend()

# Automatically format x-axis labels for better readability
plt.gcf().autofmt_xdate()

# Show the plot
plt.show()