<a href="https://colab.research.google.com/github/JaredLevi18/Stock-market-predictor./blob/main/model_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! pip install alpha_vantage -q  # Python module to get stock data/cryptocurrencies from the Alpha Vantage API

In [None]:
import os

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
from alpha_vantage.timeseries import TimeSeries

In [None]:
# Central configuration for data download, windowing, plotting, model and training.
config = {
    "alpha_vantage":{
        # Read the API key from the environment when available so it does not
        # have to be committed with the notebook; otherwise fall back to the
        # placeholder, which must be replaced by a real key.
        "key": os.environ.get("ALPHA_VANTAGE_API_KEY", "the api from alpha vintage"),
        "symbol": "IBM",
        "outputsize": "full",
        # Field name of the adjusted close price inside each daily record.
        "key_adjustedclose": "5. adjusted close",
    },
    "data": {
        "window_size": 20,  # number of past points fed to the model per sample
        "train_split_size": 0.80,  # fraction of samples used for training
    },
    "plots": {
        "show_plots": True,
        "xticks_interval": 90,  # show one date label every N points
        "color_actual": "#001f3f",
        "color_train": "#3D9970",
        "color_val": "#0074D9",
        "color_pred_train": "#3D9970",
        "color_pred_val": "#0074D9",
        "color_pred_test": "#FF4136",
    },
    "model":{
        "input_size": 1, # only 1 because we're using the closing price
        "num_lstm_layers": 2,
        "lstm_size": 32,
        "dropout": 0.2,
    },
    "training": {
        # Use the GPU when one is present; fall back to CPU so the notebook
        # still runs on machines / runtimes without CUDA.
        "device": "cuda" if torch.cuda.is_available() else "cpu",
        "batch_size": 64,
        "num_epochs": 100,
        "learning_rate": 0.01,
        "scheduler_step_size": 40,
    },
}

In [None]:

def download_data(config, plot=False):
  """Download the daily adjusted close prices for the configured symbol.

  Args:
    config: Configuration dict. Reads config["alpha_vantage"] ("key", "symbol",
      "outputsize", "key_adjustedclose") and, when plotting,
      config["plots"] ("color_actual", "xticks_interval").
    plot: When True, also draw the price series with matplotlib.

  Returns:
    Tuple (data_date, data_close_price, num_data_points, display_date_range):
    the date keys of the response in reversed order (the API is assumed to
    return newest first, so this should be chronological - TODO confirm),
    the matching prices as a NumPy float array, the number of data points and
    a "from <first> to <last>" description string.

  Raises:
    ValueError: If the response contains no data points.
  """
  av_config = config["alpha_vantage"]
  ts = TimeSeries(key=av_config["key"])
  data, _ = ts.get_daily_adjusted(av_config["symbol"], outputsize=av_config["outputsize"])

  # Reverse the order of the response once and derive the prices from the
  # reversed dates so both sequences are guaranteed to stay aligned.
  data_date = list(data.keys())
  data_date.reverse()

  # The field name must match the key defined in the config ("key_adjustedclose").
  close_key = av_config["key_adjustedclose"]
  data_close_price = np.array([float(data[date][close_key]) for date in data_date])

  num_data_points = len(data_date)
  if num_data_points == 0:
    raise ValueError("No data points returned for symbol " + str(av_config["symbol"]))

  display_date_range = "from " + data_date[0] + " to " + data_date[num_data_points - 1]
  print("Number of data points", num_data_points, display_date_range)

  if plot:
    xticks_interval = config["plots"]["xticks_interval"]

    fig = figure(figsize=(25,5), dpi=180)
    fig.patch.set_facecolor((1.0, 1.0, 1.0))
    plt.plot(data_date, data_close_price, color=config["plots"]["color_actual"])
    # Label every `xticks_interval`-th point, skip labels that would crowd the
    # end of the axis, and always label the final point.
    xticks = [data_date[i] if ((i % xticks_interval == 0 and (num_data_points - i) > xticks_interval) or i == num_data_points - 1) else None for i in range(num_data_points)]
    x = np.arange(0, len(xticks))
    plt.xticks(x, xticks, rotation='vertical')
    plt.title("Daily close price for " + av_config["symbol"] + ", " + display_date_range)
    # The old `b=` keyword of grid() no longer exists in current matplotlib;
    # passing line properties alone already turns the grid on.
    plt.grid(which="major", axis="y", linestyle="--")
    plt.show()

  return data_date, data_close_price, num_data_points, display_date_range

