# Day 17: ARIMA Diagnostics
## Residual Analysis for Model Validation

This notebook validates ARIMA(0,1,0) model assumptions through comprehensive residual diagnostics.

## Section 1: Import and Setup

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.subplots as sp
from scipy import stats
from sklearn.metrics import mean_squared_error, mean_absolute_error

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox

import warnings
warnings.filterwarnings('ignore')

print("✓ Imports successful")

## Section 2: Load and Prepare Data

In [None]:
# Load data
df = pd.read_csv('../data/gold_prices.csv', parse_dates=['Date'])
if 'Price' not in df.columns:
    df = df.rename(columns={'Adj Close': 'Price'})
df = df.drop_duplicates(subset=['Date']).sort_values('Date').reset_index(drop=True)

# Train-test split
train_size = int(len(df) * 0.8)
train_data = df[:train_size].copy()
test_data = df[train_size:].copy()

print(f"Data loaded: {len(df)} observations")
print(f"Train: {len(train_data)} | Test: {len(test_data)}")
print(f"Date range: {df['Date'].min().date()} to {df['Date'].max().date()}")

## Section 3: Fit ARIMA(0,1,0) Model

In [None]:
# Fit optimal ARIMA(0,1,0) from Day 16
model = ARIMA(train_data['Price'], order=(0, 1, 0))
fitted_model = model.fit()

print(f"Model: ARIMA(0,1,0)")
print(f"AIC: {fitted_model.aic:.2f}")
print(f"BIC: {fitted_model.bic:.2f}")
print(f"\nModel Summary:")
print(fitted_model.summary())

## Section 4: Extract and Analyze Residuals

In [None]:
# In-sample residuals
residuals = fitted_model.resid.dropna()

print("In-Sample Residual Statistics:")
print(f"  Count: {len(residuals)}")
print(f"  Mean: {residuals.mean():.6f}")
print(f"  Std Dev: {residuals.std():.6f}")
print(f"  Min: {residuals.min():.2f}")
print(f"  Max: {residuals.max():.2f}")
print(f"  Median: {residuals.median():.2f}")
print(f"  Skewness: {stats.skew(residuals):.4f}")
print(f"  Kurtosis: {stats.kurtosis(residuals):.4f}")

# Test set residuals
forecast = fitted_model.get_forecast(steps=len(test_data))
forecast_values = forecast.predicted_mean.values
test_residuals = test_data['Price'].values - forecast_values

print(f"\nTest Set Residual Statistics:")
print(f"  Count: {len(test_residuals)}")
print(f"  Mean: {test_residuals.mean():.2f}")
print(f"  Std Dev: {test_residuals.std():.2f}")

## Section 5: Ljung-Box Test (Autocorrelation)

In [None]:
# Ljung-Box test at multiple lags
lags_to_test = [5, 10, 20, 40]
lb_results = acorr_ljungbox(residuals, lags=lags_to_test, return_df=True)

print("Ljung-Box Test Results:")
print("H0: Residuals are white noise (no autocorrelation)\n")
print(lb_results)

alpha = 0.05
print(f"\nInterpretation (α = {alpha}):")
for lag in lags_to_test:
    p_val = lb_results.loc[lag, 'lb_pvalue']
    is_white = p_val > alpha
    status = "✓ PASS" if is_white else "✗ FAIL"
    print(f"  Lag {lag:2d}: p={p_val:.4f} → {status}")

## Section 6: Jarque-Bera Test (Normality)

In [None]:
# Jarque-Bera test
jb_stat, jb_pvalue = stats.jarque_bera(residuals)
skewness = stats.skew(residuals)
kurtosis = stats.kurtosis(residuals)

print("Jarque-Bera Test for Normality:")
print("H0: Residuals follow normal distribution\n")
print(f"Test Statistic: {jb_stat:.4f}")
print(f"P-value: {jb_pvalue:.4f}")
print(f"Skewness: {skewness:.4f} (ideal: 0)")
print(f"Kurtosis: {kurtosis:.4f} (ideal: 0)")

alpha = 0.05
is_normal = jb_pvalue > alpha
status = "✓ PASS" if is_normal else "✗ FAIL"
print(f"\nInterpretation (α = {alpha}):")
print(f"  Result: {status} - Residuals {'are' if is_normal else 'are NOT'} normally distributed")

if abs(skewness) < 0.5:
    print(f"  Skewness: ✓ Acceptable")
else:
    print(f"  Skewness: ✗ High asymmetry")
    
if abs(kurtosis) < 1.0:
    print(f"  Kurtosis: ✓ Acceptable")
else:
    print(f"  Kurtosis: ✗ Heavy/light tails")

## Section 7: Heteroskedasticity Test (Variance Stability)

