In [177]:
import ccxt
import time
import torch
import torch.nn as nn

In [178]:
# Exchange client used for every request (swap `binance` for another ccxt exchange id if desired)
exchange = ccxt.binance()

# Market, candle size and maximum number of candles requested per call
symbol = 'BTC/USDT'
timeframe = '1h'
limit = 1000

# Start of the requested history; compared against exchange.milliseconds() below
since = exchange.parse8601('2017-01-01T00:00:00Z')

# Page through the history: each request starts just after the last candle received
ohlvc = []
while True:
    if since >= exchange.milliseconds():
        break  # caught up with the exchange clock
    data = exchange.fetch_ohlcv(symbol=symbol, timeframe=timeframe, since=since, limit=limit)
    if not data:
        break  # nothing newer is available
    ohlvc += data
    since = data[-1][0] + 1  # first field of the last row, plus one, is the next start point
    time.sleep(exchange.rateLimit / 1000)  # pause between requests
print(len(ohlvc))

68769


In [179]:
# Per-candle statistics.
# Column 0 of every fetched row is dropped; the remaining five columns are read as
# open -- high -- low -- close -- volume
raw_data = torch.tensor(ohlvc)[:, 1:]
# NOTE: `open` shadows the Python builtin for the rest of the notebook
open, high, low, close, volume = (raw_data[:, col] for col in range(5))

spread = high - low                      # candle range
diff = close - open                      # signed candle body
body = diff.abs() / (spread + 1e-12)     # body as a fraction of the range (epsilon avoids 0/0)

# Pairwise quantities compare element k + 1 with element k, so they are one element shorter
log_return = (close[1:] / close[:-1]).log()
log_vol = (volume + 1e-12).log()
vol_change = ((volume[1:] + 1e-12) / (volume[:-1] + 1e-12)).log()

In [180]:
N = 10
def window(data: torch.Tensor, op: callable, size: int = N):
    """Apply `op` to every length-`size` slice of `data` along dim 0 and stack the results.

    Slice number j covers data[j : j + size]. The slice that would end on the
    very last element is not produced, so the result has data.shape[0] - size
    entries along dim 0.
    """
    return torch.stack([op(data[end - size:end]) for end in range(size, data.shape[0])])

# Rolling statistics: entry j summarises close[j : j + N], i.e. the window ending on bar j + N - 1
close_mean = window(close, torch.mean)
close_std = window(close, torch.std)
vol_mean = window(volume, torch.mean)

# Momentum across the same window: its last bar (j + N - 1) relative to its first bar (j).
# The previous form, close[N:] / close[:-N], read bar j + N -- one bar past the window every
# other feature ends on. That is the close the target return is computed from further down,
# so the feature leaked the value being predicted. Both slices still have len(close) - N entries.
momentum = close[N - 1:-1] / close[:-N]

# Band width: four rolling standard deviations relative to the rolling mean
BB_width = 4 * close_std / close_mean

In [None]:
# Trim the per-bar features so that entry j lines up with entry j of the rolling features
bar_slice = slice(N - 1, -1)    # per-bar series: start at bar N - 1, drop the last bar
step_slice = slice(N - 2, -1)   # pairwise series are one element shorter, so start one index earlier
acc_spread = spread[bar_slice]
acc_diff = diff[bar_slice]
acc_body = body[bar_slice]
acc_log_return = log_return[step_slice]
acc_log_vol = log_vol[bar_slice]
acc_vol_change = vol_change[step_slice]

# Feature order fixes the layout of the last axis of `xs`
features = [
    acc_spread, acc_diff, acc_body, acc_log_return, acc_log_vol, acc_vol_change,
    close_mean, close_std, vol_mean, momentum, BB_width,
]

# Input tensor: sliding windows of d consecutive entries, features stacked on the last axis
d = 15 # Sequence length
feature_windows = []
for feature in features:
    feature_windows.append(feature.unfold(0, d, 1))
xs = torch.stack(feature_windows, dim=2)
print(xs.shape)