# Fetch the price history once; the results are reused by the cells below.
(
  data_date,
  data_close_price,
  num_data_points,
  display_date_range,
) = download_data(config, plot=config["plots"]["show_plots"])

In [None]:
# normalizing raw financial data
class Normalizer():
  """Standard-score scaler: subtracts the mean and divides by the standard deviation.

  The statistics are computed along axis 0 by `fit_transform` and stored on
  the instance so `inverse_transform` can map values back to the original scale.
  """

  def __init__(self):
    # Both statistics stay None until fit_transform() has been called.
    self.mu = None
    self.sd = None

  def fit_transform(self, x):
    """Compute mean/std of `x` along axis 0, store them, and return the normalized data.

    Args:
      x: NumPy array to normalize.

    Returns:
      Array with the same shape as `x` holding (x - mean) / std.
    """
    self.mu = np.mean(x, axis=(0), keepdims=True)
    self.sd = np.std(x, axis=(0), keepdims=True)
    normalized_x = (x - self.mu) / self.sd
    return normalized_x

  def inverse_transform(self, x):
    """Map normalized values back to the original scale using the stored statistics."""
    return (x * self.sd) + self.mu

# Normalize the close prices; `scaler` keeps the mean/std so values can be
# converted back to the original scale later with scaler.inverse_transform().
scaler = Normalizer()
normalized_data_close_price = scaler.fit_transform(data_close_price)

In [None]:
# preparing data: generating validation and training datasets
def prepare_x(x, window_size):
  """Slice `x` into overlapping windows of length `window_size` (stride of one element).

  Returns:
    A pair (windows, last_window): every window except the final one, and the
    final window on its own.
  """
  step = x.strides[0]
  window_count = x.shape[0] - window_size + 1
  # Strided view over `x`: row i starts at element i and spans `window_size` elements.
  windows = np.lib.stride_tricks.as_strided(
    x,
    shape=(window_count, window_size),
    strides=(step, step),
  )
  return windows[:-1], windows[-1]

def prepare_y(x, window_size):
  """Return the labels: every element of `x` from index `window_size` onward.

  Each label is the value that directly follows a window of `window_size`
  elements. (A moving-average label,
  np.convolve(x, np.ones(window_size), 'valid') / window_size, is a possible
  alternative that is not used here.)
  """
  return x[window_size:]

def prepare_data(normalized_data_close_price, config, plot=False):
  """Window the normalized series and split it into training and validation sets.

  Windowing and labels come from `prepare_x` / `prepare_y`. When `plot` is True
  the function also reads the notebook-level names `num_data_points`,
  `data_date` and `scaler` to draw the split on the original price scale.

  Args:
    normalized_data_close_price: 1-D NumPy array of normalized prices.
    config: Configuration dict; reads config["data"] ("window_size",
      "train_split_size") and, when plotting, config["plots"] and
      config["alpha_vantage"]["symbol"].
    plot: When True, plot the training and validation targets.

  Returns:
    Tuple (split_index, data_x_train, data_y_train, data_x_val, data_y_val,
    data_x_unseen), where `data_x_unseen` is the last window, which has no label.
  """
  window_size = config["data"]["window_size"]
  data_x, data_x_unseen = prepare_x(normalized_data_close_price, window_size=window_size)
  data_y = prepare_y(normalized_data_close_price, window_size=window_size)

  # split the dataset: the first `train_split_size` fraction is used for training
  split_index = int(data_y.shape[0]*config["data"]["train_split_size"])
  data_x_train = data_x[:split_index]
  data_x_val = data_x[split_index:]
  data_y_train = data_y[:split_index]
  data_y_val = data_y[split_index:]

  if plot:
    # prepare data for plotting: place each target at its position on the date axis
    to_plot_data_y_train = np.zeros(num_data_points)
    to_plot_data_y_val = np.zeros(num_data_points)

    to_plot_data_y_train[window_size:split_index+window_size] = scaler.inverse_transform(data_y_train)
    to_plot_data_y_val[split_index+window_size:] = scaler.inverse_transform(data_y_val)

    # Positions that were never filled are replaced by None so they are not drawn.
    to_plot_data_y_train = np.where(to_plot_data_y_train == 0, None, to_plot_data_y_train)
    to_plot_data_y_val = np.where(to_plot_data_y_val == 0, None, to_plot_data_y_val)

    # plots
    xticks_interval = config["plots"]["xticks_interval"]
    fig = figure(figsize=(25,5), dpi=80)
    fig.patch.set_facecolor((1.0,1.0,1.0))
    plt.plot(data_date, to_plot_data_y_train, label="Prices (train)", color=config["plots"]["color_train"])
    plt.plot(data_date, to_plot_data_y_val, label="Prices (validation)", color=config["plots"]["color_val"])
    # Label every `xticks_interval`-th point, skip labels that would crowd the
    # end of the axis, and always label the final point.
    xticks = [data_date[i] if ((i % xticks_interval == 0 and (num_data_points - i) > xticks_interval) or i == num_data_points - 1) else None for i in range(num_data_points)]
    x = np.arange(0, len(xticks))
    plt.xticks(x, xticks, rotation='vertical')
    plt.title("Daily close prices for " + config["alpha_vantage"]["symbol"] + " - showing training and validation data")
    # The old `b=` keyword of grid() no longer exists in current matplotlib;
    # passing line properties alone already turns the grid on.
    plt.grid(which='major', axis='y', linestyle='--')
    plt.legend()
    plt.show()

  return split_index, data_x_train, data_y_train, data_x_val, data_y_val, data_x_unseen

