<a href="https://colab.research.google.com/github/gianluca-peri/demo-nasdaq/blob/main/demo_architectures.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

Let's consider a simple signal: a sine wave

$$y=\sin(x)$$

In [None]:
# Creating 1D time series

import numpy as np
import matplotlib.pyplot as plt

# Sample 100 evenly spaced points on [0, 4*pi], i.e. two full periods of sin(x)
x = np.linspace(0, 4*np.pi, 100)
y = np.sin(x)

plt.scatter(x, y)
plt.xlabel('x')
plt.ylabel('sin(x)')
# Title typo fixed ('Outr' -> 'Our')
plt.title('Our time series (sine wave)')
plt.show()

In [None]:
# We can think of the series as made of past (known) and future (unknown) points

# Index separating the first half (past) from the second half (future)
midpoint = len(x)//2

x_past = x[:midpoint]
y_past = y[:midpoint]

x_future = x[midpoint:]
y_future = y[midpoint:]

plt.scatter(x_past, y_past, color='blue', label='past')
plt.scatter(x_future, y_future, color='red', label='future')
plt.legend()
plt.xlabel('x')
plt.ylabel('sin(x)')
# Title typo fixed ('Outr' -> 'Our')
plt.title('Our time series (sine wave)')
plt.show()

In [None]:
# We can turn the regression problem into a classification problem:
# does the future leave a band of half-width delta around the last known value?

delta = 0.5
last_known_y = y_past[-1]
floor = last_known_y - delta
ceiling = last_known_y + delta

plt.scatter(x_past, y_past, color='blue', label='past')
plt.scatter(x_future, y_future, color='red', label='future')
plt.axhline(y=floor, color='orange', linestyle='--', label='floor')
plt.axhline(y=ceiling, color='green', linestyle='--', label='ceiling')
plt.axhline(y=last_known_y, color='black', linestyle='--', label='current value')
plt.legend()
plt.xlabel('x')
plt.ylabel('sin(x)')
# Title typo fixed ('Outr' -> 'Our')
plt.title('Our time series (sine wave)')
plt.show()


# Creating the data for real

We can consider our sine signal transmitted over a noisy channel, where $\eta$ is zero-mean Gaussian noise:

$$y=\sin(x)+\eta$$

In [None]:
# Given a time series (past and future) and a delta we can assign one of 3 classes:
# goes up, goes down, or neutral (stays within the band)

def classify_time_series(y_past, y_future, delta, verbose=False):
    """Label a series as "goes up", "goes down" or "neutral".

    A band of half-width ``delta`` is centred on the last value of ``y_past``.
    ``y_future`` is scanned in order; the first point strictly outside the
    band decides the label.

    Args:
        y_past: non-empty sequence of known values; only the last one is used.
        y_future: sequence of upcoming values, scanned front to back.
        delta: half-width of the band around the last known value.
        verbose: when true, print the last known value, delta, floor and
            ceiling before classifying.

    Returns:
        "goes up" if the first point outside the band is above the ceiling,
        "goes down" if it is below the floor, "neutral" if no point leaves
        the band (including an empty ``y_future``).
    """
    reference = y_past[-1]
    lower_bound = reference - delta
    upper_bound = reference + delta

    if verbose:
        print(f'Last known value: {reference}')
        print(f'Delta: {delta}')
        print(f'Floor: {lower_bound}')
        print(f'Ceiling: {upper_bound}\n')

    # First future value that escapes the band, if any
    first_breakout = next(
        (value for value in y_future
         if value > upper_bound or value < lower_bound),
        None,
    )

    if first_breakout is None:
        return "neutral"
    return "goes up" if first_breakout > upper_bound else "goes down"

# Classify the sine-wave split defined above; verbose=True also prints the
# last known value, delta, floor and ceiling used for the decision
print(classify_time_series(y_past, y_future, delta, verbose=True))

In [None]:
# Suppose we have a really big historical time series (with noise)

NOISE_STD_DEV = 0.1  # standard deviation of the additive Gaussian noise

lenght_of_time_series = 3000
x = np.linspace(0, 96*np.pi, lenght_of_time_series)