# Targets: squared log return following the last entry of each window
# NOTE(review): `nll` squares its `targets` argument again -- confirm whether the
# squaring belongs here or there.
targets = log_return[N + d - 2:].pow(2)
print(targets.shape)

torch.Size([68745, 15, 11])
torch.Size([68745])


In [None]:
class Gate(nn.Module):
    """Sigmoid gate computed from the current input and the previous hidden state.

    Args:
        in_size: size of the last dimension of the input.
        hidden_size: size of the last dimension of the hidden state and of the gate output.
    """

    def __init__(self, in_size, hidden_size):
        super().__init__()
        # torch.randn takes the sizes positionally (or as `size=`); there is no `shape=`
        # keyword, and `(hidden_size)` is a plain int rather than a 1-tuple.
        # nn.Parameter already sets requires_grad=True.
        self.in_weight = nn.Parameter(torch.randn(in_size, hidden_size))
        self.hidden_weight = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.bias = nn.Parameter(torch.randn(hidden_size))
        self.act = torch.sigmoid

    def forward(self, x_inp, hidden_inp):
        """Return sigmoid(hidden_inp @ W_h + x_inp @ W_x + b).

        Args:
            x_inp: tensor whose last dimension is `in_size`.
            hidden_inp: tensor whose last dimension is `hidden_size`.
        """
        unact = hidden_inp @ self.hidden_weight + x_inp @ self.in_weight + self.bias
        return self.act(unact)

In [None]:

class Cell(nn.Module):
    """One LSTM step: gated update of the memory state and the hidden state derived from it.

    Args:
        in_size: size of the last dimension of the input.
        hidden_size: size of the last dimension of the hidden and memory states.
    """

    def __init__(self, in_size, hidden_size):
        super().__init__()
        self.forget_gate = Gate(in_size, hidden_size)
        self.input_gate = Gate(in_size, hidden_size)
        self.out_gate = Gate(in_size, hidden_size)
        # Candidate-memory projection. torch.randn takes the sizes positionally
        # (there is no `shape=` keyword) and `(hidden_size)` is an int, not a 1-tuple.
        self.inp_weight = nn.Parameter(torch.randn(in_size, hidden_size))
        self.hidden_weight = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.bias = nn.Parameter(torch.randn(hidden_size))

    def forward(self, x_inp, hidden_inp, memory_inp):
        """Advance the cell by one step.

        Args:
            x_inp: input for this step (last dimension `in_size`).
            hidden_inp: previous hidden state (last dimension `hidden_size`).
            memory_inp: previous memory state (last dimension `hidden_size`).

        Returns:
            Tuple `(new_output, new_mem)`: the new hidden state and the new memory state.
        """
        forget_gate = self.forget_gate(x_inp, hidden_inp)
        input_gate = self.input_gate(x_inp, hidden_inp)
        out_gate = self.out_gate(x_inp, hidden_inp)

        candidate_mem = torch.tanh(hidden_inp @ self.hidden_weight + x_inp @ self.inp_weight + self.bias)
        # Keep part of the old memory, add part of the candidate
        new_mem = forget_gate * memory_inp + input_gate * candidate_mem
        new_output = out_gate * torch.tanh(new_mem)
        return (new_output, new_mem)

