<a href="https://colab.research.google.com/github/ajwise9/Ai-React-master/blob/main/Copy_of_PriceGPT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PriceGPT

Using an autoregressive transformer to predict changes in stock prices instead of words.

In [None]:
#@title Settings

# Notebook-wide configuration read by the cells below.
# The `#@title`, `#@markdown` and `#@param` annotations are Colab form markers.

#@markdown Device to run training and inference on:
device = 'cuda' #@param ['cuda', 'cpu']
#@markdown Stock ticker to use:
stock_ticker = 'SPY' #@param {"type": "string"}
#@markdown How much data to download:
# Number of most recent log returns kept after the download.
download_bars = 2000 #@param {"type": "integer"}
#@markdown The amount of training examples
# Length, in bars, of each training slice of the return series; the number of
# generated examples is this minus the model's window length (n_tokens).
training_bars = 1000 #@param {"type": "integer"}
#@markdown How often to retrain:
# Step between consecutive training windows and length of each test window.
testing_bars = 100 #@param {"type": "integer"}


In [None]:
#@title Download And Process Data

import yfinance as yf
import numpy as np

# Fetch price history for the configured ticker.
# NOTE(review): the 'Adj Close' lookup below assumes the installed yfinance
# version returns that column at the top level for a single ticker — confirm.
yf_df = yf.download(stock_ticker, group_by='ticker')

price = yf_df['Adj Close'].to_numpy()
# Log return of each bar relative to the previous bar: log(p[t]) - log(p[t-1]).
log_returns = np.diff(np.log(price))

# Keep only the most recent `download_bars` returns.
log_returns = log_returns[-download_bars:]

print('downloaded %d bars'%len(log_returns))


def generate_features_and_labels(log_returns, n_tokens):
  """Slice a return series into overlapping windows and next-step targets.

  Window ``i`` holds ``log_returns[i:i + n_tokens]`` as a column vector of
  shape ``(n_tokens, 1)``; its label is the value immediately after the
  window, ``log_returns[i + n_tokens]``.

  Args:
    log_returns: 1-D numpy array of returns.
    n_tokens: Number of consecutive returns in each window.

  Returns:
    Tuple ``(features, labels)`` of numpy arrays. When at least one window
    fits, ``features`` has shape ``(n_windows, n_tokens, 1)`` and ``labels``
    has shape ``(n_windows,)`` with ``n_windows = len(log_returns) - n_tokens``.
    When no window fits, both arrays are empty with shape ``(0,)``.
  """
  window_starts = range(len(log_returns) - n_tokens)

  # Each window is copied so it owns its data rather than viewing the input.
  windows = [
      log_returns[start:start + n_tokens].reshape((-1, 1)).copy()
      for start in window_starts
  ]
  targets = [log_returns[start + n_tokens] for start in window_starts]

  return np.array(windows), np.array(targets)

In [None]:
#@title Define Model

import torch
import torch.nn as nn
from positional_encodings.torch_encodings import PositionalEncoding1D
from torch.optim import AdamW
from torch.utils.data import TensorDataset, DataLoader

# Model and training hyper-parameters, read as globals by the definitions below.

# Each token carries a single scalar: it sizes the input projection layer.
n_features = 1
#@markdown Batch size for training and inference:
batch_size = 1024 #@param {"type": "integer"}
#@markdown How many epochs to train for:
epochs = 500 #@param {"type": "integer"}
#@markdown How many price bars will be visible to the model at a time:
n_tokens = 100 #@param {"type": "integer"}
#@markdown How many dimensions each token vector will have:
d_model = 16 #@param {"type": "integer"}
#@markdown How many heads to split attention into:
n_heads = 1 #@param {"type": "integer"}
#@markdown How many transformer layers to use:
n_layers = 1 #@param {"type": "integer"}
# dtype that the model weights and the input tensors are cast to.
precision = torch.float32

