In [1]:
# ===========================
# Time Series Forecasting: ARIMA & Holt-Winters
# ===========================
%matplotlib inline
import os
import warnings
warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.holtwinters import ExponentialSmoothing

from sklearn.metrics import mean_absolute_error, mean_squared_error

sns.set(style="whitegrid")

# ---------- helper metrics / funcs ----------
def mae(y_true, y_pred): return mean_absolute_error(y_true, y_pred)
def rmse(y_true, y_pred): return np.sqrt(mean_squared_error(y_true, y_pred))
def mape(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    mask = y_true != 0
    return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100 if mask.sum()>0 else np.nan

def adf_test(series, title=''):
    print(f"\nADF Test: {title}")
    res = adfuller(series.dropna())
    print(f"ADF Statistic: {res[0]:.4f}, p-value: {res[1]:.4f}")
    print("Critical values:")
    for k,v in res[4].items():
        print(f" {k}: {v:.4f}")
    return res

def train_test_split_ts(series, test_periods):
    train = series.iloc[:-test_periods]
    test  = series.iloc[-test_periods:]
    return train, test

def plot_series(train, test=None, pred=None, title="Time Series", xlabel='Date', ylabel='Value'):
    plt.figure(figsize=(12,4))
    plt.plot(train.index, train.values, label='Train')
    if test is not None:
        plt.plot(test.index, test.values, label='Test', color='orange')
    if pred is not None:
        plt.plot(pred.index, pred.values, label='Predicted', color='green')
    plt.title(title); plt.xlabel(xlabel); plt.ylabel(ylabel)
    plt.legend(); plt.show()

# ---------- USER PATH (your path plugged in) ----------
DATA_PATH = r"C:\Users\abhin\Downloads\exchange_rate (2).csv"
FALLBACK = r"/mnt/data/exchange_rate (2).csv"   # environment fallback

# ---------- LOAD DATA ----------
if os.path.exists(DATA_PATH):
    path = DATA_PATH
elif os.path.exists(FALLBACK):
    path = FALLBACK
else:
    raise FileNotFoundError(f"File not found. Update DATA_PATH. Tried:\n{DATA_PATH}\n{FALLBACK}")

# read CSV (or Excel fallback)
try:
    df = pd.read_csv(path)
    print("Loaded CSV:", path)
except Exception:
    df = pd.read_excel(path)
    print("Loaded Excel:", path)

print("Initial shape:", df.shape)
display(df.head())

# ---------- auto-detect date and series column ----------
cols = df.columns.tolist()
print("Columns detected:", cols)
date_col = cols[0]
if len(cols) >= 2:
    series_col = cols[1]
else:
    raise ValueError("Dataset must have at least two columns: date and one series column.")

print("Using date column:", date_col)
print("Using series column:", series_col)

# ---------- parse date and set index ----------
df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
if df[date_col].isnull().any():
    print("Warning: some dates could not be parsed (check date format).")
df = df.set_index(date_col).sort_index()

series = pd.to_numeric(df[series_col], errors='coerce')
# try infer freq; if not, leave NaT and use daily freq as fallback
inferred = pd.infer_freq(series.index)
if inferred is None:
    # don't force freq, but we can set daily to allow asfreq if needed
    print("Frequency could not be inferred. Proceeding without explicit freq.")
else:
    series = series.asfreq(inferred)

print("Series info (head):")
display(series.head())
print("Time span:", series.index.min(), "to", series.index.max())
print("Frequency (inferred):", inferred)

# ---------- missing values handling ----------
print("Missing before:", series.isnull().sum())
series_imputed = series.copy().interpolate(method='time').ffill().bfill()
print("Missing after imputation:", series_imputed.isnull().sum())

# ---------- basic plots & stationarity ----------
plt.figure(figsize=(12,4))
plt.plot(series_imputed); plt.title(f"Series: {series_col}"); plt.show()

plt.figure(figsize=(12,4))
plt.plot(series_imputed.rolling(window=12).mean(), label='Rolling mean (12)')
plt.plot(series_imputed.rolling(window=12).std(), label='Rolling std (12)')
plt.legend(); plt.title("Rolling stats"); plt.show()

adf_res = adf_test(series_imputed, title=series_col)

# ACF / PACF plots
fig, ax = plt.subplots(1,2, figsize=(12,4))
plot_acf(series_imputed.dropna(), lags=40, ax=ax[0])
plot_pacf(series_imputed.dropna(), lags=40, ax=ax[1])
plt.show()

# ---------- train-test split ----------
test_periods = 24
if test_periods >= len(series_imputed):
    test_periods = max(1, int(len(series_imputed) * 0.2))
print("Using test_periods =", test_periods)
train, test = train_test_split_ts(series_imputed, test_periods)
plot_series(train, test, title=f"{series_col} â€” Train/Test split")

# ---------- choose ARIMA order ----------
use_auto = False
try:
    import pmdarima as pm
    use_auto = True
except Exception:
    use_auto = False

if use_auto:
    auto = pm.auto_arima(train.dropna(), seasonal=False, stepwise=True, suppress_warnings=True,
                         error_action='ignore', max_p=5, max_q=5, max_d=2)
    print("auto_arima suggestion:", auto.order)
    p,d,q = auto.order
else:
    # quick AIC grid search with small ranges
    best_aic = np.inf
    best_order = (0,0,0)
    # d guess by ADF
    d_guess = 0 if adf_res[1] < 0.05 else 1
    print("ADF p-value:", adf_res[1], "=> trying d =", d_guess)
    for p_try in range(0,4):
        for q_try in range(0,4):
            try:
                mod = SARIMAX(train.dropna(), order=(p_try,d_guess,q_try), enforce_stationarity=False, enforce_invertibility=False)
                res = mod.fit(disp=False)
                if res.aic < best_aic:
                    best_aic = res.aic
                    best_order = (p_try,d_guess,q_try)
            except Exception:
                continue
    p,d,q = best_order
    print("Selected order by AIC grid search:", (p,d,q))

# ---------- fit ARIMA (SARIMAX) ----------
print("Fitting SARIMAX order:", (p,d,q))
model_arima = SARIMAX(train, order=(p,d,q), enforce_stationarity=False, enforce_invertibility=False)
res_arima = model_arima.fit(disp=False)
print(res_arima.summary())

# residual diagnostics
resid = res_arima.resid
plt.figure(figsize=(12,4)); plt.plot(resid); plt.title("ARIMA residuals"); plt.show()
res_arima.plot_diagnostics(figsize=(12,8)); plt.show()

# forecast ARIMA
n_forecast = len(test)
arima_forecast = res_arima.get_forecast(steps=n_forecast)
arima_mean = pd.Series(arima_forecast.predicted_mean, index=test.index)
arima_ci = arima_forecast.conf_int()

plot_series(train, test, arima_mean, title="ARIMA Forecast vs Actual")
plt.fill_between(arima_ci.index, arima_ci.iloc[:,0], arima_ci.iloc[:,1], color='gray', alpha=0.2)

# ---------- Holt-Winters (Exponential Smoothing) ----------
# try to guess seasonality from inferred freq (monthly -> 12)
seasonal_periods = None
if inferred and str(inferred).lower().startswith('m'):
    seasonal_periods = 12
elif inferred and str(inferred).lower().startswith('d'):
    seasonal_periods = 7

print("Seasonal periods guess:", seasonal_periods)
if seasonal_periods and len(train) > 2*seasonal_periods:
    hw = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=seasonal_periods)
