In [33]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.holtwinters import ExponentialSmoothing

from sklearn.metrics import mean_squared_error
from typing import Optional, Tuple
from pmdarima.arima.utils import ndiffs  # Instead of 'pmdarima.utils'


In [29]:
from abc import ABC, abstractmethod
import pandas as pd

class BaseTimeSeriesModel(ABC):
    def __init__(self, file_path: str, value_col: str, selected_district:str, test_size: float = 0.2):

        self.data = pd.read_csv(file_path)
        self.data = self.data[(self.data['indicator_type'] == 'Total [(A+B) or (C+D)]')]
        self.data['date'] = pd.to_datetime(self.data['date'])
        self.data = self.data.set_index('date')
        self.data.index = pd.DatetimeIndex(self.data.index)
        # self.data = self.data.asfreq('MS')  # 'MS' = Month Start
        self.data = self.data[self.data['district'] == selected_district]
        self.data.sort_index(inplace=True)
        self.series = self.data[value_col]
        self.series = self.series.asfreq('MS')

        self.test_size = test_size
        self.train = None
        self.test = None
        self.model = None
        self.results = {}

    def print_df(self):
        print("First 5 rows:")
        print(self.data.head())
        print(f"\nData shape: {self.series.shape}")
        print(f"\nMissing values:\n{self.data.isnull().sum()}")
        print(f"\nData types:\n{self.data.dtypes}")
        print(f"\nSelected value column: {self.series.name}")

    def split_data(self):
        """Split before any preprocessing"""
        split_idx = int(len(self.series) * (1 - self.test_size))
        self.full_series = self.series.copy()  # Store original
        self.train = self.series[:split_idx]
        self.test = self.series[split_idx:]

    def print_train_test(self):
        print("Train Data:")
        print(self.train.head())
        print(f"Train shape: {self.train.shape}\n")
        print("Test Data:")
        print(self.test.head())
        print(f"Test shape: {self.test.shape}\n")

    @abstractmethod
    def fit_model(self, **kwargs):
        pass

    @abstractmethod
    def forecast(self, steps: int):
        pass

    # def evaluate(self, forecast: pd.Series):
    #     from sklearn.metrics import mean_squared_error
    #     return mean_squared_error(self.test, forecast, squared=False)
    

    def evaluate(self, forecast: pd.Series):
        from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
        
        return {
            'RMSE': mean_squared_error(self.test, forecast, squared=False),
            'MAE': mean_absolute_error(self.test, forecast),
            'MAPE': mean_absolute_percentage_error(self.test, forecast)
        }
    

    def plot_results(self, forecast_series: pd.Series, 
                   lower: Optional[pd.Series] = None, 
                   upper: Optional[pd.Series] = None):
        """
        Visualize training, test and forecast data
        :param forecast_series: Predicted values
        :param lower: Optional lower confidence interval
        :param upper: Optional upper confidence interval
        """
        plt.figure(figsize=(12, 6))
        plt.plot(self.train, label='Training Data')
        plt.plot(self.test, label='Actual Values', alpha=0.7)
        plt.plot(forecast_series, label='Forecast', color='red', linestyle='--')
        
        if lower is not None and upper is not None:
            plt.fill_between(forecast_series.index,
                            lower,
                            upper,
                            color='pink', alpha=0.3)
            
        plt.title(f'{self.__class__.__name__} Forecast Results')
        plt.legend()
        plt.show()

    def diagnostic_plots(self):
        """Generate model diagnostic plots if available"""
        if self.model:
            try:
                return self.model.plot_diagnostics(figsize=(12, 8))
            except AttributeError:
                print(f"{self.__class__.__name__} does not support diagnostic plots")
        else:
            print("Model not trained - call fit_model() first")


