In [2]:
!pip install PyQt5 
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QWidget, QVBoxLayout,
    QPushButton, QFileDialog, QLabel, QTextEdit, QInputDialog
)
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score


class DataProcessor:
    """Load a sales CSV and turn it into a numeric feature table.

    The expected call order is ``load_data`` -> ``drop_closed_days`` ->
    ``handle_missing_values`` -> ``extract_features``. Every step raises
    ``ValueError`` if no data has been loaded yet.
    """

    def __init__(self, data=None):
        # ``data`` is the path of the CSV file; the DataFrame itself is only
        # populated once load_data() succeeds.
        self.data_path = data
        self.data = None

    def load_data(self):
        """Read the CSV at ``self.data_path`` into ``self.data``.

        Raises:
            ValueError: no path was supplied to the constructor.
            IOError: pandas could not read the file; the underlying error is
                chained as ``__cause__``.
        """
        if self.data_path is None:
            raise ValueError("No data file path provided.")
        try:
            self.data = pd.read_csv(self.data_path)
        except Exception as e:
            # Chain the cause so the real pandas/OS error stays inspectable.
            raise IOError(f"Failed to load CSV: {e}") from e
        print("Data loaded")

    def drop_closed_days(self):
        """Keep only the rows whose ``Open`` column equals 1.

        Raises:
            ValueError: data has not been loaded.
            KeyError: the dataset has no ``Open`` column.
        """
        if self.data is None:
            raise ValueError("Data must be loaded before dropping closed days.")
        if "Open" not in self.data.columns:
            raise KeyError("Required column 'Open' not found in dataset.")
        # .copy() detaches the filtered rows from the original frame so later
        # in-place edits do not trigger pandas' chained-assignment warnings.
        self.data = self.data[self.data['Open'] == 1].copy()
        print("Closed days dropped")

    def handle_missing_values(self):
        """Replace every missing value in ``self.data`` with 0, in place.

        Raises:
            ValueError: data has not been loaded.
        """
        if self.data is None:
            raise ValueError("Data must be loaded before handling missing values.")

        self.data.fillna(0, inplace=True)
        print("Missing values filled with 0")

    def extract_features(self, target_column="Sales", n_lags=3):
        """Add lag and calendar features, replacing ``self.data`` on success.

        Creates ``<target>_lag1 .. <target>_lagN`` columns by shifting the
        target, drops rows containing missing values, one-hot encodes the
        month and day-of-week derived from ``Date``, and removes ``Date``.

        All work happens on a copy, so ``self.data`` is left untouched when
        any step raises.

        Args:
            target_column: name of the column to lag.
            n_lags: number of lag columns to create.

        Raises:
            ValueError: data has not been loaded, no rows remain after
                dropping rows with missing values, or no ``Date`` value
                could be parsed.
            KeyError: ``target_column`` or ``Date`` is missing.
        """
        if self.data is None:
            raise ValueError("Data must be loaded before extracting features.")
        if target_column not in self.data.columns:
            raise KeyError(f"Target column '{target_column}' not found.")
        # Validate every required column before touching the data so a
        # failure cannot leave the frame half-processed.
        if "Date" not in self.data.columns:
            raise KeyError("Column 'Date' is required for feature extraction.")

        frame = self.data.copy()

        for lag in range(1, n_lags + 1):
            frame[f"{target_column}_lag{lag}"] = frame[target_column].shift(lag)

        # The first n_lags rows have no history and therefore contain NaN.
        frame.dropna(inplace=True)
        if frame.empty:
            raise ValueError(
                f"No rows left after creating {n_lags} lag feature(s); "
                "provide more data or reduce n_lags."
            )

        frame["Date"] = pd.to_datetime(frame["Date"], errors="coerce")
        if frame["Date"].isna().all():
            raise ValueError("All dates failed conversion. Check 'Date' format.")

        frame["Month"] = frame["Date"].dt.month
        frame["DayOfWeek"] = frame["Date"].dt.dayofweek
        frame = pd.get_dummies(frame, columns=["Month", "DayOfWeek"])
        frame.drop(columns=["Date"], inplace=True)

        # Commit only after every step has succeeded.
        self.data = frame
        print("Features extracted")