# Clean signal plus one zero-mean Gaussian noise sample per point
clean_signal = np.sin(x)
y = clean_signal + np.random.normal(loc=0, scale=NOISE_STD_DEV, size=len(x))

# Noise-free curve drawn as a line, noisy observations as points
plt.figure(figsize=(24, 6))
plt.plot(x, clean_signal, color='red', label='sine wave')
plt.scatter(x, y, label='historical time series')
plt.legend()
plt.title('Our historical time series (sine wave)')
plt.xlabel('x')
plt.ylabel('y')
plt.show()


# Turn the data into train, validation, and test datasets


In [None]:
# We can use this, with pytorch, to create a custom dataset for classification

import torch
from torch.utils.data import Dataset, DataLoader

class SineWaveDataset(Dataset):
  """Sliding-window classification dataset built from a 1D time series.

  Every sample is a window of ``input_len + future_lookup_len`` consecutive
  points. The first ``input_len`` values of ``'y'`` are the input features;
  the remaining values are only used to compute the label:

  * 0 ("goes up"): a future point exceeds ``last input value + delta``
  * 1 ("goes down"): a future point drops below ``last input value - delta``
  * 2 ("neutral"): no future point leaves that band

  The future points are scanned in order, so the first crossing decides.

  Args:
    time_series: mapping with keys ``'x'`` and ``'y'`` holding sliceable
      sequences (the series length is taken from ``'x'``; ``'y'`` is
      presumably of the same length).
    input_len: number of points used as input features (must be >= 1 for
      samples to be readable, since the last input value is the reference).
    future_lookup_len: number of points after the input used for labelling.
    delta: half-width of the band around the last input value.

  Attributes:
    slices: list of ``{'x': ..., 'y': ...}`` windows, one per sample.
    lenght_of_time_series: length of the full series (spelling kept so that
      existing code reading this attribute keeps working).
  """

  def __init__(self, time_series, input_len, future_lookup_len, delta):

    self.time_series = time_series
    self.input_len = input_len
    self.future_lookup_len = future_lookup_len
    self.delta = delta

    self.lenght_of_time_series = len(time_series['x'])

    window_len = input_len + future_lookup_len
    # The last valid window starts at index (length - window_len), so there
    # are (length - window_len + 1) windows. The previous range() stopped one
    # short and silently dropped the final sample. max() keeps the dataset
    # empty when the series is shorter than a single window.
    number_of_windows = max(0, self.lenght_of_time_series - window_len + 1)

    self.slices = [
        {
            'x': time_series['x'][start:start+window_len],
            'y': time_series['y'][start:start+window_len],
        }
        for start in range(number_of_windows)
    ]

  def classify_time_series(self, y_past, y_future):
    """Return the class index (0 up, 1 down, 2 neutral) as a 0-dim tensor."""
    last_known_y = y_past[-1]
    floor = last_known_y - self.delta
    ceiling = last_known_y + self.delta

    # The first point outside the band decides the class
    for point in y_future:
      if point > ceiling:
        # goes up
        return torch.tensor(0)
      if point < floor:
        # goes down
        return torch.tensor(1)

    # neutral
    return torch.tensor(2)

  def __len__(self):
    """Return the number of windows (samples)."""
    return len(self.slices)

  def __getitem__(self, idx):
    """Return ``(features, label)`` for the window at position ``idx``."""
    window_y = self.slices[idx]['y']
    y_past = window_y[:self.input_len]
    y_future = window_y[self.input_len:]

    # torch.Tensor(...) yields a tensor of the default floating-point dtype
    input_features_vector = torch.Tensor(y_past)
    correct_output = self.classify_time_series(y_past, y_future)

    return input_features_vector, correct_output

#-------------------------------------------------------------------------------

# Bundle the noisy series in the mapping format expected by SineWaveDataset
time_series = {'x': x, 'y': y}

TIME_DELTA_IN_DATAPOINTS = 10  # used both as input length and as look-ahead length
DELTA = 0.5                    # half-width of the band around the last known value

dataset = SineWaveDataset(
    time_series,
    input_len=TIME_DELTA_IN_DATAPOINTS,
    future_lookup_len=TIME_DELTA_IN_DATAPOINTS,
    delta=DELTA,
)

# Example: print feature vector and correct output

