In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import logging
from datetime import datetime

# Configure the root logger once for the whole script: INFO and above,
# each record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Shared module-level handle to the root logger used by everything below.
logger = logging.getLogger()

class SalesLSTMModel:
    """
    End-to-end pipeline that forecasts a univariate 'Sales' series with a stacked LSTM.

    The intended call order is: load_data -> (plot_sales / test_stationarity) ->
    preprocess_data -> build_model -> train_model -> evaluate_model -> save_model.
    The CSV read by load_data must contain a 'Date' column and a 'Sales' column.
    """

    def __init__(self, data_path, window_size=5):
        """
        Initializes the SalesLSTMModel with the dataset and window size for sliding window approach.

        :param data_path: The path to the dataset.
        :param window_size: The size of the sliding window for time series forecasting.
        :raises ValueError: If window_size is smaller than 1.
        """
        if window_size < 1:
            raise ValueError(f"window_size must be at least 1, got {window_size}")
        self.data_path = data_path
        self.df = None
        self.model = None
        self.window_size = window_size
        self.scaler = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        # Index labels (dates) of the test targets; filled in by preprocess_data.
        self.test_index = None

    def load_data(self):
        """
        Loads the dataset from ``self.data_path`` and indexes it by the parsed 'Date' column.

        ``self.df`` is only assigned once the whole load succeeded, so a failure
        never leaves a half-prepared frame behind.

        :raises Exception: Any error raised while reading or parsing the file is
            logged and then re-raised, so the pipeline stops at the real cause
            instead of failing later on ``self.df is None``.
        """
        logger.info(f"Loading data from {self.data_path}...")
        try:
            df = pd.read_csv(self.data_path)
            df['Date'] = pd.to_datetime(df['Date'])
            df.set_index('Date', inplace=True)
        except Exception as e:
            logger.error(f"Error loading data: {e}")
            raise
        self.df = df
        logger.info("Data loaded successfully!")

    def plot_sales(self):
        """ Plot the 'Sales' column against the frame's index to check the trend. """
        plt.figure(figsize=(10, 6))
        plt.plot(self.df['Sales'])
        plt.title('Sales Over Time')
        plt.xlabel('Date')
        plt.ylabel('Sales')
        plt.show()

    def test_stationarity(self):
        """
        Check stationarity of the sales data using the Augmented Dickey-Fuller test.

        The statistic and p-value are logged; a p-value below 0.05 is reported
        as stationary.
        """
        # This step may run before preprocess_data has filled missing values,
        # and the ADF regression cannot handle NaNs, so drop them here.
        result = adfuller(self.df['Sales'].dropna())
        logger.info(f"ADF Statistic: {result[0]}")
        logger.info(f"p-value: {result[1]}")
        if result[1] < 0.05:
            logger.info("The series is stationary")
        else:
            logger.info("The series is non-stationary")

    def preprocess_data(self):
        """
        Preprocess the data: handle missing values, scale the data, and prepare for LSTM.

        Populates ``self.scaler``, ``self.X_train``, ``self.X_test``,
        ``self.y_train``, ``self.y_test`` and ``self.test_index``. The first 80%
        of the sliding-window samples form the training set, the rest the test set.

        :raises ValueError: If the series is too short to yield at least one
            training sample for the configured window size.
        """
        logger.info("Preprocessing data...")
        self.df.fillna(0, inplace=True)  # Fill missing values with 0 (or choose another strategy)

        # Use only the 'Sales' column for prediction
        sales_data = self.df['Sales'].values.reshape(-1, 1)

        # Each sample consumes window_size inputs plus one target value.
        n_samples = len(sales_data) - self.window_size
        train_size = int(n_samples * 0.8)
        if train_size < 1:
            raise ValueError(
                f"Not enough data: {len(sales_data)} rows cannot produce a "
                f"train/test split with window_size={self.window_size}"
            )

        # Fit the scaler only on the observations the training windows can see
        # (inputs and targets), so test-period statistics do not leak into training.
        self.scaler = MinMaxScaler(feature_range=(-1, 1))
        self.scaler.fit(sales_data[:train_size + self.window_size])
        scaled_data = self.scaler.transform(sales_data)

        # Convert to supervised learning format
        X, y = self.create_dataset(scaled_data)

        # Reshape for LSTM input (samples, time steps, features)
        X = X.reshape(X.shape[0], X.shape[1], 1)

        # Split chronologically into train and test sets
        self.X_train, self.X_test = X[:train_size], X[train_size:]
        self.y_train, self.y_test = y[:train_size], y[train_size:]

        # Target i sits at row i + window_size, so these are the test targets' dates.
        self.test_index = self.df.index[train_size + self.window_size:]

        logger.info("Data preprocessing completed.")

    def create_dataset(self, data):
        """
        Convert the time series data into a supervised learning format (sliding window).

        :param data: Array-like series ordered along its first axis.
        :return: Tuple ``(X, y)`` where ``X[i]`` is ``data[i:i + window_size]`` and
            ``y[i]`` is ``data[i + window_size]``. When the series is not longer
            than the window, both arrays are empty but keep their expected
            trailing dimensions.
        """
        data = np.asarray(data)
        n_samples = len(data) - self.window_size
        if n_samples <= 0:
            # Keep the rank callers expect so a later reshape does not hit an IndexError.
            X = np.empty((0, self.window_size) + data.shape[1:], dtype=data.dtype)
            y = np.empty((0,) + data.shape[1:], dtype=data.dtype)
            return X, y
        X = np.stack([data[i:i + self.window_size] for i in range(n_samples)])
        # Targets are the series shifted by one window; copy so y does not alias data.
        y = data[self.window_size:].copy()
        return X, y

    def build_model(self):
        """
        Build and compile the LSTM model.

        The network is two stacked 50-unit LSTM layers followed by a single-unit
        Dense output, compiled with the Adam optimizer and mean squared error loss.
        """
        logger.info("Building LSTM model...")
        # Prefer the prepared training data's window length; fall back to the
        # configured window so the model can be built before preprocessing.
        time_steps = self.window_size if self.X_train is None else self.X_train.shape[1]
        self.model = Sequential()
        # An explicit Input is the supported way to declare the input shape;
        # passing input_shape to a layer triggers a warning in Keras 3.
        self.model.add(tf.keras.Input(shape=(time_steps, 1)))
        self.model.add(LSTM(units=50, return_sequences=True))
        self.model.add(LSTM(units=50))
        self.model.add(Dense(units=1))

        self.model.compile(optimizer='adam', loss='mean_squared_error')
        logger.info("Model built successfully.")

    def train_model(self, epochs=10, batch_size=32):
        """
        Train the LSTM model on the training split.

        :param epochs: Number of passes over the training data.
        :param batch_size: Number of samples per gradient update.
        """
        logger.info("Training the model...")
        self.model.fit(self.X_train, self.y_train, epochs=epochs, batch_size=batch_size)
        logger.info("Model trained successfully.")

    def evaluate_model(self):
        """
        Evaluate the model using test data and plot results.

        Predictions and targets are mapped back to the original sales scale, the
        root mean squared error is logged, and both series are plotted together.
        """
        logger.info("Evaluating the model...")

        # Make predictions
        predictions = self.model.predict(self.X_test)

        # Inverse transform the predictions to original scale
        predictions = self.scaler.inverse_transform(predictions)
        y_test_inv = self.scaler.inverse_transform(self.y_test.reshape(-1, 1))

        rmse = float(np.sqrt(np.mean((y_test_inv - predictions) ** 2)))
        logger.info(f"Test RMSE: {rmse:.4f}")

        # Plot against the real dates when they are known; otherwise label the
        # axis honestly as a sample position instead of 'Date'.
        test_index = getattr(self, 'test_index', None)
        if test_index is not None and len(test_index) == len(y_test_inv):
            x_axis, x_label = test_index, 'Date'
        else:
            x_axis, x_label = np.arange(len(y_test_inv)), 'Sample'

        # Plot the results
        plt.figure(figsize=(10, 6))
        plt.plot(x_axis, y_test_inv, label='True Sales')
        plt.plot(x_axis, predictions, label='Predicted Sales')
        plt.title('Sales Prediction with LSTM')
        plt.xlabel(x_label)
        plt.ylabel('Sales')
        plt.legend()
        plt.show()
        logger.info("Model evaluation completed.")

    def save_model(self):
        """
        Save the trained model with a timestamp.

        The file is written to the current working directory as
        ``sales_lstm_model_<dd-mm-YYYY-HH-MM-SS-ffffff>.h5``. Saving is
        best-effort: a failure is logged and not re-raised.
        """
        timestamp = datetime.now().strftime("%d-%m-%Y-%H-%M-%S-%f")
        model_filename = f"sales_lstm_model_{timestamp}.h5"
        try:
            logger.info(f"Saving model to {model_filename}...")
            self.model.save(model_filename)
            logger.info("Model saved successfully!")
        except Exception as e:
            logger.error(f"Error saving model: {e}")


# Script entry point: run every pipeline stage once, in order.
if __name__ == "__main__":
    # Resolved against the current working directory, not this file's location.
    data_path = os.path.abspath("../primary_data.csv")  # Update with your path
    model = SalesLSTMModel(data_path)

    # Stages are listed in execution order; each one relies on the state
    # left on `model` by the stages before it.
    for stage in (
        model.load_data,
        model.plot_sales,
        model.test_stationarity,
        model.preprocess_data,
        model.build_model,
        model.train_model,
        model.evaluate_model,
        model.save_model,
    ):
        stage()