In [None]:
# Feature dim is always last dim
class BatchNorm1D(nn.Module):
    """Batch normalisation over every dimension except the last (feature) one.

    Args:
        num_features: size of the last dimension of the inputs.
        training: initial mode. When True, statistics come from the current
            input and the running averages are updated; when False the running
            averages are used instead.
        momentum: weight kept on the old running value at each update
            (running = momentum * running + (1 - momentum) * batch).
        eps: added to the variance before the square root so constant features
            and single-sample inputs do not divide by zero.
    """

    def __init__(self, num_features, training=True, momentum=0.9, eps=1e-5):
        super().__init__()
        # Buffers follow .to(device) and are saved with the module. The running mean
        # starts at 0 and the running variance at 1, so an untrained layer in
        # inference mode is (almost) the identity rather than a division by zero.
        self.register_buffer("running_mu", torch.zeros(num_features))
        self.register_buffer("running_var", torch.ones(num_features))
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))
        self.training = training
        self.momentum = momentum
        self.eps = eps

    def forward(self, inp: torch.Tensor):
        """Normalise `inp` per feature, then apply the learned scale and shift.

        Args:
            inp: tensor whose last dimension has `num_features` entries.

        Returns:
            Tensor with the same shape as `inp`.

        Raises:
            ValueError: if the last dimension of `inp` is not `num_features`.
        """
        c = self.running_mu.shape[0]
        if inp.shape[-1] != c:
            raise ValueError(f"expected last dimension of size {c}, got {inp.shape[-1]}")

        if self.training:
            # Collapse all leading dimensions and reduce over them. The reduction
            # is over axes, not over the sizes in inp.shape.
            flat = inp.reshape(-1, c)
            mu = flat.mean(dim=0)
            # Biased variance stays finite (zero) when only one sample is present
            var = flat.var(dim=0, unbiased=False)

            # Update the running averages with detached values so they keep shape
            # (num_features,) and hold no reference to the autograd graph.
            self.running_mu = self.momentum * self.running_mu + (1 - self.momentum) * mu.detach()
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * var.detach()
        else:
            mu = self.running_mu
            var = self.running_var

        # Normalize then reparametrize; (num_features,) broadcasts against the last dim
        x_hat = (inp - mu) / torch.sqrt(var + self.eps)
        return self.gamma * x_hat + self.beta

In [None]:
class LSTM(nn.Module):
    """Single-layer LSTM that batch-normalises its input and hidden state at every step.

    Args:
        in_size: number of input features per time step.
        hidden_size: size of the hidden and memory states.
        training: initial mode, forwarded to both batch-norm layers on each call.
    """

    def __init__(self, in_size, hidden_size, training=True):
        super().__init__()
        self.cell = Cell(in_size, hidden_size)
        # Zero initial state, kept as non-persistent buffers so it follows
        # .to(device) without adding keys to the state dict. The stored shape
        # matches the original attributes; forward() reshapes it per input.
        state_shape = (1, hidden_size) if training else (hidden_size,)
        self.register_buffer("initial_hidden", torch.zeros(state_shape), persistent=False)
        self.register_buffer("initial_mem", torch.zeros(state_shape), persistent=False)

        self.in_batchnorm = BatchNorm1D(in_size)
        self.hidden_batchnorm = BatchNorm1D(hidden_size)
        self.training = training

    def forward(self, inp: torch.Tensor):
        """Run the cell over the sequence and return the final hidden state.

        Args:
            inp: either a 2-D tensor indexed as (step, feature) or a 3-D tensor
                indexed as (batch, step, feature).

        Returns:
            Final hidden state: shape (hidden_size,) for 2-D input and
            (batch, hidden_size) for 3-D input.

        Raises:
            ValueError: if `inp` is neither 2-D nor 3-D.
        """
        hidden_size = self.initial_hidden.shape[-1]
        if inp.ndim == 2:
            steps = inp.shape[0]
            state_shape = (hidden_size,)
        elif inp.ndim == 3:
            steps = inp.shape[1]
            state_shape = (inp.shape[0], hidden_size)
        else:
            raise ValueError(
                f"expected a 2-D (step, feature) or 3-D (batch, step, feature) input, got {inp.ndim}-D"
            )

        # Keep the batch-norm layers in the same mode as this module. This covers the
        # case where `training` was assigned directly instead of via train()/eval(),
        # and it does not change between steps, so it is done once.
        self.in_batchnorm.training = self.training
        self.hidden_batchnorm.training = self.training

        # Shape the initial state from the input rather than from the constructor
        # flag, so the module works for both layouts and the hidden batch norm sees
        # one row per sample from the very first step.
        hidden = self.initial_hidden.reshape(-1).to(inp).expand(state_shape)
        mem = self.initial_mem.reshape(-1).to(inp).expand(state_shape)

        for t in range(steps):
            step_inp = inp[t] if inp.ndim == 2 else inp[:, t]
            x = self.in_batchnorm(step_inp)

            # Cell update
            hidden = self.hidden_batchnorm(hidden)
            hidden, mem = self.cell(x, hidden, mem)
        return hidden