In [None]:
# Rolling variance analysis
window = 50
rolling_var = pd.Series(residuals).rolling(window=window).var()

mean_var_first = rolling_var[:len(rolling_var)//2].mean()
mean_var_second = rolling_var[len(rolling_var)//2:].mean()
h_statistic = mean_var_second / mean_var_first if mean_var_first > 0 else 1.0

print("Heteroskedasticity Test (Variance Stability):")
print("H0: Variance is constant over time\n")
print(f"Rolling Variance Analysis (window={window}):")
print(f"  First half mean variance: {mean_var_first:.2f}")
print(f"  Second half mean variance: {mean_var_second:.2f}")
print(f"  H-statistic (ratio): {h_statistic:.4f}")

is_homoscedastic = 0.8 < h_statistic < 1.25
status = "✓ PASS" if is_homoscedastic else "✗ FAIL"
print(f"\nInterpretation:")
if is_homoscedastic:
    print(f"  {status} - Variances are relatively constant")
else:
    print(f"  {status} - Evidence of heteroscedasticity (volatility clustering)")

## Section 8: ACF and PACF of Residuals

In [None]:
# Calculate ACF and PACF
acf_values = acf(residuals, nlags=40, fft=True)
pacf_values = pacf(residuals, nlags=40, method='ywm')

# Count significant lags
ci = 1.96 / np.sqrt(len(residuals))
significant_acf = np.sum(np.abs(acf_values[1:]) > ci)
significant_pacf = np.sum(np.abs(pacf_values[1:]) > ci)

print("ACF and PACF Analysis:")
print(f"Confidence interval: ±{ci:.4f}\n")
print(f"Autocorrelation Function (ACF):")
print(f"  Significant lags (out of 40): {significant_acf}")
print(f"  Expected by chance (α=0.05): ~2")
status_acf = "✓ PASS" if significant_acf <= 3 else "✗ FAIL"
print(f"  Status: {status_acf}")

print(f"\nPartial Autocorrelation Function (PACF):")
print(f"  Significant lags (out of 40): {significant_pacf}")
print(f"  Expected by chance (α=0.05): ~2")
status_pacf = "✓ PASS" if significant_pacf <= 3 else "✗ FAIL"
print(f"  Status: {status_pacf}")

## Section 9: Visualization - Diagnostic Plots

In [None]:
# Create comprehensive diagnostic visualization
fig = sp.make_subplots(
    rows=3, cols=2,
    subplot_titles=(
        'Residuals Over Time',
        'Residual Distribution',
        'Q-Q Plot',
        'ACF of Residuals',
        'PACF of Residuals',
        'Rolling Mean and Variance'
    ),
    specs=[
        [{'type': 'scatter'}, {'type': 'histogram'}],
        [{'type': 'scatter'}, {'type': 'bar'}],
        [{'type': 'bar'}, {'type': 'scatter'}]
    ]
)

# 1. Residuals over time
fig.add_trace(
    go.Scatter(
        x=train_data['Date'],
        y=residuals,
        mode='markers',
        marker=dict(color='#FF6B6B', size=4),
        name='Residuals'
    ),
    row=1, col=1
)
fig.add_hline(y=0, line_dash='dash', line_color='black', row=1, col=1)

# 2. Distribution
fig.add_trace(
    go.Histogram(
        x=residuals,
        nbinsx=40,
        marker=dict(color='#FFD700'),
        name='Distribution'
    ),
    row=1, col=2
)

# 3. Q-Q Plot
sorted_residuals = np.sort(residuals)
theoretical_quantiles = stats.norm.ppf(np.linspace(0.01, 0.99, len(sorted_residuals)))

fig.add_trace(
    go.Scatter(
        x=theoretical_quantiles,
        y=sorted_residuals,
        mode='markers',
        marker=dict(color='#4ECDC4', size=5),
        name='Q-Q'
    ),
    row=2, col=1
)
min_val = min(theoretical_quantiles.min(), sorted_residuals.min())
max_val = max(theoretical_quantiles.max(), sorted_residuals.max())
fig.add_trace(
    go.Scatter(
        x=[min_val, max_val],
        y=[min_val, max_val],
        mode='lines',
        line=dict(color='black', dash='dash'),
        name='Normal'
    ),
    row=2, col=1
)

# 4. ACF
fig.add_trace(
    go.Bar(
        x=np.arange(len(acf_values)),
        y=acf_values,
        marker=dict(color='#95E1D3'),
        name='ACF'
    ),
    row=2, col=2
)
ci = 1.96 / np.sqrt(len(residuals))
fig.add_hline(y=ci, line_dash='dash', line_color='red', row=2, col=2)
fig.add_hline(y=-ci, line_dash='dash', line_color='red', row=2, col=2)

# 5. PACF
fig.add_trace(
    go.Bar(
        x=np.arange(len(pacf_values)),
        y=pacf_values,
        marker=dict(color='#A8D8EA'),
        name='PACF'
    ),
    row=3, col=1
)
fig.add_hline(y=ci, line_dash='dash', line_color='red', row=3, col=1)
fig.add_hline(y=-ci, line_dash='dash', line_color='red', row=3, col=1)

# 6. Rolling statistics
fig.add_trace(
    go.Scatter(
        x=train_data['Date'],
        y=pd.Series(residuals).rolling(window=50).mean(),
        mode='lines',
        name='Rolling Mean',
        line=dict(color='blue')
    ),
    row=3, col=2
)
fig.add_trace(
    go.Scatter(
        x=train_data['Date'],
        y=pd.Series(residuals).rolling(window=50).std(),
        mode='lines',
        name='Rolling Std',
        line=dict(color='red')
    ),
    row=3, col=2
)

fig.update_yaxes(title_text="Residual", row=1, col=1)
fig.update_yaxes(title_text="Frequency", row=1, col=2)
fig.update_yaxes(title_text="Theoretical", row=2, col=1)
fig.update_yaxes(title_text="ACF", row=2, col=2)
fig.update_yaxes(title_text="PACF", row=3, col=1)
fig.update_yaxes(title_text="Value", row=3, col=2)

fig.update_layout(height=1200, width=1400, showlegend=True, hovermode='x unified')
fig.show()

print("✓ Diagnostic visualization created")

## Section 10: Summary and Assessment

In [None]:
# Create summary table
diagnostic_results = {
    'Test': [
        'Ljung-Box (lag 20)',
        'Jarque-Bera',
        'Heteroskedasticity',
        'ACF Significant Lags',
        'PACF Significant Lags'
    ],
    'Statistic': [
        f"{lb_results.loc[20, 'lb_stat']:.4f}",
        f"{jb_stat:.4f}",
        f"{h_statistic:.4f}",
        f"{significant_acf}",
        f"{significant_pacf}"
    ],
    'P-Value': [
        f"{lb_results.loc[20, 'lb_pvalue']:.4f}",
        f"{jb_pvalue:.4f}",
        "N/A",
        "N/A",
        "N/A"
    ],
    'Status': [
        '✓ PASS' if lb_results.loc[20, 'lb_pvalue'] > 0.05 else '✗ FAIL',
        '✓ PASS' if jb_pvalue > 0.05 else '✗ FAIL',
        '✓ PASS' if 0.8 < h_statistic < 1.25 else '✗ FAIL',
        '✓ PASS' if significant_acf <= 3 else '✗ FAIL',
        '✓ PASS' if significant_pacf <= 3 else '✗ FAIL'
    ]
}

summary_df = pd.DataFrame(diagnostic_results)
print("\n" + "="*70)
print("DIAGNOSTIC TEST RESULTS")
print("="*70)
print(summary_df.to_string(index=False))

# Overall assessment
all_passed = all([
    lb_results.loc[20, 'lb_pvalue'] > 0.05,
    jb_pvalue > 0.05,
    0.8 < h_statistic < 1.25,
    significant_acf <= 3,
    significant_pacf <= 3
])

print("\n" + "="*70)
print("OVERALL ASSESSMENT")
print("="*70)

if all_passed:
    print("✓ EXCELLENT - All diagnostic tests passed!")
    print("Model assumptions satisfied. Residuals are white noise.")
    print("Safe to use for forecasting and inference.")
else:
    print("⚠ PARTIAL - Some diagnostic tests failed.")
    print("Review failing tests and consider model refinement.")

# Forecast performance
rmse = np.sqrt(mean_squared_error(test_data['Price'], forecast_values))
mae = mean_absolute_error(test_data['Price'], forecast_values)
mape = np.mean(np.abs((test_data['Price'] - forecast_values) / test_data['Price'])) * 100

print(f"\nForecast Performance:")
print(f"  RMSE: {rmse:.2f}")
print(f"  MAE: {mae:.2f}")
print(f"  MAPE: {mape:.2f}%")

# Naive baseline
naive_forecast = np.full(len(test_data), train_data['Price'].iloc[-1])
naive_rmse = np.sqrt(mean_squared_error(test_data['Price'], naive_forecast))
improvement = (naive_rmse - rmse) / naive_rmse * 100

print(f"\nComparison to Naive Baseline:")
print(f"  Naive RMSE: {naive_rmse:.2f}")
print(f"  ARIMA RMSE: {rmse:.2f}")
print(f"  Improvement: {improvement:+.2f}%")