# This notebook contains various predictive modeling techniques to forecast bakery daily sales.

In [1]:
# Import necessary libraries
import warnings
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from pmdarima import auto_arima
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.arima.model import ARIMA  # Updated import
from sklearn.metrics import mean_squared_error
from math import sqrt
import matplotlib.pyplot as plt

In [2]:
%matplotlib inline
warnings.filterwarnings("ignore")

In [3]:
# ARIMA Model
class ArimaModel:
    def __init__(self, train_data, test_data, order=(5, 1, 5)):
        self.train_data = train_data
        self.test_data = test_data
        self.order = order
        self.scaler = StandardScaler()
        self.model = None
        self.pred = None

    def fit(self):
        # Scale the data
        self.train_scaled = self.scaler.fit_transform(self.train_data[['daily_sales']])
        self.test_scaled = self.scaler.transform(self.test_data[['daily_sales']])
        
        # Fit the ARIMA model on the scaled training data
        self.model = ARIMA(self.train_scaled, order=self.order)
        self.model = self.model.fit()

    def predict(self):
        # Predict on the scaled test data
        start = len(self.train_scaled) 
        end = len(self.train_scaled) + len(self.test_scaled) - 1
        pred_scaled = self.model.predict(start=start, end=end, typ='levels')

        # Inverse transform the predictions to get them back to the original scale
        self.pred = self.scaler.inverse_transform(pred_scaled.reshape(-1, 1)).flatten()
        return self.pred

    def plot_predictions(self):
        # Plot the predictions
        plt.figure(figsize=(10, 5))
        plt.plot(self.pred, label='ARIMA Predictions')
        plt.plot(self.test_data['daily_sales'].values, label='Actual Sales')
        plt.legend()
        plt.show()

    def evaluate(self):
        # Calculate RMSE
        rmse = sqrt(mean_squared_error(self.pred, self.test_data['daily_sales'].values))
        return rmse

In [4]:
# ARIMA Model with external data
class ArimaModelWithExternal:
    def __init__(self, train_data, test_data, exog_train, exog_test, order=(5, 1, 5)):
        self.train_data = train_data
        self.test_data = test_data
        self.exog_train = exog_train
        self.exog_test = exog_test
        self.order = order
        self.scaler = StandardScaler()
        self.model = None
        self.pred = None

    def fit(self):
        # Scale the data
        self.train_scaled = self.scaler.fit_transform(self.train_data[['daily_sales']])
        self.test_scaled = self.scaler.transform(self.test_data[['daily_sales']])
        
        # Convert exogenous variables to numeric if necessary
        self.exog_train = pd.to_numeric(self.exog_train)
        self.exog_test = pd.to_numeric(self.exog_test)
        
        # Fit the ARIMA model with external regressors on the scaled training data
        self.model = ARIMA(self.train_scaled, order=self.order, exog=self.exog_train)
        self.model = self.model.fit()
        
    def predict(self):
        # Predict on the scaled test data with external regressors
        start = len(self.train_scaled)
        end = len(self.train_scaled) + len(self.test_scaled) - 1
        pred_scaled = self.model.predict(start=start, end=end, exog=self.exog_test)

        # Inverse transform the predictions to get them back to the original scale
        # Added np.array(pred_scaled) before reshaping
        self.pred = self.scaler.inverse_transform(np.array(pred_scaled).reshape(-1, 1)).flatten()
        return self.pred

    def plot_predictions(self):
        # Plot the predictions
        plt.figure(figsize=(10, 5))
        plt.plot(self.pred, label='ARIMA Predictions')
        plt.plot(self.test_data['daily_sales'].values, label='Actual Sales')
        plt.legend()
        plt.show()

    def evaluate(self):
        # Calculate RMSE
        rmse = sqrt(mean_squared_error(self.pred, self.test_data['daily_sales'].values))
        return rmse

In [5]:
# LSTM Model without external data
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


In [6]:
# LSTM Model with external data
class LSTMModelWithExternal(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers):
        super(LSTMModelWithExternal, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim + 1, 1)
    
    def forward(self, sales_seq, holiday_seq):
        lstm_out, _ = self.lstm(sales_seq)
        lstm_out = lstm_out[:, -1, :]
        combined = torch.cat((lstm_out, holiday_seq[:, -1].unsqueeze(1)), dim=1)
        output = self.fc(combined)
        return output

In [7]:
# CNN-LSTM Model without external data
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(CNNLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cnn = nn.Conv1d(in_channels=input_size, out_channels=64, kernel_size=3, padding=1)
        self.lstm = nn.LSTM(64, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.cnn(x)
        x = x.permute(0, 2, 1)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

In [8]:
# CNN-LSTM Model with external data
class CNNLSTMModelWithExternal(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers):
        super(CNNLSTMModelWithExternal, self).__init__()
        self.cnn = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=3, padding=1)
        self.lstm = nn.LSTM(64, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim + 1, 1)

    def forward(self, sales_seq, holiday_seq):
        sales_seq = sales_seq.permute(0, 2, 1)  # Reshape for CNN
        cnn_out = self.cnn(sales_seq)
        cnn_out = cnn_out.permute(0, 2, 1)  # Reshape back for LSTM
        lstm_out, _ = self.lstm(cnn_out)
        lstm_out = lstm_out[:, -1, :]
        combined = torch.cat((lstm_out, holiday_seq[:, -1].unsqueeze(1)), dim=1)
        output = self.fc(combined)
        return output

In [9]:
# GRU Model without external data
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(GRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.gru(x, h0)
        out = self.fc(out[:, -1, :])
        return out

In [10]:
# GRU Model with external data
class GRUModelWithExternal(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers):
        super(GRUModelWithExternal, self).__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim + 1, 1)

    def forward(self, sales_seq, holiday_seq):
        gru_out, _ = self.gru(sales_seq)
        gru_out = gru_out[:, -1, :]
        combined = torch.cat((gru_out, holiday_seq[:, -1].unsqueeze(1)), dim=1)
        output = self.fc(combined)
        return output

In [11]:
# Transformer Model without external data
class TransformerModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, nhead=4):
        super(TransformerModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Linear(input_size, hidden_size)
        self.positional_encoding = self.generate_positional_encoding(hidden_size, 5000)

        encoder_layers = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)

        self.fc = nn.Linear(hidden_size, output_size)

    def generate_positional_encoding(self, hidden_size, max_len):
        positional_encoding = torch.zeros(max_len, hidden_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
        positional_encoding[:, 0::2] = torch.sin(position * div_term)
        positional_encoding[:, 1::2] = torch.cos(position * div_term)
        return positional_encoding.unsqueeze(0)

    def forward(self, x):
        x = self.embedding(x) + self.positional_encoding[:, :x.size(1), :]
        x = x.permute(1, 0, 2)
        out = self.transformer_encoder(x)
        out = out.permute(1, 0, 2)
        out = self.fc(out[:, -1, :])
        return out