# Build the train / validation windows (plus the final unlabeled window).
(
  split_index,
  data_x_train,
  data_y_train,
  data_x_val,
  data_y_val,
  data_x_unseen,
) = prepare_data(normalized_data_close_price, config, plot=config["plots"]["show_plots"])

In [None]:
class TimeSeriesDataset(Dataset):
  """Dataset pairing each input window with its target value.

  Args:
    x: 2-D NumPy array of windows; a trailing axis of size 1 is appended.
    y: NumPy array of targets, one per window.
  """

  def __init__(self, x, y):
    x = np.expand_dims(x, 2)  # we need to make x have the correct shape for LSTM
    # Stored as float32 copies of the inputs.
    self.x = x.astype(np.float32)
    self.y = y.astype(np.float32)

  def __len__(self):
    """Number of samples (windows)."""
    return len(self.x)

  def __getitem__(self, idx):
    """Return the (window, target) pair stored at position `idx`."""
    return (self.x[idx], self.y[idx])

# Wrap the train / validation arrays in Dataset objects.
dataset_train = TimeSeriesDataset(data_x_train, data_y_train)
dataset_val = TimeSeriesDataset(data_x_val, data_y_val)

# Sanity check: print the shapes of the stored inputs (x) and targets (y).
print("Training data shape: ", dataset_train.x.shape, dataset_train.y.shape)
print("Validation data shape: ", dataset_val.x.shape, dataset_val.y.shape)

In [None]:
class LSTM(nn.Module):
  def __init__(self, input_size=1, hidden_state=32, num_layers=2, output_size=1, dropout=0.2):
    super().__init__()
    self.hidden_state = hidden_state
    self.linear = nn.Linear(input_size, hidden_state)
    self.relu = nn.ReLU()
    self.lstm = nn.LSTM(hidden_state, hidden_size=self.hidde_state, num_layers=num_layers, batch_first=True)
    self.dropout = nn.dropout(dropout)
    self.linear2 = nn.Linear(num_layers*hidden_state, output_size)

    self.init_weights()

  def init_weights(self):
    """Initialize the parameters of `self.lstm` in place.

    Biases are set to zero, input-to-hidden weights use Kaiming-normal
    initialization and hidden-to-hidden weights use orthogonal initialization.
    Parameters whose name matches none of these patterns are left untouched.
    """
    for name, param in self.lstm.named_parameters():
      if 'bias' in name:
        nn.init.constant_(param, 0.0)
      elif 'weight_ih' in name:
        nn.init.kaiming_normal_(param)
      elif 'weight_hh' in name:
        nn.init.orthogonal_(param)

  def forward(self ,x):