print(f'Lenght of dataset: {len(dataset)}')

# Tally the samples per class; position in the list is the class index
# (0: goes up, 1: goes down, 2: neutral)
class_counts = [0, 0, 0]

for i in range(len(dataset)):
  input_features_vector, correct_output = dataset[i]
  class_index = int(correct_output)
  if class_index not in (0, 1, 2):
    raise Exception('Unknown class')
  class_counts[class_index] += 1

number_of_goes_up_classes, number_of_goes_down_classes, number_of_neutral_classes = class_counts

print(f'Number of goes up classes: {number_of_goes_up_classes}')
print(f'Number of goes down classes: {number_of_goes_down_classes}')
print(f'Number of neutral classes: {number_of_neutral_classes}')

print('Example:')

# Show the first sample of the dataset
input_features_vector, correct_output = dataset[0]

print('Input features vector:', input_features_vector)
print('Correct output:', correct_output)

In [None]:
# Plot examples of the custom dataset

fig, ax = plt.subplots(2, 3, figsize=(20,10))

# One randomly chosen window per subplot (row-major order)
for axis in ax.flat:
  k = np.random.randint(0, len(dataset))
  x_slice = dataset.slices[k]['x']
  y_slice = dataset.slices[k]['y']

  # Split the window into the input part and the look-ahead part
  x_past = x_slice[:TIME_DELTA_IN_DATAPOINTS]
  y_past = y_slice[:TIME_DELTA_IN_DATAPOINTS]
  x_future = x_slice[TIME_DELTA_IN_DATAPOINTS:]
  y_future = y_slice[TIME_DELTA_IN_DATAPOINTS:]

  last_known_y = y_past[-1]
  floor = last_known_y - DELTA
  ceiling = last_known_y + DELTA

  axis.scatter(x_past, y_past, color='blue', label='past')
  axis.scatter(x_future, y_future, color='red', label='future')
  # Baseline at y=0; it used to be labelled 'current value', which duplicated
  # the legend entry of the real current-value line below
  axis.axhline(y=0, color='black', linestyle='-', label='zero')
  axis.axhline(y=last_known_y, color='black', linestyle='--', label='current value')
  axis.axhline(y=floor, color='orange', linestyle='--', label='floor')
  axis.axhline(y=ceiling, color='green', linestyle='--', label='ceiling')
  axis.legend()
  axis.set_xlabel('x')
  axis.set_ylabel('y')
  axis.set_title('Our historical time series (sine wave)')

plt.show()

In [None]:
# Split into train, validation, and test

train_frac = 0.6
validation_frac = 0.2
test_frac = 0.2

train_size, validation_size, test_size = (
    int(lenght_of_time_series * frac)
    for frac in (train_frac, validation_frac, test_frac)
)

print(f'Train size: {train_size}')
print(f'Validation size: {validation_size}')
print(f'Test size: {test_size}')

# Contiguous, chronological split: train first, then validation; the test
# split takes everything that is left after the validation block
train_end = train_size
validation_end = train_size + validation_size

train_time_series = {'x': x[:train_end], 'y': y[:train_end]}
validation_time_series = {
    'x': x[train_end:validation_end],
    'y': y[train_end:validation_end],
}
test_time_series = {'x': x[validation_end:], 'y': y[validation_end:]}

# Plot train, validation, and test

plt.figure(figsize=(24, 6))
plt.plot(x, np.sin(x), color='red', label='sine wave')
for split_name, split_series in (
    ('train', train_time_series),
    ('validation', validation_time_series),
    ('test', test_time_series),
):
  plt.scatter(split_series['x'], split_series['y'], label=split_name)
plt.legend()
plt.title('Our historical time series (sine wave)')
plt.xlabel('x')
plt.ylabel('y')
plt.show()

# Build the datasets (one per split, all with the same window settings)

train_dataset = SineWaveDataset(
    train_time_series,
    input_len=TIME_DELTA_IN_DATAPOINTS,
    future_lookup_len=TIME_DELTA_IN_DATAPOINTS,
    delta=DELTA,
)
validation_dataset = SineWaveDataset(
    validation_time_series,
    input_len=TIME_DELTA_IN_DATAPOINTS,
    future_lookup_len=TIME_DELTA_IN_DATAPOINTS,
    delta=DELTA,
)
test_dataset = SineWaveDataset(
    test_time_series,
    input_len=TIME_DELTA_IN_DATAPOINTS,
    future_lookup_len=TIME_DELTA_IN_DATAPOINTS,
    delta=DELTA,
)

