In [None]:
!pip install requests
!pip install torch
import torch
import torch.nn as nn
import numpy as np
import requests
from io import BytesIO
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

class PositionalEncoding(nn.Module):
    """
    Adds fixed sinusoidal positional information to the input sequence.
    This helps the Transformer model understand the order of data points in the sequence.

    The table is precomputed once for `max_len` positions and stored as a
    non-trainable buffer of shape (max_len, 1, d_model), so it follows the
    module across `.to(device)` calls and is saved in the state dict.

    Args:
        d_model: Size of the feature dimension (even or odd).
        max_len: Number of positions to precompute. Inputs passed to
            `forward` must not be longer than this.
    """
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        # (max_len, ceil(d_model / 2)) matrix of position * frequency products.
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so trim the cosine terms to fit; assigning the
        # untrimmed matrix would raise a shape-mismatch error.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Insert a singleton batch axis so the table broadcasts over the batch.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """
        Return `x` with the positional table added.

        `x` is indexed along dimension 0 as the sequence axis, i.e. it is
        expected in (seq_len, batch, d_model) layout.
        """
        return x + self.pe[:x.size(0), :]

class TransformerModel(nn.Module):
    """
    Encoder-decoder Transformer that forecasts the next value of a sequence
    from a window of past values.

    Pipeline: linear embedding of each time step -> positional encoding ->
    nn.Transformer (the embedded window is used as both source and target)
    -> linear head applied to the final time step only.
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2, nhead=8, max_len=500):
        super().__init__()
        # Sub-modules are created in a fixed order so that seeded weight
        # initialisation stays reproducible.
        self.embedding = nn.Linear(input_size, hidden_size)
        self.pos_encoder = PositionalEncoding(hidden_size, max_len=max_len)
        self.transformer = nn.Transformer(
            d_model=hidden_size,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, src):
        """
        Map a batch of windows to one prediction per window.

        `src` is treated as (batch, seq_len, input_size); the result has one
        row per batch element and a single output column.
        """
        embedded = self.embedding(src)
        # Swap the first two axes: the positional encoder and the transformer
        # are fed the sequence axis first.
        seq_first = self.pos_encoder(embedded.permute(1, 0, 2))
        decoded = self.transformer(seq_first, seq_first)
        # Only the representation of the last time step drives the forecast.
        last_step = decoded[-1]
        return self.fc(last_step)