else:
    hw = ExponentialSmoothing(train, trend='add', seasonal=None)

res_hw = hw.fit(optimized=True)
hw_pred = pd.Series(res_hw.forecast(n_forecast), index=test.index)

plot_series(train, test, hw_pred, title="Holt-Winters Forecast vs Actual")

# ---------- Evaluation ----------
print("\n--- Evaluation on test set ---")
print("ARIMA Metrics:")
print("MAE:", mae(test, arima_mean))
print("RMSE:", rmse(test, arima_mean))
print("MAPE:", mape(test, arima_mean))

print("\nHolt-Winters Metrics:")
print("MAE:", mae(test, hw_pred))
print("RMSE:", rmse(test, hw_pred))
print("MAPE:", mape(test, hw_pred))

# ---------- Save results ----------
out_df = pd.DataFrame({
    'actual': test,
    'arima_pred': arima_mean,
    'hw_pred': hw_pred
})
out_file = "ts_forecast_results.csv"
out_df.to_csv(out_file, index=True)
print("Saved forecast results to:", os.path.abspath(out_file))

# ---------- Notes ----------
print("\nNotes:")
print("- If pmdarima was available, auto_arima was used to suggest optimal (p,d,q).")
print("- If seasonality is known (e.g., monthly), set seasonal_periods manually.")
print("- To run SARIMA for seasonality, set seasonal_order=(P,D,Q,s) when fitting SARIMAX.")


Loaded CSV: C:\Users\abhin\Downloads\exchange_rate (2).csv
Initial shape: (7588, 2)


Unnamed: 0,date,Ex_rate
0,01-01-1990 00:00,0.7855
1,02-01-1990 00:00,0.7818
2,03-01-1990 00:00,0.7867
3,04-01-1990 00:00,0.786
4,05-01-1990 00:00,0.7849


Columns detected: ['date', 'Ex_rate']
Using date column: date
Using series column: Ex_rate
Frequency could not be inferred. Proceeding without explicit freq.
Series info (head):


date
1990-01-01    0.7855
1990-01-02    0.7500
1990-01-03    0.7471
1990-01-04    0.7587
1990-01-05    0.7852
Name: Ex_rate, dtype: float64

Time span: 1990-01-01 00:00:00 to 2010-12-09 00:00:00
Frequency (inferred): None
Missing before: 0


NotImplementedError: Interpolation with NaNs in the index has not been implemented. Try filling those NaNs before interpolating.