In [None]:
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# Load dataset from your directory
file_path = r"D:\DATA SCIENCE\ASSIGNMENTS\20 timeseries\Timeseries\exchange_rate.csv"
exchange_df = pd.read_csv(file_path, parse_dates=[0])

In [None]:
# Display a quick preview
print(exchange_df.head())

In [None]:
# Plot the time series
plt.figure(figsize=(12,6))
plt.plot(exchange_df['date'], exchange_df['Ex_rate'], label='USD to AUD Exchange Rate')
plt.title('Exchange Rate Over Time (USD → AUD)')
plt.xlabel('Date')
plt.ylabel('Exchange Rate')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Part 2: ARIMA modelling
# Save as arima_part2.py or run in a Jupyter cell.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.stats.diagnostic import acorr_ljungbox
import warnings
warnings.filterwarnings("ignore")

In [None]:
# ---------- Load data ----------
file_path = r"D:\DATA SCIENCE\ASSIGNMENTS\20 timeseries\Timeseries\exchange_rate.csv"
df = pd.read_csv(file_path, parse_dates=[0])
df.columns = ['date', 'Ex_rate']   # ensure consistent names
df = df.sort_values('date').set_index('date')

If your data is more granular than monthly, and you want monthly frequency:
df = df.asfreq('D')  # only if truly daily; else don't force frequency

In [None]:
# ---------- Quick plot ----------
plt.figure(figsize=(12,4))
plt.plot(df.index, df['Ex_rate'], label='USD → AUD')
plt.title('USD to AUD Exchange Rate')
plt.xlabel('Date'); plt.ylabel('Exchange Rate'); plt.grid(True); plt.legend()
plt.show()

In [None]:
# ---------- 1) Stationarity check (ADF test) ----------
def adf_report(series, signif=0.05):
    res = adfuller(series.dropna(), autolag='AIC')
    output = {
        'adf_stat': res[0],
        'p_value': res[1],
        'n_lags': res[2],
        'n_obs': res[3],
        'crit_vals': res[4]
    }
    print("ADF Statistic: {:.6f}".format(output['adf_stat']))
    print("p-value: {:.6f}".format(output['p_value']))
    for k, v in output['crit_vals'].items():
        print("Critical Value ({}): {:.6f}".format(k, v))
    if output['p_value'] < signif:
        print("Conclusion: Reject H0 -> series is stationary (at {:.2%} significance).".format(signif))
    else:
        print("Conclusion: Fail to reject H0 -> series is non-stationary (needs differencing).")
    return output

In [None]:
print("\n== ADF test on original series ==")
adf_report(df['Ex_rate'])

In [None]:
# If non-stationary, difference once and test again:
df['diff1'] = df['Ex_rate'].diff()
print("\n== ADF test on first difference ==")
adf_report(df['diff1'].dropna())

In [None]:
# ---------- 2) ACF and PACF to choose p and q ----------
# Plot the ACF and PACF for the (differenced) stationary series
series_for_ac = df['diff1'].dropna() if adfuller(df['Ex_rate'].dropna())[1] > 0.05 else df['Ex_rate']

In [None]:
plt.figure(figsize=(12,4))
plot_acf(series_for_ac, lags=40, zero=False)
plt.title('ACF')
plt.show()

In [None]:
plt.figure(figsize=(12,4))
plot_pacf(series_for_ac, lags=40, method='ywm')  # use ywm or kubo; ywm is robust
plt.title('PACF')
plt.show()

Based on ACF/PACF you pick p and q:
- If PACF cuts off after lag k and ACF tails -> AR(p) with p=k
- If ACF cuts off after lag k and PACF tails -> MA(q) with q=k
- If both tail -> mixed ARMA
We'll pick a few candidate models to try; common approach: try small p/q: 0-3

In [None]:
# ---------- 3) Train-test split ----------
# We'll do a time-series split: last 12 months (or last 10% of samples) for testing
n = len(df)
test_size = int(0.10 * n)     # use 10% for test
train, test = df['Ex_rate'][:-test_size], df['Ex_rate'][-test_size:]
print(f"\nUsing {len(train)} points for training and {len(test)} for testing.")