class ForecastingModel:
    """Chronological train/test split plus a linear-regression forecaster.

    ``data`` is a feature table that still contains ``target_column``; the
    split keeps row order, so the last rows become the test set.
    """

    def __init__(self, data=None, target_column='Sales'):
        self.data = data
        self.target_column = target_column
        self.model = None
        # Populated by prepare_data().
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

    def prepare_data(self, test_size=0.2):
        """Split ``self.data`` into leading train rows and trailing test rows.

        Raises:
            ValueError: no data, ``test_size`` outside (0, 1), or fewer than
                five rows.
            KeyError: the target column is missing.
        """
        frame = self.data
        if frame is None:
            raise ValueError("No data provided to ForecastingModel.")

        target = self.target_column
        if target not in frame.columns:
            raise KeyError(f"Target column '{target}' not in dataset.")

        if not (0 < test_size < 1):
            raise ValueError("test_size must be between 0 and 1.")

        features = frame.drop(columns=[target])
        labels = frame[target]

        n_rows = len(features)
        if n_rows < 5:
            raise ValueError("Not enough data to split into train/test sets.")

        # Everything before the cut trains the model; the rest evaluates it.
        cut = int(n_rows * (1 - test_size))
        self.X_train = features.iloc[:cut]
        self.X_test = features.iloc[cut:]
        self.y_train = labels.iloc[:cut]
        self.y_test = labels.iloc[cut:]

    def train_model(self):
        """Fit a ``LinearRegression`` on the prepared training split.

        Raises:
            ValueError: prepare_data() has not run or the training set is
                empty.
        """
        missing_split = self.X_train is None or self.y_train is None
        if missing_split:
            raise ValueError("Training data not prepared. Call prepare_data() first.")

        if len(self.X_train) == 0:
            raise ValueError("Training set is empty.")

        self.model = LinearRegression()
        self.model.fit(self.X_train, self.y_train)
        print("Model trained")

    def test_model(self):
        """Score the fitted model on the test split.

        Returns:
            Tuple ``(mae, rmse, r2, predictions)``.

        Raises:
            ValueError: the model is not trained or the test set is empty.
        """
        if self.model is None:
            raise ValueError("Model has not been trained yet.")

        held_out = self.X_test
        if held_out is None or len(held_out) == 0:
            raise ValueError("Test set is empty.")

        predictions = self.model.predict(held_out)
        truth = self.y_test
        mae = mean_absolute_error(truth, predictions)
        rmse = np.sqrt(mean_squared_error(truth, predictions))
        r2 = r2_score(truth, predictions)
        return mae, rmse, r2, predictions

    def forecast(self, new_data):
        """Return the model's predictions for ``new_data``.

        Raises:
            ValueError: the model is not trained or ``new_data`` is
                ``None``/empty.
        """
        if self.model is None:
            raise ValueError("Model must be trained before forecasting.")

        if new_data is None or len(new_data) == 0:
            raise ValueError("No data passed to forecast().")

        predicted = self.model.predict(new_data)
        print("Forecast completed.")
        return predicted