print(f'Lenght of train dataset: {len(train_dataset)}')
print(f'Lenght of validation dataset: {len(validation_dataset)}')
print(f'Lenght of test dataset: {len(test_dataset)}')

# Build the dataloaders

# Only the training data needs shuffling. The evaluation metrics do not
# depend on sample order, so validation/test are read sequentially, which
# keeps evaluation deterministic.
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=1, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)


# Train and test a simple MLP

This is a basic neural network which, of course, needs a fixed input length.

In [None]:
!pip3 install torchview 1> /dev/null 2> /dev/null
from torchview import draw_graph
import matplotlib.image as mpimg

# Now we can try to classify using a simple MLP

#-------------------------------------------------------------------------------
# MLP definition

class MLP(torch.nn.Module):
  """Fully connected network with one hidden ReLU layer.

  Args:
    input_size: number of input features per sample.
    hidden_size: width of the hidden layer.
    output_size: number of output values per sample.
  """

  def __init__(self, input_size, hidden_size, output_size):
    super().__init__()

    # Linear -> ReLU -> Linear; no activation is applied to the final output
    layers = [
        torch.nn.Linear(input_size, hidden_size),
        torch.nn.ReLU(),
        torch.nn.Linear(hidden_size, output_size),
    ]
    self.sequential = torch.nn.Sequential(*layers)

  def forward(self, x):
    """Run ``x`` through the layer stack and return the raw outputs."""
    raw_outputs = self.sequential(x)
    return raw_outputs
#-------------------------------------------------------------------------------

input_size = TIME_DELTA_IN_DATAPOINTS  # one input feature per past datapoint
hidden_size = 3000
output_size = 3  # one output per class (0: goes up, 1: goes down, 2: neutral)

model = MLP(input_size, hidden_size, output_size)

# Visualize the network

# (1, input_size) is presumably the shape of the example input torchview
# uses to trace the model, i.e. a batch holding one sample -- TODO confirm
graph = draw_graph(model, input_size=(1, input_size))

# Render to PNG file (read back below as "<output_path>.png")
output_path = "model_graph"
graph.visual_graph.render(filename=output_path, format="png", cleanup=True)

# Load the image and display with matplotlib
img = mpimg.imread(f"{output_path}.png")
plt.figure(figsize=(6,6))
plt.imshow(img)
plt.axis('off')
plt.show()


In [None]:
from tqdm.notebook import tqdm

epochs = 200

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = torch.nn.CrossEntropyLoss()

# Mean loss per batch, one entry per epoch
losses_train = []
losses_validation = []

for epoch in tqdm(range(epochs)):
  # Training pass
  total_loss = 0
  model.train()
  for input_features_vector, label in train_dataloader:
    optimizer.zero_grad()
    output = model(input_features_vector)
    loss = loss_fn(output, label)
    total_loss += loss.item()
    loss.backward()
    optimizer.step()
  losses_train.append(total_loss/len(train_dataloader))

  # Check validation
  # No parameter update happens here, so gradient tracking is disabled:
  # building the autograd graph for every validation sample only wastes
  # time and memory
  total_loss = 0
  model.eval()
  with torch.no_grad():
    for input_features_vector, label in validation_dataloader:
      output = model(input_features_vector)
      loss = loss_fn(output, label)
      total_loss += loss.item()
  losses_validation.append(total_loss/len(validation_dataloader))

# Making a plot of loss vs epochs

plt.plot(losses_train, label='train')
plt.plot(losses_validation, label='validation')
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss vs epochs')
plt.show()

In [None]:
# Understand standard shape of model output

model.eval()
with torch.no_grad():
  # A single training batch is enough to inspect the output shape
  for input_features_vector, label in train_dataloader:
    output = model(input_features_vector)
    print(output.shape)
    break

In [None]:
# Test the model to get the train accuracy