class DecoderLayer(nn.Module):
  """Pre-norm transformer block: masked self-attention followed by an MLP.

  Both sub-layers are wrapped in a residual connection. Layer normalisation
  is taken over the whole ``(n_tokens, d_model)`` slab of each sample, so the
  layer only accepts sequences of exactly ``n_tokens`` tokens.

  Args:
    n_tokens: Sequence length the layer is built for.
    d_model: Width of each token vector.
    n_heads: Number of attention heads.
  """

  def __init__(self, n_tokens, d_model, n_heads):
    super().__init__()

    # Remembered so forward() sizes the attention mask from this layer's own
    # sequence length rather than from a module-level variable.
    self.n_tokens = n_tokens

    self.attention_norm = nn.LayerNorm((n_tokens, d_model))
    self.attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)

    self.mlp_norm = nn.LayerNorm((n_tokens, d_model))
    self.mlp_linear1 = nn.Linear(d_model, d_model * 4)
    self.mlp_activation = nn.GELU()
    self.mlp_linear2 = nn.Linear(d_model * 4, d_model)

  def forward(self, x):
    """Apply the block to ``x`` of shape ``(batch, n_tokens, d_model)``.

    Returns:
      Tensor with the same shape as ``x``.
    """
    x_attention_norm = self.attention_norm(x)
    # Square mask that blocks attention to later positions.
    mask = nn.Transformer.generate_square_subsequent_mask(self.n_tokens, device=x.device)
    # MultiheadAttention returns (output, weights); only the output is used.
    x = x + self.attention(x_attention_norm, x_attention_norm, x_attention_norm, attn_mask=mask, is_causal=True)[0]

    x_mlp_norm = self.mlp_norm(x)
    y = self.mlp_linear1(x_mlp_norm)
    y = self.mlp_activation(y)
    y = self.mlp_linear2(y)
    x = x + y

    return x

class AutoregressiveTransformer(nn.Module):
  """Transformer that maps a window of scalar tokens to one scalar output.

  Pipeline: linear projection of each token to ``d_model`` + GELU, addition
  of the ``PositionalEncoding1D`` output, layer norm over the whole
  ``(n_tokens, d_model)`` slab, ``n_layers`` ``DecoderLayer`` blocks, then a
  flatten and a single linear readout.

  Args:
    n_tokens: Sequence length of the input window.
    d_model: Width of each token vector inside the network.
    n_heads: Attention heads per decoder layer.
    n_layers: Number of stacked decoder layers.
  """

  def __init__(self, n_tokens=n_tokens, d_model=d_model, n_heads=n_heads, n_layers=n_layers):
    super().__init__()

    # Input stage: lift each n_features-wide token to d_model dimensions.
    self.projection = nn.Linear(n_features, d_model)
    self.projection_activation = nn.GELU()
    self.embedding = PositionalEncoding1D(d_model)
    self.norm = nn.LayerNorm(normalized_shape=(n_tokens, d_model))

    # Decoder stack.
    self.decoder_layers = nn.ModuleList(
        [DecoderLayer(n_tokens, d_model, n_heads) for _ in range(n_layers)]
    )

    # Output stage: every token of the window feeds the single prediction.
    self.flatten = nn.Flatten()
    self.readout = nn.Linear(n_tokens * d_model, 1)

  def forward(self, x):
    """Run the network on ``x`` of shape ``(batch, n_tokens, n_features)``.

    Returns:
      Tensor of shape ``(batch, 1)``.
    """
    tokens = self.projection_activation(self.projection(x))
    tokens = self.norm(tokens + self.embedding(tokens))

    for block in self.decoder_layers:
      tokens = block(tokens)

    return self.readout(self.flatten(tokens))

def make_and_train_model(features, labels, log_losses=False):
  """Build a fresh AutoregressiveTransformer and fit it with an MSE loss.

  Reads the notebook-level settings ``precision``, ``device``, ``batch_size``
  and ``epochs``.

  Args:
    features: Array-like of model inputs; converted with ``torch.tensor``.
    labels: Array-like holding one scalar target per feature row.
    log_losses: When true, print the mean batch loss after every epoch.

  Returns:
    The trained model, resident on ``device``.
  """
  model = AutoregressiveTransformer().to(precision).to(device)
  optim = AdamW(model.parameters(), lr=1e-4)

  features_tensor = torch.tensor(features).to(precision)
  labels_tensor = torch.tensor(labels).to(precision)

  dataset = TensorDataset(features_tensor, labels_tensor)

  # Page-locked host memory only helps host-to-GPU copies. Requesting it with
  # a CPU target is at best a no-op and may fail, so pinning is enabled for
  # CUDA devices only.
  pin_kwargs = {}
  if torch.device(device).type == 'cuda':
    pin_kwargs = {'pin_memory': True, 'pin_memory_device': str(device)}
  dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, **pin_kwargs)

  for epoch in range(epochs):
    losses = []

    for feature, label in dataloader:
      optim.zero_grad()

      out = model(feature.to(device))

      # The model emits one column per sample; take column 0 so it lines up
      # with the 1-D label batch instead of broadcasting against it.
      loss = torch.mean((out[:, 0] - label.to(device)) ** 2)

      loss.backward()
      optim.step()

      losses.append(loss.item())

    if log_losses:
      print('epoch %d, loss: %.3f' % (epoch, np.mean(losses)))

  return model