def load_data_from_github(url, timeout=30):
    """
    Downloads and loads data from a GitHub URL. The data should be in .pt format.

    Args:
        url: Direct-download URL of the serialized file.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, `requests.get` can block indefinitely on a stalled
            connection.

    Returns:
        Whatever object `torch.load` reconstructs from the downloaded bytes.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within `timeout`.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    # SECURITY NOTE: torch.load deserializes with pickle, which can execute
    # arbitrary code embedded in the file. Only point this at sources you
    # trust; if the file holds plain tensors and the installed PyTorch
    # supports it, consider passing weights_only=True.
    return torch.load(BytesIO(response.content))


# Raw-download link to the serialized data bundle hosted on GitHub.
url = "https://github.com/CharlesCLuo/Application-of-AI-in-Supply-Chain-Risk-Management-Series/blob/main/Demand_Forecsting/transformer_data.pt?raw=true"

data = load_data_from_github(url)

# The bundle is indexed by name; pull out the two series used below.
sequence1 = data['sequence1']
sequence2 = data['sequence2']

# Tabular copies rounded to two decimals, used only for the previews below.
sequence1_df = pd.DataFrame(sequence1.numpy().round(2), columns=['Sequence 1'])
sequence2_df = pd.DataFrame(sequence2.numpy().round(2), columns=['Sequence 2'])

# Quick look at the first rows of each series.
print("First 5 rows of Sequence 1:", sequence1_df.head().to_string(index=False), sep="\n")
print("\nFirst 5 rows of Sequence 2:", sequence2_df.head().to_string(index=False), sep="\n")

# Summary statistics (count, mean, std, quartiles, ...) for each series.
print("\nDescriptive Statistics for Sequence 1:", sequence1_df.describe().round(2), sep="\n")
print("\nDescriptive Statistics for Sequence 2:", sequence2_df.describe().round(2), sep="\n")


def split_sequence(sequence, train_ratio=0.7):
    """
    Cut a sequence into a leading training part and a trailing validation part.

    The cut point is `len(sequence) * train_ratio`, truncated to an integer.
    Order is preserved (no shuffling), so the validation part always comes
    after the training part.

    Returns:
        A `(train_sequence, val_sequence)` pair of slices of `sequence`.
    """
    cut = int(len(sequence) * train_ratio)
    return sequence[:cut], sequence[cut:]

# Chronological 70/30 split of each series: the leading 70% is used for
# training and the remainder for validation.
(train_seq1, val_seq1), (train_seq2, val_seq2) = (
    split_sequence(series, train_ratio=0.7) for series in (sequence1, sequence2)
)


def create_sliding_window_sequences(sequence, window_size=20):
    """
    Turn a tensor sequence into supervised (window, next value) pairs.

    For every start index `s` with a full window and a following element,
    the input is `sequence[s : s + window_size]` and the target is
    `sequence[s + window_size]`. This yields `len(sequence) - window_size`
    pairs.

    Returns:
        A pair `(X, y)` of stacked tensors: all windows, and the value that
        follows each window.
    """
    starts = range(len(sequence) - window_size)
    windows = [sequence[s:s + window_size] for s in starts]
    targets = [sequence[s + window_size] for s in starts]
    return torch.stack(windows), torch.stack(targets)

# Number of past values the model sees when predicting the next one.
window_size = 20

# Windows are built separately for each split, so no window mixes training
# and validation values.
X_train_seq1, y_train_seq1 = create_sliding_window_sequences(train_seq1, window_size)
X_train_seq2, y_train_seq2 = create_sliding_window_sequences(train_seq2, window_size)

X_val_seq1, y_val_seq1 = create_sliding_window_sequences(val_seq1, window_size)
X_val_seq2, y_val_seq2 = create_sliding_window_sequences(val_seq2, window_size)


def train_model(model, X_train, y_train, X_val, y_val, num_epochs=50, lr=0.0001):
    """
    Fit `model` with Adam on mean-squared error and track validation loss.

    Each epoch performs a single optimisation step on the whole training
    set, followed by a gradient-free evaluation on the whole validation set.
    Progress is printed every 10th epoch.

    Returns:
        `(model, train_losses, val_losses)`: the same model object that was
        passed in, and two lists with one float per epoch.
    """
    loss_fn = nn.MSELoss()
    adam = torch.optim.Adam(model.parameters(), lr=lr)
    history = {'train': [], 'val': []}

    for epoch_number in range(1, num_epochs + 1):
        # Optimisation step on the full training set.
        model.train()
        adam.zero_grad()
        train_loss = loss_fn(model(X_train), y_train)
        train_loss.backward()
        adam.step()
        history['train'].append(train_loss.item())

        # Validation pass: eval mode, no gradient tracking.
        model.eval()
        with torch.no_grad():
            val_loss = loss_fn(model(X_val), y_val)
        history['val'].append(val_loss.item())

        if epoch_number % 10 == 0:
            print(f'Epoch [{epoch_number}/{num_epochs}], Training Loss: {train_loss.item():.4f}, Validation Loss: {val_loss.item():.4f}')

    return model, history['train'], history['val']


# Fit one independent model per series. A fresh TransformerModel is built for
# each sequence, so no weights are shared between them. train_model is called
# with its defaults (num_epochs=50, lr=0.0001) and prints the losses every
# 10 epochs. It returns the same model object it was given, so
# trained_model_seq1 and transformer_model_seq1 refer to one object.
# NOTE: construction and training draw on the global random state, so the
# order of these statements affects the results under a fixed seed.
transformer_model_seq1 = TransformerModel(max_len=500)
trained_model_seq1, train_losses_seq1, val_losses_seq1 = train_model(transformer_model_seq1, X_train_seq1, y_train_seq1, X_val_seq1, y_val_seq1)


# Same procedure for the second series.
transformer_model_seq2 = TransformerModel(max_len=500)
trained_model_seq2, train_losses_seq2, val_losses_seq2 = train_model(transformer_model_seq2, X_train_seq2, y_train_seq2, X_val_seq2, y_val_seq2)


def evaluate_metrics(y_true, y_pred, name):
    """
    Print MSE, MAE and R² for a set of predictions.

    All three metrics are computed before anything is printed, so a failure
    in any metric produces no partial report. Nothing is returned.

    Args:
        y_true: Ground-truth values.
        y_pred: Predicted values, aligned with `y_true`.
        name: Label used in the report header.
    """
    mse_value, mae_value, r2_value = (
        mean_squared_error(y_true, y_pred),
        mean_absolute_error(y_true, y_pred),
        r2_score(y_true, y_pred),
    )
    report_lines = [
        f"\n{name} Metrics:",
        f"Mean Squared Error (MSE): {mse_value:.4f}",
        f"Mean Absolute Error (MAE): {mae_value:.4f}",
        f"R² Score: {r2_value:.4f}",
    ]
    print("\n".join(report_lines))


def _predict_and_report(model, X_val, y_val, seq_label, pred_color):
    """
    Run `model` on the validation windows, plot predictions against actual
    values, and print the evaluation metrics.

    Args:
        model: Trained model to evaluate; it is switched to eval mode.
        X_val: Validation inputs fed to the model.
        y_val: Validation targets aligned with `X_val`.
        seq_label: Human-readable series name used in the plot and report.
        pred_color: Matplotlib color for the prediction curve.

    Returns:
        `(predictions, actuals)` as 1-D NumPy arrays.
    """
    model.eval()
    with torch.no_grad():
        # reshape(-1) instead of a bare squeeze(): squeeze() would also drop
        # the sample axis when there is a single validation sample, yielding
        # a 0-d array that the metric functions cannot handle.
        predictions = model(X_val).reshape(-1).numpy()
        actuals = y_val.reshape(-1).numpy()

    plt.figure(figsize=(12, 6))
    plt.plot(actuals, label=f"Actual (Validation {seq_label})", color='black')
    plt.plot(predictions, label=f"Predicted (Validation {seq_label})", color=pred_color)
    plt.title(f"Transformer Predictions vs Actual Values ({seq_label})")
    plt.xlabel("Sample")
    plt.ylabel("Value")
    plt.legend()
    plt.show()

    evaluate_metrics(actuals, predictions, seq_label)
    return predictions, actuals


# Evaluate each trained model on its own validation split. The results are
# kept at module level for later inspection.
val_predictions_seq1, actual_values_seq1 = _predict_and_report(
    trained_model_seq1, X_val_seq1, y_val_seq1, "Sequence 1", 'red')

val_predictions_seq2, actual_values_seq2 = _predict_and_report(
    trained_model_seq2, X_val_seq2, y_val_seq2, "Sequence 2", 'blue')