model.eval()
correct = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in train_dataloader:
    output = model(input_features_vector)
    # Predicted class = index of the largest output in each row of the batch
    predicted_class = output.argmax(dim=1)
    correct += (predicted_class == label).sum().item()
    total += len(output)

print(f'Accuracy on train: {correct/total}')


In [None]:
# Test the model to get the test accuracy

model.eval()
correct = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in test_dataloader:
    output = model(input_features_vector)
    # Predicted class = index of the largest output in each row of the batch
    predicted_class = output.argmax(dim=1)
    correct += (predicted_class == label).sum().item()
    total += len(output)

print(f'Accuracy on test: {correct/total}')



In [None]:
# Test again with a confidence level: only predictions whose probability
# reaches the threshold are counted ("active" predictions)

confidence = 0.8

model.eval()
correct = 0
wrong = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in test_dataloader:
    output = model(input_features_vector)
    # The model outputs raw logits (it is trained with CrossEntropyLoss),
    # which are unbounded; softmax converts them into class probabilities
    # so that comparing against a threshold in [0, 1] is meaningful
    probabilities = torch.softmax(output, dim=1)
    top_probability, predicted_class = probabilities.max(dim=1)
    is_confident = top_probability >= confidence
    is_correct = predicted_class == label
    correct += (is_confident & is_correct).sum().item()
    wrong += (is_confident & ~is_correct).sum().item()
    total += len(output)

active = correct + wrong

# Avoid a ZeroDivisionError when no prediction reaches the threshold
if active:
  print(f'Accuracy on test with confidence {confidence}: {correct/active}')
else:
  print(f'Accuracy on test with confidence {confidence}: undefined (no prediction reached the threshold)')
print(f'Activity on test with confidence {confidence}: {active/total}')

# Let's train and test again, but with a RNN

This will be a recurrent neural network. The main advantage is that it is able to handle sequences with different lengths.

One of the main points is that we do not need this advantage for our task!

In [None]:
class RNN(torch.nn.Module):
  """Two-layer ReLU RNN followed by a linear output layer.

  Args:
    input_size: number of features per time step fed to the RNN. ``forward``
      appends a trailing axis of size 1 to its input, so it only matches
      ``input_size=1``.
    hidden_size: size of the RNN hidden state.
    output_size: number of output values per sample.
  """

  def __init__(self, input_size, hidden_size, output_size):
    super().__init__()

    self.rnn = torch.nn.RNN(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=2,
        nonlinearity='relu',
        batch_first=True,
    )

    self.sequential = torch.nn.Sequential(
        torch.nn.Linear(hidden_size, output_size),
    )

  def forward(self, x):
    """Map a batch of sequences to one output vector per sequence."""
    # Add a trailing feature axis: every time step carries a single value
    sequence = x.unsqueeze(-1)
    # Only the final hidden states are needed, not the per-step outputs
    _, final_hidden = self.rnn(sequence)
    # final_hidden[-1] is the final hidden state of the last (top) layer
    return self.sequential(final_hidden[-1])

# Notice that an RNN is adaptable to any sequence length,
# so input_size in this case is the dimension of the feature vector
# of each time step (=1), not the length of the time series
model = RNN(input_size=1, hidden_size=300, output_size=output_size)

# Visualize the network

# To visualize the data let's use the actual size of the time series (input_size)
graph = draw_graph(model, input_size=(1, input_size))

# Render to PNG file (read back below as "<output_path>.png")
output_path = "model_graph"
graph.visual_graph.render(filename=output_path, format="png", cleanup=True)

# Load the image and display with matplotlib
img = mpimg.imread(f"{output_path}.png")
plt.figure(figsize=(6,6))
plt.imshow(img)
plt.axis('off')
plt.show()


In [None]:
# Fresh optimizer bound to the parameters of the current model
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train and validation

# Mean loss per batch, one entry per epoch
losses_train = []
losses_validation = []