def batch_predict(model, features):
  """Run ``model`` over ``features`` in chunks of ``batch_size`` rows.

  Gradients are disabled. Each chunk is cast to ``precision`` and moved to
  ``device`` before the forward pass.

  Args:
    model: Callable returning a 2-D tensor whose column 0 is the prediction.
    features: Array-like of model inputs; converted with ``torch.tensor``.

  Returns:
    1-D float64 numpy array with one prediction per input row.
  """
  inputs = torch.tensor(features)
  n_rows = inputs.shape[0]

  # Starting from an empty float64 array fixes the result dtype and yields an
  # empty array when there are no rows.
  chunks = [np.array([])]

  with torch.no_grad():
    for offset in range(0, n_rows, batch_size):
      batch = inputs[offset:offset + batch_size].to(precision).to(device)
      chunks.append(model(batch)[:, 0].cpu().numpy())

  return np.concatenate(chunks)

def make_and_train_ensemble(n_models, features, labels, log_losses=False):
  """Train ``n_models`` independent models on the same data.

  Args:
    n_models: Number of models to train.
    features: Training inputs, forwarded to ``make_and_train_model``.
    labels: Training targets, forwarded to ``make_and_train_model``.
    log_losses: Forwarded to ``make_and_train_model``.

  Returns:
    List of trained models, in training order.
  """
  return [
      make_and_train_model(features, labels, log_losses=log_losses)
      for _ in range(n_models)
  ]

def batch_predict_ensemble(models, features):
  """Average the predictions of every model in ``models``.

  Args:
    models: Sequence of models accepted by ``batch_predict``.
    features: Inputs passed unchanged to each model.

  Returns:
    1-D numpy array of length ``len(features)`` holding the mean prediction.
  """
  ensemble_size = len(models)

  accumulated = np.zeros(len(features))
  for member in models:
    accumulated = accumulated + batch_predict(member, features)

  return accumulated / ensemble_size


In [None]:
#@title Sliding Window Backtest

# Walk-forward backtest: repeatedly train an ensemble on `training_bars` of
# returns, trade the following `testing_bars`, then slide forward by
# `testing_bars` and retrain.

from tqdm.auto import tqdm

#@markdown Log the train loss of each epoch:
log_losses = False #@param {"type": "boolean"}
#@markdown Number of models for ensembling:
n_models = 3 #@param {"type": "integer"}



# NOTE(review): %pylab star-imports numpy/matplotlib names into the notebook
# namespace and can shadow builtins — confirm this is intended.
%pylab inline
from matplotlib import pyplot as plt

# Out-of-sample series accumulated over all windows.
all_returns = np.array([])
all_strategy_returns = np.array([])
all_allocations = np.array([])