In [None]:
# ---------- 4) Fit ARIMA models (try several small combinations) ----------
candidate_orders = [(1,1,0), (0,1,1), (1,1,1), (2,1,1), (2,1,0), (0,1,2)]
fitted_models = {}
for order in candidate_orders:
    try:
        m = ARIMA(train, order=order)
        res = m.fit()
        fitted_models[order] = res
        print(f"Fitted ARIMA{order}   AIC: {res.aic:.2f}   BIC: {res.bic:.2f}")
    except Exception as e:
        print(f"ARIMA{order} failed: {e}")

In [None]:
# Choose best by AIC
best_order = min(fitted_models.keys(), key=lambda o: fitted_models[o].aic)
best_res = fitted_models[best_order]
print(f"\nSelected ARIMA{best_order} by AIC (AIC={best_res.aic:.2f})")

In [None]:
# ---------- 5) Diagnostics on chosen model ----------
print("\n=== Model Summary ===")
print(best_res.summary())

In [None]:
# Residual plot
resid = best_res.resid
plt.figure(figsize=(12,4))
plt.plot(resid)
plt.title(f'Residuals of ARIMA{best_order}')
plt.grid(True)
plt.show()

In [None]:
# Residual density + mean
plt.figure(figsize=(8,4))
resid.plot(kind='kde')
plt.title('Residual density')
plt.show()
print("Residual mean:", np.mean(resid), " Residual std:", np.std(resid))

In [None]:
# ACF of residuals
plt.figure(figsize=(10,4))
plot_acf(resid.dropna(), lags=40, zero=False)
plt.title('ACF of residuals')
plt.show()

In [None]:
# Ljung-Box test for no-autocorrelation in residuals
lb = acorr_ljungbox(resid.dropna(), lags=[10, 20], return_df=True)
print("\nLjung-Box test on residuals:\n", lb)

In [None]:
# ---------- 6) Forecasting (out-of-sample) ----------
# Forecast horizon = len(test)
fc = best_res.get_forecast(steps=len(test))
fc_mean = fc.predicted_mean
fc_ci = fc.conf_int(alpha=0.05)

In [None]:
# Combine into DataFrame for plotting
pred_idx = test.index
pred_df = pd.DataFrame({'actual': test, 'forecast': fc_mean.values}, index=pred_idx)
pred_df[['lower', 'upper']] = fc_ci.values

In [None]:
# Plot actual vs forecast
plt.figure(figsize=(12,5))
plt.plot(train.index[-(len(test)*3):], train[-len(test)*3:], label='Train (recent part)')
plt.plot(test.index, test, label='Actual', marker='o')
plt.plot(pred_df.index, pred_df['forecast'], label=f'Forecast ARIMA{best_order}', marker='o')
plt.fill_between(pred_df.index, pred_df['lower'], pred_df['upper'], color='gray', alpha=0.2, label='95% CI')
plt.title('ARIMA Forecast vs Actual')
plt.xlabel('Date'); plt.ylabel('Exchange Rate'); plt.legend(); plt.grid(True)
plt.show()

In [None]:
# Simple numeric metrics
from sklearn.metrics import mean_squared_error, mean_absolute_error
rmse = np.sqrt(mean_squared_error(pred_df['actual'], pred_df['forecast']))
mae = mean_absolute_error(pred_df['actual'], pred_df['forecast'])
mape = np.mean(np.abs((pred_df['actual'] - pred_df['forecast']) / pred_df['actual'])) * 100
print(f"Forecast metrics on test set: RMSE={rmse:.6f}, MAE={mae:.6f}, MAPE={mape:.2f}%")

Save the best model if desired
best_res.save("best_arima_model.pkl")

In [None]:
"""
es_param_optimization.py

Parameter optimization for Exponential Smoothing models (SES, Holt, Holt-Winters)
- Uses AIC to pick best parameters from a grid
- Falls back to statsmodels automatic optimization if grid is disabled or fails

Dependencies:
  pip install pandas numpy matplotlib statsmodels scikit-learn
"""

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from itertools import product
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.metrics import mean_absolute_error, mean_squared_error