class ResultsVisualizer:
    """Draw diagnostic plots for a fitted regression model on a given canvas.

    Predictions and residuals for both splits are computed once in the
    constructor. Each ``plot_*`` method clears ``canvas.figure``, draws a
    single axes, and calls ``canvas.draw()``.
    """

    def __init__(self, model, X_train, X_test, y_train, y_test, feature_names=None):
        """Store the splits and precompute predictions and residuals.

        Args:
            model: fitted estimator exposing ``predict`` (and ``coef_`` for
                the feature-importance plot).
            X_train, X_test: 2-D feature tables (``shape[1]`` is read when
                default feature names are needed).
            y_train, y_test: target values for each split.
            feature_names: optional sequence of column labels; any sequence
                type (list, NumPy array, pandas Index) is accepted. When
                omitted or empty, ``x0, x1, ...`` is used.

        Raises:
            ValueError: ``model``, ``X_train`` or ``X_test`` is ``None``.
        """
        if model is None:
            raise ValueError("A trained model is required for visualization.")
        if X_train is None or X_test is None:
            raise ValueError("Training and test data required for visualization.")

        self.model = model
        self.X_train = X_train
        self.X_test = X_test
        # Reset indices so actuals and predictions align positionally.
        self.y_train = pd.Series(y_train).reset_index(drop=True)
        self.y_test = pd.Series(y_test).reset_index(drop=True)
        # Explicit None/length test: ``feature_names or ...`` raises on
        # NumPy arrays and pandas Index objects (ambiguous truth value).
        if feature_names is None or len(feature_names) == 0:
            self.feature_names = [f"x{i}" for i in range(self.X_train.shape[1])]
        else:
            self.feature_names = list(feature_names)
        self.yhat_train = pd.Series(self.model.predict(self.X_train)).reset_index(drop=True)
        self.yhat_test = pd.Series(self.model.predict(self.X_test)).reset_index(drop=True)
        self.residuals_train = self.y_train - self.yhat_train
        self.residuals_test = self.y_test - self.yhat_test

    @staticmethod
    def _check_split(split):
        """Reject anything other than 'train' or 'test'."""
        if split not in ('train', 'test'):
            raise ValueError(f"split must be 'train' or 'test', got {split!r}.")

    def _residuals_for(self, split):
        """Return the residual series for ``split`` or raise ``ValueError``."""
        self._check_split(split)
        res = self.residuals_test if split == 'test' else self.residuals_train
        if res is None:
            raise ValueError("No residual data available to plot.")
        return res

    def plot_actual_vs_pred(self, canvas, split='test'):
        """Plot actual and predicted target values as two lines.

        Raises:
            ValueError: ``split`` is invalid or the data for it is missing.
        """
        self._check_split(split)
        if split == 'test':
            y, yhat = self.y_test, self.yhat_test
        else:
            y, yhat = self.y_train, self.yhat_train
        if y is None or yhat is None:
            raise ValueError("No prediction data available to plot.")

        canvas.figure.clear()
        ax = canvas.figure.add_subplot(111)
        ax.plot(y.values, label="Actual")
        ax.plot(yhat.values, label="Predicted")
        ax.set_title(f"Actual vs Predicted ({split})")
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.6)
        canvas.draw()

    def plot_residuals(self, canvas, split='test'):
        """Plot residuals (actual minus predicted) around a zero line.

        Raises:
            ValueError: ``split`` is invalid or the residuals are missing.
        """
        res = self._residuals_for(split)

        canvas.figure.clear()
        ax = canvas.figure.add_subplot(111)
        ax.axhline(0, color='k', linewidth=1)
        ax.plot(res.values)
        ax.set_title(f"Residuals ({split})")
        ax.grid(True, linestyle='--', alpha=0.6)
        canvas.draw()

    def plot_error_hist(self, canvas, split='test', bins=30):
        """Plot a histogram of the residuals.

        Raises:
            ValueError: ``split`` is invalid or the residuals are missing.
        """
        res = self._residuals_for(split)

        canvas.figure.clear()
        ax = canvas.figure.add_subplot(111)
        ax.hist(res.values, bins=bins, edgecolor='k', alpha=0.7)
        ax.set_title(f"Error Histogram ({split})")
        ax.grid(True, linestyle='--', alpha=0.6)
        canvas.draw()

    def plot_feature_importance(self, canvas, top_n=None):
        """Plot absolute model coefficients as horizontal bars, largest first.

        Args:
            canvas: object exposing ``figure`` and ``draw()``.
            top_n: if truthy, keep only the ``top_n`` largest coefficients.

        Raises:
            ValueError: the model has no ``coef_`` attribute, or the number
                of coefficients differs from the number of feature names.
        """
        if not hasattr(self.model, "coef_"):
            raise ValueError("Model does not expose coefficients for feature importance.")

        # ravel() tolerates a (1, n_features) coefficient array.
        values = np.abs(np.ravel(self.model.coef_))
        labels = np.array(self.feature_names)
        if len(labels) != len(values):
            raise ValueError(
                f"Got {len(values)} coefficient(s) but {len(labels)} feature name(s)."
            )
        order = np.argsort(values)[::-1]
        labels = labels[order]
        values = values[order]
        if top_n:
            labels = labels[:top_n]
            values = values[:top_n]
        canvas.figure.clear()
        ax = canvas.figure.add_subplot(111)
        ax.barh(labels, values)
        ax.set_title("Feature Importance (Coefficients)")
        canvas.draw()