In [41]:
class ARIMAModel(BaseTimeSeriesModel):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.best_order = None
        self.d = 0

    def check_stationarity(self, series: pd.Series) -> bool:
        from statsmodels.tsa.stattools import adfuller
        result = adfuller(series.dropna())
        return result[1] <= 0.05
    

    def _auto_diff(self, series: pd.Series) -> int:
        """Determine optimal differencing using training data"""
        from pmdarima.utils import ndiffs
        self._d = ndiffs(series, test='adf')
        return self._d

    def preprocess_data(self):
        current_series = self.series.copy()
        self.d = 0
        while not self.check_stationarity(current_series):
            current_series = current_series.diff().dropna()
            self.d += 1
        self.series = current_series

    def grid_search(self, pdq_range=(2, 1, 2)):
        from statsmodels.tsa.arima.model import ARIMA
        import numpy as np
        best_aic = np.inf
        p_range, d_range, q_range = pdq_range
        for p in range(p_range + 1):
            for d in range(d_range + 1):
                for q in range(q_range + 1):
                    try:
                        model = ARIMA(self.train, order=(p, d, q)).fit()
                        if model.aic < best_aic:
                            best_aic = model.aic
                            self.best_order = (p, d, q)
                    except:
                        continue
        print(f"Best ARIMA order: {self.best_order} (AIC: {best_aic})")

    # def grid_search(self, p_range=3, q_range=3):
    #     """Auto ARIMA implementation with proper differencing"""
    #     from pmdarima import auto_arima
    #     self._auto_diff(self.train)
        
    #     model = auto_arima(
    #         self.train,
    #         d=self._d,
    #         max_p=p_range,
    #         max_q=q_range,
    #         seasonal=False,
    #         trace=True
    #     )
    #     self.best_order = model.order

    def fit_model(self, order=None):
        from statsmodels.tsa.arima.model import ARIMA
        if order is None:
            order = self.best_order
        self.model = ARIMA(self.train, order=order).fit()

    # def forecast(self, steps: int):
    #     return self.model.get_forecast(steps=steps).predicted_mean
    
    # def forecast(self, steps: int) -> Tuple[pd.Series, pd.Series, pd.Series]:
    #     """Returns (mean, lower, upper)"""
    #     forecast = self.model.get_forecast(steps=steps)
    #     return forecast.predicted_mean, forecast.conf_int().iloc[:,0], forecast.conf_int().iloc[:,1]
    

    def forecast(self, steps: int) -> pd.DataFrame:
        """
        Returns a DataFrame with forecast, lower, and upper confidence intervals.
        """
        forecast_res = self.model.get_forecast(steps=steps)
        forecast_df = forecast_res.conf_int()
        forecast_df['forecast'] = forecast_res.predicted_mean
        forecast_df.rename(columns={forecast_df.columns[0]: 'lower', forecast_df.columns[1]: 'upper'}, inplace=True)
        return forecast_df[['forecast', 'lower', 'upper']]


    # def forecast(self, steps: int) -> pd.DataFrame:
    #     """Auto-inverse differencing for proper scaling"""
    #     # Get differenced forecast
    #     diff_forecast = self.model.get_forecast(steps=steps)
        
    #     # Reintegrate forecasts
    #     last_value = self.full_series.iloc[-self._d-1] if self._d > 0 else None
    #     forecast = self._inverse_diff(diff_forecast, last_value)
        
    #     return forecast
    
    def _inverse_diff(self, diff_forecast, last_obs=None):
        """Reverse differencing transformations"""
        if self._d == 0:
            return diff_forecast


In [42]:
class ExpSmoothingModel(BaseTimeSeriesModel):
    def fit_model(self, trend='add', seasonal=None, seasonal_periods=None):
        from statsmodels.tsa.holtwinters import ExponentialSmoothing
        self.model = ExponentialSmoothing(
            self.train,
            trend=trend,
            seasonal=seasonal,
            seasonal_periods=seasonal_periods
        ).fit()

    # def forecast(self, steps: int):
    #     return self.model.forecast(steps)
    def forecast(self, steps: int) -> pd.Series:
        """Returns only point forecasts"""
        return self.model.forecast(steps)


In [43]:
# ARIMA Example
arima = ARIMAModel(   
    file_path="../data/HMIS_DATA_CORRECTED_17_21/mh_dist17_21_with_IDs_date_correction.csv",
    value_col="I1",
    selected_district="THANE",
    test_size=0.2
)



arima.print_df()
arima.preprocess_data()
arima.split_data()
arima.print_train_test()
arima.grid_search(pdq_range=(2, 1, 2))
arima.fit_model()
arima_forecast = arima.forecast(len(arima.test))
arima_rmse = arima.evaluate(arima_forecast)
print(f"ARIMA RMSE: {arima_rmse}")

First 5 rows:
           financial_year   month        state district  \
date                                                      
2017-04-01      2017-2018   April  Maharashtra    THANE   
2017-05-01      2017-2018     May  Maharashtra    THANE   
2017-06-01      2017-2018    June  Maharashtra    THANE   
2017-07-01      2017-2018    July  Maharashtra    THANE   
2017-08-01      2017-2018  August  Maharashtra    THANE   

                    indicator_type  fy_id  mnth_id  st_id  dt_id     I1  ...  \
date                                                                     ...   
2017-04-01  Total [(A+B) or (C+D)]      1        2     22    437  14549  ...   
2017-05-01  Total [(A+B) or (C+D)]      1        3     22    437  13790  ...   
2017-06-01  Total [(A+B) or (C+D)]      1        4     22    437  14818  ...   
2017-07-01  Total [(A+B) or (C+D)]      1        5     22    437  17290  ...   
2017-08-01  Total [(A+B) or (C+D)]      1        6     22    437  14479  ...   

           

  warn('Non-stationary starting autoregressive parameters'
  warn('Non-invertible starting MA parameters found.'
  warn('Non-invertible starting MA parameters found.'


Best ARIMA order: (1, 1, 0) (AIC: 625.2459412757274)




ValueError: y_true and y_pred have different number of output (1!=3)

In [44]:
model = ARIMAModel(   
    file_path="../data/HMIS_DATA_CORRECTED_17_21/mh_dist17_21_with_IDs_date_correction.csv",
    value_col="I1",
    selected_district="THANE",
    test_size=0.2
)
model.split_data()
model.grid_search(pdq_range=(2, 1, 2))
model.fit_model()
forecast = model.forecast(steps=len(model.test))
print(model.evaluate(forecast))
model.plot_results(forecast)

  warn('Non-stationary starting autoregressive parameters'
  warn('Non-invertible starting MA parameters found.'
  warn('Non-invertible starting MA parameters found.'


Best ARIMA order: (1, 1, 0) (AIC: 625.2459412757274)




ValueError: y_true and y_pred have different number of output (1!=3)