In [None]:
class MLP(nn.Module):
    """Fully connected network with Tanh between layers and a linear output.

    Args:
        size: layer widths, input first and output last; needs at least two entries.
        training: initial value of the module's `training` flag.

    Raises:
        ValueError: if `size` has fewer than two entries.
    """

    def __init__(self, size: tuple, training=True):
        super().__init__()
        if len(size) < 2:
            raise ValueError("size needs at least an input and an output width")

        # Build the stack in a plain list first: nn.Sequential.pop() requires a key,
        # so the trailing activation cannot be dropped with a bare pop() on the container.
        modules = []
        for l1, l2 in zip(size, size[1:]):
            modules.append(nn.Linear(l1, l2))
            modules.append(nn.Tanh())
        modules.pop()  # no activation after the output layer
        self.layers = nn.Sequential(*modules)
        self.training = training

    def forward(self, inp):
        """Apply the layers to `inp` (last dimension `size[0]`)."""
        # Calling the module instead of .forward keeps hooks working
        return self.layers(inp)

In [None]:
def nll(logits, targets) -> torch.Tensor:
    """Element-wise negative log-likelihood of `targets` under a zero-mean Gaussian.

    `logits` is used as the variance: it is the argument of the log and the
    denominator of the quadratic term, so it has to be strictly positive for
    the result to be finite. `targets` is squared inside this function.
    """
    log_term = torch.log(logits * 2 * torch.pi)
    quad_term = (targets ** 2) / (2 * logits)
    return 0.5 * log_term + quad_term

In [None]:
class VFNN(nn.Module):
    """LSTM encoder followed by an MLP head that outputs a positive variance estimate.

    Args:
        hidden_size: size of the LSTM hidden state; must equal `MLP_size[0]`.
        inp_size: number of input features per time step.
        MLP_size: layer widths of the MLP head, input first.
        training: initial mode. In training mode `forward` also returns the loss.
    """

    def __init__(self, hidden_size: int, inp_size: int, MLP_size: tuple, training=True):
        super().__init__()
        assert MLP_size[0] == hidden_size, "MLP_size[0] must equal hidden_size"
        # nn.Sequential has to be instantiated; assigning the class itself made
        # every later append() call fail.
        self.blocks = nn.Sequential(
            LSTM(in_size=inp_size, hidden_size=hidden_size, training=training),
            MLP(MLP_size, training=training),
        )
        self.training = training

    def forward(self, inp, targets=None):
        """Predict the variance and, in training mode, its negative log-likelihood.

        Args:
            inp: input sequence(s) in a layout accepted by `LSTM.forward`.
            targets: observations scored by `nll`; required in training mode.
                NOTE(review): `nll` squares `targets` itself, while the notebook's
                `targets` tensor is already squared -- confirm which is intended.

        Returns:
            `(sigma, loss)` in training mode, otherwise `sigma`.

        Raises:
            ValueError: if the module is in training mode and `targets` is None.
        """
        # `nll` takes the log of its first argument and divides by it, so the raw
        # MLP output is mapped to a strictly positive value first.
        sigma = nn.functional.softplus(self.blocks(inp)) + 1e-6
        if not self.training:
            return sigma

        if targets is None:
            raise ValueError("targets are required in training mode")
        # A (B, 1) prediction against (B,) targets would silently broadcast to (B, B)
        if targets.shape != sigma.shape and targets.numel() == sigma.numel():
            targets = targets.reshape(sigma.shape)
        loss = nll(sigma, targets).mean()
        return sigma, loss

    def set_predict(self):
        """Switch the whole model to inference mode (training flags set to False)."""
        # eval() clears `training` on this module and on every submodule. The old
        # code set the LSTM's flag to True, leaving the model in training mode.
        self.eval()