class Logger:
    """Keep an in-memory message history, echoing to stdout and a widget.

    ``text_widget`` is optional; when it is truthy, its ``append`` and
    ``clear`` methods are called alongside the history list.
    """

    def __init__(self, text_widget=None):
        self.messages = []
        self.text_widget = text_widget

    def log(self, message):
        """Record ``message``, print it, and forward it to the widget."""
        self.messages.append(message)
        print(message)
        widget = self.text_widget
        if widget:
            widget.append(message)

    def save_log(self, path):
        """Write the history to ``path`` as UTF-8, one message per line."""
        with open(path, 'w', encoding='utf-8') as handle:
            handle.write("\n".join(self.messages))

    def clear_log(self):
        """Empty the history and the widget, then log the outcome.

        The outcome message is itself logged, so the history holds exactly
        one entry afterwards.
        """
        if self.messages:
            self.messages.clear()
            widget = self.text_widget
            if widget:
                widget.clear()
            self.log("Log cleared successfully.")
        else:
            self.log("Error: Log is already empty")


class ForecastGUI(QMainWindow):
    """Main window exposing the load -> process -> train -> forecast workflow.

    Each button is wired to one handler method. Every handler wraps its work
    in ``try/except Exception`` and reports failures through ``self.logger``
    so that an error is shown in the log pane instead of escaping into the
    Qt event loop.
    """

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Sales Forecasting GUI")
        self.setGeometry(100, 100, 1000, 800)

        # Workflow state; each stays None until the matching step succeeds.
        self.processor = None
        self.model = None
        self.visualizer = None
        self.preds = None
        self.forecast_periods = None

        self.initUI()

    def _add_button(self, layout, text, handler=None):
        """Create a button, optionally connect ``handler``, add it to ``layout``."""
        button = QPushButton(text)
        if handler is not None:
            button.clicked.connect(handler)
        layout.addWidget(button)
        return button

    def initUI(self):
        """Build the vertical layout: status label, buttons, log pane, canvas."""
        central_widget = QWidget()
        layout = QVBoxLayout()

        self.label = QLabel("No file loaded")
        layout.addWidget(self.label)

        self.load_btn = self._add_button(layout, "Load CSV", self.load_csv)
        self.process_btn = self._add_button(layout, "Process Data", self.process_data)
        self.train_btn = self._add_button(layout, "Train Model", self.train_model)
        self.forecast_btn = self._add_button(layout, "Forecast Test Data", self.forecast_future)
        self.test_btn = self._add_button(layout, "Test Model", self.test_model)
        self.plot_actual_btn = self._add_button(
            layout, "Plot Actual vs Predicted", self.plot_actual)
        self.plot_residuals_btn = self._add_button(
            layout, "Plot Residuals", self.plot_residuals)
        self.plot_error_btn = self._add_button(
            layout, "Plot Error Histogram", self.plot_error_hist)
        self.plot_feature_btn = self._add_button(
            layout, "Plot Feature Importance", self.plot_feature_importance)
        self.download_forecast_btn = self._add_button(
            layout, "Download Forecast and Plots", self.download_forecast)
        self.download_log_btn = self._add_button(layout, "Download Log", self.download_log)
        # Connected below, once the Logger it targets exists.
        self.clear_log_btn = self._add_button(layout, "Clear Log")

        self.log = QTextEdit()
        self.log.setReadOnly(True)
        layout.addWidget(self.log)

        self.logger = Logger(text_widget=self.log)
        self.clear_log_btn.clicked.connect(self.logger.clear_log)

        self.canvas = FigureCanvas(plt.Figure(figsize=(8, 5)))
        layout.addWidget(self.canvas)
        # Draw an empty placeholder axes so the plot area is visible at start.
        ax = self.canvas.figure.add_subplot(111)
        ax.set_title("Plot Area")
        ax.plot([], [])
        self.canvas.draw()

        central_widget.setLayout(layout)
        self.setCentralWidget(central_widget)

    def load_csv(self):
        """Ask for a CSV file and load it into a new ``DataProcessor``."""
        try:
            path, _ = QFileDialog.getOpenFileName(self, "Select CSV File", "", "CSV Files (*.csv)")
            if path:
                self.processor = DataProcessor(data=path)
                self.processor.load_data()
                self.label.setText(f"Loaded: {path}")
                self.logger.log("CSV loaded successfully.")
            else:
                self.logger.log("No file selected.")
        except Exception as e:
            self.logger.log(f"Error in load_csv: {e}")

    def process_data(self):
        """Run the three preprocessing steps on the loaded data."""
        try:
            if not self.processor:
                raise RuntimeError("No data loaded!")
            self.processor.drop_closed_days()
            self.processor.handle_missing_values()
            self.processor.extract_features(target_column="Sales", n_lags=3)
            self.logger.log("Data processed successfully.")
        except Exception as e:
            self.logger.log(f"Error in process_data: {e}")

    def train_model(self):
        """Split the data, fit the model, and build the visualizer."""
        try:
            if not self.processor or self.processor.data is None:
                raise RuntimeError("No data to train on!")
            self.model = ForecastingModel(data=self.processor.data, target_column="Sales")
            self.model.prepare_data()
            self.model.train_model()
            self.visualizer = ResultsVisualizer(
                self.model.model, self.model.X_train, self.model.X_test,
                self.model.y_train, self.model.y_test,
                feature_names=list(self.processor.data.drop(columns=["Sales"]).columns)
            )
            self.logger.log("Model trained and visualizer initialized.")
        except Exception as e:
            self.logger.log(f"Error in train_model: {e}")

    def test_model(self):
        """Evaluate the trained model and log MAE, RMSE and R2."""
        try:
            if not self.model or self.model.model is None:
                raise RuntimeError("No trained model!")
            mae, rmse, r2, self.preds = self.model.test_model()
            self.logger.log(f"Test Results -> MAE: {mae:.4f}, RMSE: {rmse:.4f}, R2: {r2:.4f}")
        except Exception as e:
            self.logger.log(f"Error in test_model: {e}")

    def forecast_future(self):
        """Predict the first N test rows, where N is chosen in a dialog.

        The visualizer's test-split series are replaced by the N forecasted
        rows, so subsequent plots and downloads show only those rows.
        """
        try:
            if not self.model or self.model.model is None:
                raise RuntimeError("No trained model to forecast!")
            if self.visualizer is None:
                raise RuntimeError("Visualizer not initialized; train the model first.")

            # Check the test set BEFORE opening the dialog: with an empty
            # set the spin box would be given max=0 < min=1.
            x_test = self.model.X_test
            if x_test is None or len(x_test) == 0:
                raise RuntimeError("No test data available to forecast from.")

            num_periods, ok = QInputDialog.getInt(
                self, "Forecast Periods", "Enter number of periods to forecast:",
                value=1, min=1, max=len(x_test)
            )
            if not ok:
                self.logger.log("Forecast cancelled by user.")
                return

            self.forecast_periods = num_periods

            X_to_forecast = x_test.iloc[:num_periods]
            forecast_values = self.model.forecast(X_to_forecast)

            self.visualizer.yhat_test = pd.Series(forecast_values).reset_index(drop=True)
            self.visualizer.y_test = pd.Series(self.model.y_test.iloc[:num_periods]).reset_index(drop=True)
            self.visualizer.residuals_test = self.visualizer.y_test - self.visualizer.yhat_test

            self.logger.log(f"Forecast for {num_periods} period(s):")
            for i, val in enumerate(forecast_values, start=1):
                self.logger.log(f"Period {i}: {val:.2f}")
        except Exception as e:
            self.logger.log(f"Error in forecast_future: {e}")

    def download_forecast(self):
        """Save the test-split predictions as CSV plus all four plots as PNG."""
        try:
            if not self.visualizer or self.visualizer.yhat_test is None:
                raise RuntimeError("No forecast data to download!")

            folder = QFileDialog.getExistingDirectory(self, "Select Folder to Save Forecast and Plots")
            if not folder:
                self.logger.log("Download cancelled.")
                return

            df_forecast = pd.DataFrame({"Forecasted Sales": self.visualizer.yhat_test})
            csv_path = f"{folder}/forecast.csv"
            df_forecast.to_csv(csv_path, index=False)
            self.logger.log(f"Forecast saved to {csv_path}")

            plots = {
                "actual_vs_predicted.png": self.visualizer.plot_actual_vs_pred,
                "residuals.png": self.visualizer.plot_residuals,
                "error_histogram.png": self.visualizer.plot_error_hist,
                "feature_importance.png": self.visualizer.plot_feature_importance
            }

            for filename, plot_func in plots.items():
                # Render on a throwaway figure so the on-screen canvas is
                # not disturbed; always close it to release pyplot's handle.
                fig = plt.figure()
                canvas = FigureCanvas(fig)
                try:
                    if "feature_importance" in filename:
                        plot_func(canvas, top_n=20)
                    else:
                        plot_func(canvas)
                    fig.savefig(f"{folder}/{filename}")
                finally:
                    plt.close(fig)
                self.logger.log(f"Plot saved: {filename}")

            self.logger.log("All forecast plots and CSV saved successfully!")
        except Exception as e:
            self.logger.log(f"Error in download_forecast: {e}")

    def download_log(self):
        """Ask for a destination and write the log history to a ``.txt`` file."""
        try:
            if not self.logger.messages:
                raise RuntimeError("No log messages to download!")
            path, _ = QFileDialog.getSaveFileName(self, "Save Log File", "", "Text Files (*.txt)")
            if path:
                if not path.endswith(".txt"):
                    path += ".txt"
                self.logger.save_log(path)
                self.logger.log(f"Log saved successfully to {path}")
            else:
                self.logger.log("Log save cancelled by user.")
        except Exception as e:
            self.logger.log(f"Error in download_log: {e}")

    def _run_plot(self, context, description, draw):
        """Shared body of the plot handlers.

        Args:
            context: handler name used in the error message.
            description: plot name used in the "train first" hint.
            draw: callable receiving the visualizer and drawing on the canvas.
        """
        try:
            if self.visualizer:
                draw(self.visualizer)
            else:
                self.logger.log(f"Train the model first to plot {description}.")
        except Exception as e:
            self.logger.log(f"Error in {context}: {e}")

    def plot_actual(self):
        """Show actual vs predicted values on the embedded canvas."""
        self._run_plot(
            "plot_actual", "Actual vs Predicted",
            lambda viz: viz.plot_actual_vs_pred(self.canvas),
        )

    def plot_residuals(self):
        """Show the residual plot on the embedded canvas."""
        self._run_plot(
            "plot_residuals", "Residuals",
            lambda viz: viz.plot_residuals(self.canvas),
        )

    def plot_error_hist(self):
        """Show the residual histogram on the embedded canvas."""
        self._run_plot(
            "plot_error_hist", "Error Histogram",
            lambda viz: viz.plot_error_hist(self.canvas),
        )

    def plot_feature_importance(self):
        """Show the 20 largest absolute coefficients on the embedded canvas."""
        self._run_plot(
            "plot_feature_importance", "Feature Importance",
            lambda viz: viz.plot_feature_importance(self.canvas, top_n=20),
        )


