In [None]:
import torch
from torch import nn
from torchsummary import summary
import numpy as np
import matplotlib.pyplot as plt
import math
from dataloading import DataParser
from easydict import EasyDict

from indicators import ZigZagNew
from backtest import MovingWindow

In [None]:
def compute_hold(p):
    hold = p if p[-1] - p[0] > 0 else -p
    return hold - hold[0]

In [None]:
class SimpleModel(nn.Module):
    def __init__(self, feature_size, nh):
        self.nh = nh
        self.feature_size = feature_size
        self.inp_size = self.feature_size + 2        
        super(SimpleModel, self).__init__()

        self.fc_features_in = nn.Linear(self.feature_size, nh)
        self.fc_last_state_in = nn.Linear(2, nh)
        self.fc_hid = nn.Linear(nh, nh)
        self.fc_out = nn.Linear(nh, 1)

        self.batch_norm_hid = nn.BatchNorm1d(nh)
        self.batch_norm_out = nn.BatchNorm1d(1)        
        
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()

    # def forward(self, x):
    #     features, last_pred, last_res = x[:, :-1], x[:, -2:-1], x[:, -1:]
    #     last_state_pred = self.tanh(self.tanh(last_res*10)*last_pred*10)
    #     return last_state_pred
    
    def forward(self, x):
        features, last_state = x[:, :-2], x[:, -2:]

        # last_state = self.relu(self.fc_last_state_in(self.last_state))
        # last_state = self.relu(self.fc_hid(last_state))
        # last_state = self.fc_out(last_state)

        features = self.relu(self.fc_features_in(features))
        features = self.batch_norm_hid(features)
        # features = self.relu(self.fc_hid(features))
        # features = self.batch_norm_hid(features)
        # features = self.relu(self.fc_hid(features))
        # features = self.batch_norm_hid(features)
        # features = self.relu(self.fc_hid(features))
        # features = self.batch_norm_hid(features)
        features = self.fc_out(features)
        
        output = self.tanh(features)
        return output

In [None]:
model = SimpleModel(5, 5)
print(model.inp_size)
model.eval()
model(torch.ones((1, model.inp_size)))

In [None]:
p = np.array([math.sin(x/2)+x/4 for x in range(30)])# + (np.random.random(L)-0.5)*2
dp = p[1:] - p[:-1]
hold = compute_hold(p)
plt.plot(p, ".-")
plt.plot(hold)
plt.plot(dp)

fsize = 3
features = []
for i in range(p.shape[0]):
    if i < fsize-1:
        features.append(np.zeros(fsize))
    else:
        features.append(p[i-fsize+1:i+1] - p[i])

features = torch.from_numpy(np.array(features).astype(np.float32))
for i in range(fsize-1, 10):
    plt.plot(range(i-fsize+1, i+1), features[i].numpy())
plt.grid("on")
plt.tight_layout()

In [None]:
cfg = EasyDict({"data_type": "metatrader", "period": "M60", "ticker": "SBER"})
data_pd, data_np = DataParser(cfg).load()
data_pd.head()

In [None]:
wsize = 256
fsize = 3
zz = ZigZagNew(5)
mw = MovingWindow(data_np, wsize)
last_type = 0
p, features = [], []
# for t in range(wsize*5-1, data_np.Close.shape[0]):
for t in range(1600, data_np.Close.shape[0]): # best - 1500
    hist_window = mw(t)[0]
    zzx, zzp, zzt = zz.update(hist_window)
    zzx -= zzx[0]
    zzp -= zzp[-1]
    # zzx = 127 - zzx
    if zzt[-1] != last_type:
        last_type = zzt[-1]
        # plt.plot(zzx, zzp)
        # plt.plot(hist_window.Low - hist_window.Low[-1])
        # plt.plot(hist_window.High - hist_window.High[-1])
        
        p.append(hist_window.Close[-1])
        features.append(np.vstack([zzp[:fsize]]))
        # features.append(p[-1])
        if len(p) >= 150:
            break
        
features = torch.from_numpy(np.array(features).astype(np.float32))
features = features.squeeze()
p = torch.from_numpy(np.array(p).astype(np.float32))*10
dp = p[1:] - p[:-1]
hold = compute_hold(p)
print(features.shape, dp.shape)

In [None]:
plt.plot(hold)
plt.plot(p - p[0])

In [None]:
for i in range(fsize-1, 20):
    plt.plot(range(i-fsize+1, i+1), features[i].numpy())

In [None]:
features = features.reshape((-1, fsize*3))

In [None]:
model = SimpleModel(features.shape[1], 24)
model.eval()
summary(model, (model.inp_size,), device="cpu")

# model.fc2.weight = nn.Parameter(torch.tensor([[10.]]))
# for param in model.fc2.parameters():
#     param.requires_grad = False

# print(model.fc2.weight)
# print(model.fc2.bias)

In [None]:
model.inp_size

In [None]:
num_epochs = 300
optimizer = torch.optim.SGD(model.parameters(), lr=0.005)

def autoregress_sequense(dp, features, epoch=0, output_sequense=False):
    outputs, pred_results = torch.zeros(dp.shape[0]), torch.zeros(dp.shape[0])
    loss, output, pred_result = 0, torch.zeros((1, 1)), torch.zeros((1, 1))
    for i in range(dp.shape[0]):
        input = torch.hstack([features[i:i+1], output, pred_result])
        output = model(input)
        pred_result = dp[i] * output

        if output_sequense:
            outputs[i] = output
            pred_results[i] = pred_result
        else:
            loss -= pred_result
            print(f"{epoch + 1:03} {i + 1:04}: {loss.item():7.3f} -= {output.item():6.3f} * {dp[i]:6.3f}= {pred_result.item():6.3f}")            
    if output_sequense:
        return outputs.detach().numpy(), pred_results.detach().numpy()
    else:
        return loss, output
        
# Training loop
for epoch in range(num_epochs):
    optimizer.zero_grad()
    loss, _ = autoregress_sequense(dp, features, epoch)
    loss += hold[-1]
    loss.backward()
    optimizer.step()
    print("---------------------------")
    
model.eval()
pred_deals, pred_res = torch.zeros(dp.shape[0]), torch.zeros(dp.shape[0])
pred_deal_last, pred_res_last = 1*torch.zeros((1, 1)), -torch.zeros((1, 1))
# pred_deal_last, pred_res_last = -1*torch.ones((1, 1)), torch.ones((1, 1))

pred_deals, pred_res = autoregress_sequense(dp, features, output_sequense=True)

plt.plot(p - p[0], linewidth=3)
plt.bar(list(range(len(pred_deals))), height=pred_deals, alpha=0.4)
plt.plot(np.hstack([np.zeros(1), pred_res.cumsum(0)]))
plt.grid("on")
plt.tight_layout()
plt.grid("on")

In [None]:
hold[-1], pred_res.sum(0)