In [None]:
def infer_seasonal_period(ts, max_period=24):
    """
    Try to infer a seasonal period by autocorrelation peak.
    Simple heuristic: look for the lag (<= max_period) with the highest AC at lag>0.
    Returns an int seasonality or None.
    """
    from statsmodels.tsa.stattools import acf
    acfs = acf(ts.dropna(), nlags=min(len(ts)//2, max_period), fft=True)
    # ignore lag 0
    if len(acfs) < 2:
        return None
    lag = int(np.argmax(acfs[1:]) + 1)
    if acfs[lag] > 0.3:  # threshold heuristic; lower for noisy data
        return lag
    return None

In [None]:
def grid_search_es(ts,
                   model_type='auto',  # 'ses', 'holt', 'hw' or 'auto'
                   seasonal_periods=None,
                   alphas=None, betas=None, gammas=None,
                   use_grid=True,
                   max_combinations=200):
    """
    Grid-search AIC for ExponentialSmoothing models.
    Returns (best_fit, best_params, results_df)
    """
    ts = ts.dropna()
    n = len(ts)
    results = []

    # Decide which model to run
    if model_type == 'auto':
        # try to detect seasonality
        if seasonal_periods is None:
            seasonal_periods = infer_seasonal_period(ts)
        if seasonal_periods and seasonal_periods >= 2:
            chosen = 'hw'  # Holt-Winters
        else:
            # check for linear trend via simple difference of means slope
            slope = (ts.iloc[-1] - ts.iloc[0]) / max(n-1, 1)
            # if slope magnitude is significant relative to series std -> trend
            if abs(slope) > 0.1 * np.std(ts):
                chosen = 'holt'
            else:
                chosen = 'ses'
    else:
        chosen = model_type

    print(f"Chosen model: {chosen}, seasonal_periods={seasonal_periods}")

    # Default parameter grids
    if alphas is None:
        alphas = np.linspace(0.01, 0.99, 9)
    if betas is None:
        betas = np.linspace(0.01, 0.99, 7)
    if gammas is None:
        gammas = np.linspace(0.01, 0.99, 7)

    # Build candidate list
    candidates = []

    if chosen == 'ses':
        for a in alphas:
            candidates.append({'smoothing_level': float(a)})
    elif chosen == 'holt':
        for a, b in product(alphas, betas):
            candidates.append({'smoothing_level': float(a), 'smoothing_slope': float(b)})
    else:  # hw
        if seasonal_periods is None:
            raise ValueError("seasonal_periods must be provided or inferable for Holt-Winters")
        for a, b, g in product(alphas, betas, gammas):
            candidates.append({'smoothing_level': float(a),
                               'smoothing_slope': float(b),
                               'smoothing_seasonal': float(g)})

    # if too many candidates, reduce by sampling
    if use_grid and len(candidates) > max_combinations:
        np.random.seed(0)
        candidates = list(np.random.choice(candidates, size=max_combinations, replace=False))

    best_aic = np.inf
    best_fit = None
    best_params = None

    if not use_grid:
        # Let statsmodels optimize
        print("Grid disabled — using statsmodels optimized=True")
        try:
            if chosen == 'ses':
                model = ExponentialSmoothing(ts, trend=None, seasonal=None)
            elif chosen == 'holt':
                model = ExponentialSmoothing(ts, trend='add', seasonal=None)
            else:
                model = ExponentialSmoothing(ts, trend='add', seasonal='add', seasonal_periods=seasonal_periods)
            fit = model.fit(optimized=True)
            return fit, fit.params, None
        except Exception as e:
            raise RuntimeError("Optimized fit failed: " + str(e))

    # run grid
    for i, params in enumerate(candidates, 1):
        try:
            if chosen == 'ses':
                model = ExponentialSmoothing(ts, trend=None, seasonal=None)
                fit = model.fit(smoothing_level=params['smoothing_level'], optimized=False)
            elif chosen == 'holt':
                model = ExponentialSmoothing(ts, trend='add', seasonal=None)
                fit = model.fit(smoothing_level=params['smoothing_level'],
                                smoothing_slope=params['smoothing_slope'],
                                optimized=False)
            else:
                model = ExponentialSmoothing(ts, trend='add', seasonal='add', seasonal_periods=seasonal_periods)
                fit = model.fit(smoothing_level=params['smoothing_level'],
                                smoothing_slope=params['smoothing_slope'],
                                smoothing_seasonal=params['smoothing_seasonal'],
                                optimized=False)
            aic = getattr(fit, 'aic', np.inf)
            results.append({'params': params, 'aic': aic, 'llf': getattr(fit, 'llf', None)})
            if aic < best_aic:
                best_aic = aic
                best_fit = fit
                best_params = params
        except Exception as e:
            # skip invalid combos (can happen when model can't converge)
            # print(f"skip params {params}: {e}")
            continue

    if best_fit is None:
        raise RuntimeError("Grid search failed to fit any model; try optimized=True or different grid ranges")

    # build results DataFrame for inspection
    results_df = pd.DataFrame([{'aic': r['aic'], **r['params']} for r in results]).sort_values('aic').reset_index(drop=True)

    print("Best AIC:", best_aic)
    print("Best params:", best_params)
    return best_fit, best_params, results_df

In [None]:
# -------------------------
# Example usage with sample series
# -------------------------
if __name__ == "__main__":
    # Example: synthetic monthly series with trend + seasonality
    rng = pd.date_range('2015-01-01', periods=120, freq='M')
    np.random.seed(42)
    seasonal = 10 * np.sin(2 * np.pi * (np.arange(len(rng)) % 12) / 12)
    trend = 0.5 * np.arange(len(rng))
    noise = np.random.normal(scale=3, size=len(rng))
    data = 50 + trend + seasonal + noise
    ts = pd.Series(data, index=rng)

    # Run grid search (auto model detection)
    fit, best_params, results_df = grid_search_es(ts, model_type='auto', seasonal_periods=None, use_grid=True)

    # Forecast example
    steps = 12
    forecast = fit.forecast(steps)

    # Print short diagnostics
    print("\nFitted params from statsmodels fit object:")
    print(fit.params)
    print("\nTop 5 grid results (first rows):")
    if results_df is not None:
        print(results_df.head())

    # Plot
    plt.figure(figsize=(10,5))
    plt.plot(ts, label='Actual')
    plt.plot(fit.fittedvalues, label='Fitted', linestyle='--')
    plt.plot(forecast, label='Forecast', linestyle='-.')
    plt.title("Exponential Smoothing - Fitted vs Forecast")
    plt.legend()
    plt.show()

    # Quick error measure on last 'steps' if you want a rough holdout (not strict CV)
    try:
        last_actual = ts[-steps:]
        # align forecast length
        fa = forecast[:len(last_actual)]
        print("MAE (last periods):", mean_absolute_error(last_actual, fa))
        print("RMSE (last periods):", mean_squared_error(last_actual, fa, squared=False))
    except Exception:
        pass

In [None]:
"""
Part 4: Model Evaluation and Comparison
---------------------------------------
Compares ARIMA and Exponential Smoothing forecasts using MAE, RMSE, and MAPE.
"""

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error

In [None]:
# --- Example setup: replace with your actual fitted models and series ---
# Suppose 'test' is your test set, 'forecast_hw' and 'forecast_arima' are their forecasts.
# For demonstration, we’ll fake some data:
np.random.seed(42)
test = pd.Series(np.random.uniform(80, 85, 12), name='Actual')  # actual exchange rate
forecast_hw = test + np.random.normal(0, 0.3, 12)               # Holt-Winters forecast
forecast_arima = test + np.random.normal(0, 0.5, 12)            # ARIMA forecast

In [None]:
# --- Define evaluation functions ---
def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

In [None]:
def evaluate_model(y_true, y_pred, model_name):
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mape = mean_absolute_percentage_error(y_true, y_pred)
    return pd.Series({'MAE': mae, 'RMSE': rmse, 'MAPE': mape}, name=model_name)

In [None]:
# --- Compute metrics for both models ---
results_hw = evaluate_model(test, forecast_hw, 'Holt-Winters')
results_arima = evaluate_model(test, forecast_arima, 'ARIMA')

In [None]:
results_df = pd.concat([results_hw, results_arima], axis=1).T
print("\n🔹 Forecast Error Metrics Comparison:\n")
print(results_df)

In [None]:
# --- Visual comparison ---
plt.figure(figsize=(10,6))
plt.plot(test.values, label='Actual', color='black', marker='o')
plt.plot(forecast_hw.values, label='Holt-Winters Forecast', linestyle='--', marker='x')
plt.plot(forecast_arima.values, label='ARIMA Forecast', linestyle='-.', marker='s')
plt.title("Exchange Rate Forecast Comparison")
plt.xlabel("Time Steps (Test Periods)")
plt.ylabel("Exchange Rate")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# --- Identify best model ---
best_model = results_df['RMSE'].idxmin()
print(f"\n🏆 Best model based on RMSE: {best_model}\n")

In [None]:
# --- Optional: difference plot for error visualization ---
plt.figure(figsize=(8,4))
plt.bar(['Holt-Winters', 'ARIMA'], results_df['RMSE'], color=['#66c2a5', '#fc8d62'])
plt.title("Model RMSE Comparison")
plt.ylabel("RMSE")
plt.show()