if __name__ == '__main__':
    # Start Qt, show the main window, and pass the event loop's return code
    # to the interpreter once the window is closed.
    app = QApplication(sys.argv)
    window = ForecastGUI()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)



No file selected.
Error in process_data: No data loaded!
Error in train_model: No data to train on!
Data loaded
CSV loaded successfully.
Log cleared successfully.
Error in process_data: "Required column 'Open' not found in dataset."
Error in train_model: "Target column 'Sales' not in dataset."
Error in forecast_future: No trained model to forecast!


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


Error in process_data: No data loaded!
Traceback (most recent call last):
  File "C:\Users\Franky Lee\AppData\Local\Temp\ipykernel_6048\426436218.py", line 344, in process_data
    raise RuntimeError("No data loaded!")
RuntimeError: No data loaded!

Log cleared successfully.
Data loaded
CSV loaded successfully.
Error in process_data: "Required column 'Open' not found in dataset."
Traceback (most recent call last):
  File "C:\Users\Franky Lee\AppData\Local\Temp\ipykernel_6048\426436218.py", line 345, in process_data
    self.processor.drop_closed_days()
  File "C:\Users\Franky Lee\AppData\Local\Temp\ipykernel_6048\426436218.py", line 34, in drop_closed_days
    raise KeyError("Required column 'Open' not found in dataset.")
KeyError: "Required column 'Open' not found in dataset."

Error in train_model: "Target column 'Sales' not in dataset."
Traceback (most recent call last):
  File "C:\Users\Franky Lee\AppData\Local\Temp\ipykernel_6048\426436218.py", line 358, in train_model
    self.mo

SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