for epoch in tqdm(range(epochs)):
  # Training pass
  total_loss = 0
  model.train()
  for input_features_vector, label in train_dataloader:
    optimizer.zero_grad()
    output = model(input_features_vector)
    loss = loss_fn(output, label)
    total_loss += loss.item()
    loss.backward()
    optimizer.step()
  losses_train.append(total_loss/len(train_dataloader))

  # Check validation
  # No parameter update happens here, so gradient tracking is disabled:
  # building the autograd graph for every validation sample only wastes
  # time and memory
  total_loss = 0
  model.eval()
  with torch.no_grad():
    for input_features_vector, label in validation_dataloader:
      output = model(input_features_vector)
      loss = loss_fn(output, label)
      total_loss += loss.item()
  losses_validation.append(total_loss/len(validation_dataloader))

# Make a plot of loss vs epochs

plt.plot(losses_train, label='train')
plt.plot(losses_validation, label='validation')
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss vs epochs')
plt.show()

In [None]:
# Understand standard shape of model output

model.eval()
with torch.no_grad():
  # A single training batch is enough to inspect the output shape
  for input_features_vector, label in train_dataloader:
    output = model(input_features_vector)
    print(output.shape)
    break

In [None]:
# Test the model to get the train accuracy

model.eval()
correct = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in train_dataloader:
    output = model(input_features_vector)
    # Predicted class = index of the largest output in each row of the batch
    predicted_class = output.argmax(dim=1)
    correct += (predicted_class == label).sum().item()
    total += len(output)

print(f'Accuracy on train: {correct/total}')


In [None]:
# Test the model to get the test accuracy

model.eval()
correct = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in test_dataloader:
    output = model(input_features_vector)
    # Predicted class = index of the largest output in each row of the batch
    predicted_class = output.argmax(dim=1)
    correct += (predicted_class == label).sum().item()
    total += len(output)

print(f'Accuracy on test: {correct/total}')



In [None]:
# Test again with a confidence level: only predictions whose probability
# reaches the threshold are counted ("active" predictions)

confidence = 0.8

model.eval()
correct = 0
wrong = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in test_dataloader:
    output = model(input_features_vector)
    # The model outputs raw logits (it is trained with CrossEntropyLoss),
    # which are unbounded; softmax converts them into class probabilities
    # so that comparing against a threshold in [0, 1] is meaningful
    probabilities = torch.softmax(output, dim=1)
    top_probability, predicted_class = probabilities.max(dim=1)
    is_confident = top_probability >= confidence
    is_correct = predicted_class == label
    correct += (is_confident & is_correct).sum().item()
    wrong += (is_confident & ~is_correct).sum().item()
    total += len(output)

active = correct + wrong

# Avoid a ZeroDivisionError when no prediction reaches the threshold
if active:
  print(f'Accuracy on test with confidence {confidence}: {correct/active}')
else:
  print(f'Accuracy on test with confidence {confidence}: undefined (no prediction reached the threshold)')
print(f'Activity on test with confidence {confidence}: {active/total}')

# Train and test again, but with a CNN

This network is not able to handle sequences of different lengths; however, it should be able to "see" the shape of the time series better.

In [None]:
# Let's do the same thing but with a CNN

class SimpleCNN(torch.nn.Module):
    """One 1D convolution followed by a two-layer fully connected head.

    Args:
        input_size: length L of each input sequence (fixed, because the
            fully connected head is sized from it).
        hidden_size: width of the hidden fully connected layer.
        output_size: number of output values per sample.
        kernel_number: number of convolution filters (output channels).
        kernel_size: width of each convolution filter.
        stride: convolution stride.
        padding: zero padding added to both ends of the sequence.
    """

    def __init__(self, input_size, hidden_size, output_size, kernel_number=16, kernel_size=2, stride=1, padding=1):
        super().__init__()

        # Single input channel -> kernel_number feature maps
        convolution = torch.nn.Conv1d(1, kernel_number, kernel_size, stride, padding)
        self.conv_net = torch.nn.Sequential(convolution, torch.nn.ReLU())

        # Standard output-length formula of a 1D convolution (dilation 1).
        # With the defaults (kernel_size=2, padding=1, stride=1) this is L + 1.
        conv_output_len = (input_size + 2 * padding - kernel_size) // stride + 1
        self.flattened_size = kernel_number * conv_output_len

        self.fc = torch.nn.Sequential(
            torch.nn.Linear(self.flattened_size, hidden_size),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_size, output_size)
        )

    def forward(self, x):
        """Map a batch of sequences [B, L] to outputs [B, output_size]."""
        with_channel = x.unsqueeze(1)                     # [B, 1, L]
        feature_maps = self.conv_net(with_channel)        # [B, kernel_number, conv_output_len]
        flat = feature_maps.flatten(start_dim=1)          # [B, flattened_size]
        return self.fc(flat)                              # [B, output_size]