for train_start in tqdm(range(0, len(log_returns) - training_bars, testing_bars)):
  train_end = train_start + training_bars
  # Start n_tokens early so the first test window ends on the last training
  # bar and its label is the first bar after the training slice.
  test_start = train_end - n_tokens
  test_end = train_end + testing_bars

  train_features, train_labels = generate_features_and_labels(log_returns[train_start:train_end], n_tokens)

  # Normalisation statistics come from the training windows only.
  mean_feature = np.mean(train_features)
  std_feature = np.std(train_features)

  train_features -= mean_feature
  train_features /= std_feature
  train_labels -= mean_feature
  train_labels /= std_feature

  models = make_and_train_ensemble(n_models, train_features, train_labels, log_losses=log_losses)

  # In-sample prediction statistics, used below to z-score test predictions.
  train_predictions = batch_predict_ensemble(models, train_features)
  mean_prediction = np.mean(train_predictions)
  std_prediction = np.std(train_predictions)

  test_features, test_labels = generate_features_and_labels(log_returns[test_start:test_end], n_tokens)

  # Convert log returns to simple returns before the labels are normalised
  # in place below.
  test_returns = np.exp(test_labels) - 1

  test_features -= mean_feature
  test_features /= std_feature
  test_labels -= mean_feature
  test_labels /= std_feature

  # make sure no data is leaking
  # the last label should become a feature when moving on to testing
  assert test_features[0, -1, 0] == train_labels[-1]

  test_predictions = batch_predict_ensemble(models, test_features)

  test_predictions -= mean_prediction
  test_predictions /= std_prediction

  # buy if better than average
  allocation = 1 * (test_predictions > 0)

  # buy if better than average, short if worse than average
  # allocation = np.sign(test_predictions)

  # proportional
  # allocation = 0.5 * test_predictions

  strategy_returns = allocation * test_returns

  all_returns = np.concatenate((all_returns, test_returns))
  all_strategy_returns = np.concatenate((all_strategy_returns, strategy_returns))
  all_allocations = np.concatenate((all_allocations, allocation))


# Equity curves: cumulative product of (1 + simple return).
plt.plot(np.cumprod(1 + all_returns), label='baseline equity')
plt.plot(np.cumprod(1 + all_strategy_returns), label='strategy equity')
# plt.yscale('log')
plt.legend()
plt.show()

# plt.plot(np.cumprod(1 + all_returns), label='baseline equity')
# plt.plot(np.cumprod(1 + all_strategy_returns / np.std(all_strategy_returns) * np.std(all_returns)), label='strategy equity (matched risk)')
# plt.legend()
# plt.show()

# Sharpe ratio scaled by sqrt(252) — presumably assumes daily bars; confirm.
sharpe = np.sqrt(252) * np.mean(all_strategy_returns) / np.std(all_strategy_returns)

print('baseline sharpe ratio:', np.sqrt(252) * np.mean(all_returns) / np.std(all_returns))
print('strategy sharpe ratio:', sharpe)

# p-value testing
# Permutation test: shuffle the allocations against the returns and record
# how often the shuffled Sharpe ratio is strictly greater than the strategy's.

beat_by_random = []

for i in range(10000):
  random_order = np.random.permutation(len(all_allocations))
  random_returns = all_returns * all_allocations[random_order]
  random_sharpe = np.sqrt(252) * np.mean(random_returns) / np.std(random_returns)
  beat_by_random.append(random_sharpe > sharpe)

# Fraction of shuffles that beat the strategy.
p_value = np.mean(beat_by_random)
print('p-value:', p_value)

In [None]:
#@title Make Live Prediction
#@markdown The output is normalized to the distribution of the model's past predictions.
#@markdown
#@markdown Number of models for ensembling:
n_models = 3 #@param {"type": "integer"}

# Train on the most recent `training_bars` returns.
train_features, train_labels = generate_features_and_labels(log_returns[-training_bars:], n_tokens)

# Normalisation statistics taken from the training windows.
mean_feature = np.mean(train_features)
std_feature = np.std(train_features)

train_features -= mean_feature
train_features /= std_feature
train_labels -= mean_feature
train_labels /= std_feature

models = make_and_train_ensemble(n_models, train_features, train_labels)

# In-sample prediction statistics, used to z-score the live prediction.
train_predictions = batch_predict_ensemble(models, train_features)
mean_prediction = np.mean(train_predictions)
std_prediction = np.std(train_predictions)

# Single window made of the latest n_tokens returns, shaped (1, n_tokens, 1);
# copied so the in-place normalisation does not modify log_returns.
live_features = log_returns[-n_tokens:].copy().reshape((1, -1, 1))

live_features -= mean_feature
live_features /= std_feature

live_prediction = batch_predict_ensemble(models, live_features)[0]
live_prediction -= mean_prediction
live_prediction /= std_prediction

print('live prediction (z-score):', live_prediction)