# Same input length, hidden width and number of outputs as defined earlier;
# the convolution settings are left at the class defaults
model = SimpleCNN(input_size, hidden_size, output_size)

# Visualize the network

graph = draw_graph(model, input_size=(1,input_size))

# Render to PNG file (read back below as "<output_path>.png")
output_path = "model_graph"
graph.visual_graph.render(filename=output_path, format="png", cleanup=True)

# Load the image and display with matplotlib
img = mpimg.imread(f"{output_path}.png")
plt.figure(figsize=(6,20))
plt.imshow(img)
plt.axis('off')
plt.show()


In [None]:
# Fresh optimizer bound to the parameters of the current model
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train and validation

# Mean loss per batch, one entry per epoch
losses_train = []
losses_validation = []

for epoch in tqdm(range(epochs)):
  # Training pass
  total_loss = 0
  model.train()
  for input_features_vector, label in train_dataloader:
    optimizer.zero_grad()
    output = model(input_features_vector)
    loss = loss_fn(output, label)
    total_loss += loss.item()
    loss.backward()
    optimizer.step()
  losses_train.append(total_loss/len(train_dataloader))

  # Check validation
  # No parameter update happens here, so gradient tracking is disabled:
  # building the autograd graph for every validation sample only wastes
  # time and memory
  total_loss = 0
  model.eval()
  with torch.no_grad():
    for input_features_vector, label in validation_dataloader:
      output = model(input_features_vector)
      loss = loss_fn(output, label)
      total_loss += loss.item()
  losses_validation.append(total_loss/len(validation_dataloader))

# Make a plot of loss vs epochs

plt.plot(losses_train, label='train')
plt.plot(losses_validation, label='validation')
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss vs epochs')
plt.show()

In [None]:
# Understand standard shape of model output

model.eval()
with torch.no_grad():
  # A single training batch is enough to inspect the output shape
  for input_features_vector, label in train_dataloader:
    output = model(input_features_vector)
    print(output.shape)
    break

In [None]:
# Test the model to get the train accuracy

model.eval()
correct = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in train_dataloader:
    output = model(input_features_vector)
    # Predicted class = index of the largest output in each row of the batch
    predicted_class = output.argmax(dim=1)
    correct += (predicted_class == label).sum().item()
    total += len(output)

print(f'Accuracy on train: {correct/total}')


In [None]:
# Test the model to get the test accuracy

model.eval()
correct = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in test_dataloader:
    output = model(input_features_vector)
    # Predicted class = index of the largest output in each row of the batch
    predicted_class = output.argmax(dim=1)
    correct += (predicted_class == label).sum().item()
    total += len(output)

print(f'Accuracy on test: {correct/total}')



In [None]:
# Test again with a confidence level: only predictions whose probability
# reaches the threshold are counted ("active" predictions)

confidence = 0.8

model.eval()
correct = 0
wrong = 0
total = 0

with torch.no_grad():
  for input_features_vector, label in test_dataloader:
    output = model(input_features_vector)
    # The model outputs raw logits (it is trained with CrossEntropyLoss),
    # which are unbounded; softmax converts them into class probabilities
    # so that comparing against a threshold in [0, 1] is meaningful
    probabilities = torch.softmax(output, dim=1)
    top_probability, predicted_class = probabilities.max(dim=1)
    is_confident = top_probability >= confidence
    is_correct = predicted_class == label
    correct += (is_confident & is_correct).sum().item()
    wrong += (is_confident & ~is_correct).sum().item()
    total += len(output)

active = correct + wrong

# Avoid a ZeroDivisionError when no prediction reaches the threshold
if active:
  print(f'Accuracy on test with confidence {confidence}: {correct/active}')
else:
  print(f'Accuracy on test with confidence {confidence}: undefined (no prediction reached the threshold)')
print(f'Activity on test with confidence {confidence}: {active/total}')