In [None]:
# Data manipulation and analysis
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
plt.style.use('seaborn-v0_8')

# Time series analysis
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX

# Auto ARIMA
from pmdarima import auto_arima

# Metrics
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Set random seed for reproducibility
np.random.seed(42)
Subtask 1.2: Create Sample Time Series Data
Let's create a sample dataset that includes both trend and seasonal components:

# Create sample time series data
dates = pd.date_range(start='2020-01-01', end='2023-12-31', freq='M')
n = len(dates)

# Generate synthetic data with trend and seasonality
trend = np.linspace(100, 200, n)
seasonal = 10 * np.sin(2 * np.pi * np.arange(n) / 12)
noise = np.random.normal(0, 5, n)
ts_data = trend + seasonal + noise

# Create DataFrame
df = pd.DataFrame({
    'date': dates,
    'value': ts_data
})

df.set_index('date', inplace=True)

In [None]:
# Create airline passenger-like data
dates_airline = pd.date_range(start='2015-01-01', end='2023-12-31', freq='M')
n_airline = len(dates_airline)

# Generate airline passenger-like data
base_passengers = 100
trend_airline = np.linspace(0, 50, n_airline)
seasonal_airline = 20 * np.sin(2 * np.pi * np.arange(n_airline) / 12) + 10 * np.sin(4 * np.pi * np.arange(n_airline) / 12)
growth = np.exp(0.02 * np.arange(n_airline))
noise_airline = np.random.normal(0, 8, n_airline)

airline_data = (base_passengers + trend_airline + seasonal_airline) * growth + noise_airline

# Create DataFrame
airline_df = pd.DataFrame({
    'date': dates_airline,
    'passengers': airline_data
})

airline_df.set_index('date', inplace=True)

In [None]:
# Create subplots for visualization
fig, axes = plt.subplots(2, 1, figsize=(12, 10))

# Plot sample data
axes[0].plot(df.index, df['value'], linewidth=2, color='blue')
axes[0].set_title('Sample Time Series Data', fontsize=14, fontweight='bold')
axes[0].set_xlabel('Date')
axes[0].set_ylabel('Value')
axes[0].grid(True, alpha=0.3)

# Plot airline data
axes[1].plot(airline_df.index, airline_df['passengers'], linewidth=2, color='red')
axes[1].set_title('Airline Passenger Data', fontsize=14, fontweight='bold')
axes[1].set_xlabel('Date')
axes[1].set_ylabel('Passengers')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Decompose the airline passenger data
decomposition = seasonal_decompose(airline_df['passengers'], model='multiplicative', period=12)

# Plot decomposition
fig, axes = plt.subplots(4, 1, figsize=(12, 12))

decomposition.observed.plot(ax=axes[0], title='Original Time Series', color='blue')
decomposition.trend.plot(ax=axes[1], title='Trend Component', color='green')
decomposition.seasonal.plot(ax=axes[2], title='Seasonal Component', color='orange')
decomposition.resid.plot(ax=axes[3], title='Residual Component', color='red')

for ax in axes:
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
def check_stationarity(timeseries, title):
    """
    Function to check stationarity using ADF test
    """
    print(f"\n=== ADF Test Results for {title} ===")
    
    # Perform ADF test
    result = adfuller(timeseries.dropna())
    
    print(f'ADF Statistic: {result[0]:.6f}')
    print(f'p-value: {result[1]:.6f}')
    print(f'Critical Values:')
    for key, value in result[4].items():
        print(f'\t{key}: {value:.3f}')
    
    # Interpretation
    if result[1] <= 0.05:
        print("✅ Series is STATIONARY (reject null hypothesis)")
    else:
        print("❌ Series is NON-STATIONARY (fail to reject null hypothesis)")
    
    return result[1] <= 0.05

# Test stationarity for both datasets
is_stationary_sample = check_stationarity(df['value'], "Sample Data")
is_stationary_airline = check_stationarity(airline_df['passengers'], "Airline Data")

In [None]:
def make_stationary(data, max_diff=3):
    """
    Function to make time series stationary through differencing
    """
    original_data = data.copy()
    diff_order = 0
    
    for i in range(max_diff):
        # Check if current data is stationary
        adf_result = adfuller(data.dropna())
        
        if adf_result[1] <= 0.05:
            print(f"Data is stationary after {diff_order} differencing(s)")
            break
        else:
            diff_order += 1
            data = data.diff()
            print(f"Applied differencing order {diff_order}, p-value: {adf_result[1]:.6f}")
    
    return data, diff_order

# Apply differencing to airline data if needed
if not is_stationary_airline:
    airline_diff, d_order = make_stationary(airline_df['passengers'])
    
    # Visualize original vs differenced data
    fig, axes = plt.subplots(2, 1, figsize=(12, 8))
    
    axes[0].plot(airline_df.index, airline_df['passengers'], color='blue')
    axes[0].set_title('Original Airline Data', fontweight='bold')
    axes[0].grid(True, alpha=0.3)
    
    axes[1].plot(airline_df.index, airline_diff, color='red')
    axes[1].set_title(f'Differenced Data (d={d_order})', fontweight='bold')
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
else:
    airline_diff = airline_df['passengers']
    d_order = 0

In [None]:
def plot_acf_pacf(data, title, lags=40):
    """
    Plot ACF and PACF to help determine ARIMA parameters
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    # Plot ACF
    plot_acf(data.dropna(), ax=axes[0], lags=lags, title=f'ACF - {title}')
    
    # Plot PACF
    plot_pacf(data.dropna(), ax=axes[1], lags=lags, title=f'PACF - {title}')
    
    plt.tight_layout()
    plt.show()

# Plot ACF and PACF for differenced airline data
plot_acf_pacf(airline_diff, "Differenced Airline Data")

In [None]:
def evaluate_arima_model(data, arima_order):
    """
    Evaluate ARIMA model with given parameters
    """
    try:
        model = ARIMA(data, order=arima_order)
        model_fit = model.fit()
        return model_fit.aic
    except:
        return float('inf')

def manual_arima_tuning(data, max_p=3, max_d=2, max_q=3):
    """
    Manual grid search for optimal ARIMA parameters
    """
    best_aic = float('inf')
    best_order = None
    results = []
    
    print("Evaluating ARIMA parameters...")
    print("Order (p,d,q) | AIC Score")
    print("-" * 25)
    
    for p in range(max_p + 1):
        for d in range(max_d + 1):
            for q in range(max_q + 1):
                order = (p, d, q)
                try:
                    aic = evaluate_arima_model(data, order)
                    results.append((order, aic))
                    print(f"({p},{d},{q})        | {aic:.2f}")
                    
                    if aic < best_aic:
                        best_aic = aic
                        best_order = order
                except:
                    continue
    
    print(f"\nBest ARIMA order: {best_order} with AIC: {best_aic:.2f}")
    return best_order, results

# Find optimal parameters for airline data
best_order_manual, all_results = manual_arima_tuning(airline_df['passengers'])

In [None]:
def auto_arima_selection(data, seasonal=False, m=12):
    """
    Use auto_arima for automatic parameter selection
    """
    print("Running auto_arima...")
    
    auto_model = auto_arima(
        data,
        start_p=0, start_q=0,
        max_p=5, max_q=5,
        seasonal=seasonal,
        m=m,
        stepwise=True,
        suppress_warnings=True,
        error_action='ignore',
        trace=True
    )
    
    print(f"\nAuto ARIMA recommended order: {auto_model.order}")
    if seasonal:
        print(f"Seasonal order: {auto_model.seasonal_order}")
    
    return auto_model

# Apply auto_arima to airline data
auto_model = auto_arima_selection(airline_df['passengers'], seasonal=False)

In [None]:
def build_arima_model(data, order):
    """
    Build and fit ARIMA model
    """
    print(f"Building ARIMA{order} model...")
    
    # Fit the model
    model = ARIMA(data, order=order)
    fitted_model = model.fit()
    
    # Print model summary
    print(fitted_model.summary())
    
    return fitted_model

# Build ARIMA model with auto-selected parameters
arima_model = build_arima_model(airline_df['passengers'], auto_model.order)

In [None]:
def auto_sarima_selection(data, m=12):
    """
    Use auto_arima for SARIMA model selection
    """
    print("Running auto_arima for SARIMA...")
    
    sarima_model = auto_arima(
        data,
        start_p=0, start_q=0,
        max_p=3, max_q=3,
        start_P=0, start_Q=0,
        max_P=2, max_Q=2,
        seasonal=True,
        m=m,
        stepwise=True,
        suppress_warnings=True,
        error_action='ignore',
        trace=True
    )
    
    print(f"\nSARIMA recommended order: {sarima_model.order}")
    print(f"Seasonal order: {sarima_model.seasonal_order}")
    
    return sarima_model

# Apply auto_arima for SARIMA
sarima_auto = auto_sarima_selection(airline_df['passengers'])

In [None]:
def build_sarima_model(data, order, seasonal_order):
    """
    Build and fit SARIMA model
    """
    print(f"Building SARIMA{order}x{seasonal_order} model...")
    
    # Fit the model
    model = SARIMAX(data, order=order, seasonal_order=seasonal_order)
    fitted_model = model.fit(disp=False)
    
    # Print model summary
    print(fitted_model.summary())
    
    return fitted_model

# Build SARIMA model
sarima_model = build_sarima_model(
    airline_df['passengers'], 
    sarima_auto.order, 
    sarima_auto.seasonal_order
)

In [None]:
def analyze_residuals(model, title):
    """
    Analyze model residuals
    """
    residuals = model.resid
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # Residuals plot
    axes[0, 0].plot(residuals)
    axes[0, 0].set_title(f'{title} - Residuals')
    axes[0, 0].grid(True, alpha=0.3)
    
    # Residuals histogram
    axes[0, 1].hist(residuals, bins=30, density=True, alpha=0.7)
    axes[0, 1].set_title(f'{title} - Residuals Distribution')
    axes[0, 1].grid(True, alpha=0.3)
    
    # Q-Q plot
    from scipy import stats
    stats.probplot(residuals, dist="norm", plot=axes[1, 0])
    axes[1, 0].set_title(f'{title} - Q-Q Plot')
    axes[1, 0].grid(True, alpha=0.3)
    
    # ACF of residuals
    plot_acf(residuals, ax=axes[1, 1], title=f'{title} - ACF of Residuals')
    
    plt.tight_layout()
    plt.show()
    
    # Statistical tests
    print(f"\n=== Residual Analysis for {title} ===")
    print(f"Mean of residuals: {np.mean(residuals):.6f}")
    print(f"Standard deviation: {np.std(residuals):.6f}")
    
    # Ljung-Box test for autocorrelation
    from statsmodels.stats.diagnostic import acorr_ljungbox
    lb_test = acorr_ljungbox(residuals, lags=10, return_df=True)
    print(f"Ljung-Box test p-value: {lb_test['lb_pvalue'].iloc[-1]:.6f}")

# Analyze residuals for both models
analyze_residuals(arima_model, "ARIMA")
analyze_residuals(sarima_model, "SARIMA")

In [None]:
def analyze_residuals(model, title):
    """
    Analyze model residuals
    """
    residuals = model.resid
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # Residuals plot
    axes[0, 0].plot(residuals)
    axes[0, 0].set_title(f'{title} - Residuals')
    axes[0, 0].grid(True, alpha=0.3)
    
    # Residuals histogram
    axes[0, 1].hist(residuals, bins=30, density=True, alpha=0.7)
    axes[0, 1].set_title(f'{title} - Residuals Distribution')
    axes[0, 1].grid(True, alpha=0.3)
    
    # Q-Q plot
    from scipy import stats
    stats.probplot(residuals, dist="norm", plot=axes[1, 0])
    axes[1, 0].set_title(f'{title} - Q-Q Plot')
    axes[1, 0].grid(True, alpha=0.3)
    
    # ACF of residuals
    plot_acf(residuals, ax=axes[1, 1], title=f'{title} - ACF of Residuals')
    
    plt.tight_layout()
    plt.show()
    
    # Statistical tests
    print(f"\n=== Residual Analysis for {title} ===")
    print(f"Mean of residuals: {np.mean(residuals):.6f}")
    print(f"Standard deviation: {np.std(residuals):.6f}")
    
    # Ljung-Box test for autocorrelation
    from statsmodels.stats.diagnostic import acorr_ljungbox
    lb_test = acorr_ljungbox(residuals, lags=10, return_df=True)
    print(f"Ljung-Box test p-value: {lb_test['lb_pvalue'].iloc[-1]:.6f}")

# Analyze residuals for both models
analyze_residuals(arima_model, "ARIMA")
analyze_residuals(sarima_model, "SARIMA")

In [None]:
def generate_forecasts(model, steps=12, alpha=0.05):
    """
    Generate forecasts with confidence intervals
    """
    # Generate forecast
    forecast_result = model.get_forecast(steps=steps)
    forecast = forecast_result.predicted_mean
    conf_int = forecast_result.conf_int(alpha=alpha)
    
    return forecast, conf_int

# Generate forecasts for both models
forecast_steps = 12
arima_forecast, arima_conf_int = generate_forecasts(arima_model, forecast_steps)
sarima_forecast, sarima_conf_int = generate_forecasts(sarima_model, forecast_steps)

print("=== Forecast Results ===")
print(f"ARIMA Forecast (next {forecast_steps} periods):")
print(arima_forecast.round(2))
print(f"\nSARIMA Forecast (next {forecast_steps} periods):")
print(sarima_forecast.round(2))

In [None]:
def plot_forecasts(data, model, forecast, conf_int, title, periods_to_show=36):
    """
    Plot historical data with forecasts and confidence intervals
    """
    fig, ax = plt.subplots(figsize=(15, 8))
    
    # Plot historical data (last periods_to_show points)
    historical_data = data.iloc[-periods_to_show:]
    ax.plot(historical_data.index, historical_data.values, 
            label='Historical Data', color='blue', linewidth=2)
    
    # Plot fitted values
    fitted_values = model.fittedvalues.iloc[-periods_to_show:]
    ax.plot(fitted_values.index, fitted_values.values, 
            label='Fitted Values', color='green', linewidth=1, alpha=0.7)
    
    # Create future dates for forecast
    last_date = data.index[-1]
    future_dates = pd.date_range(start=last_date + pd.DateOffset(months=1), 
                                periods=len(forecast), freq='M')
    
    # Plot forecast
    ax.plot(future_dates, forecast.values, 
            label='Forecast', color='red', linewidth=2, marker='o')
    
    # Plot confidence intervals
    ax.fill_between(future_dates, 
                   conf_int.iloc[:, 0], 
                   conf_int.iloc[:, 1], 
                   color='red', alpha=0.2, label='95% Confidence Interval')
    
    ax.set_title(f'{title} - Forecast with Confidence Intervals', 
                fontsize=14, fontweight='bold')
    ax.set_xlabel('Date')
    ax.set_ylabel('Value')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

# Plot forecasts for both models
plot_forecasts(airline_df['passengers'], arima_model, arima_forecast, 
               arima_conf_int, 'ARIMA Model')

plot_forecasts(airline_df['passengers'], sarima_model, sarima_forecast, 
               sarima_conf_int, 'SARIMA Model')
Subtask 6.3: Create Forecast Summary
Let's create a comprehensive forecast summary:

def create_forecast_summary(data, models, forecasts, conf_intervals, model_names):
    """
    Create a comprehensive forecast summary
    """
    print("=" * 60)
    print("COMPREHENSIVE FORECAST SUMMARY")
    print("=" * 60)
    
    # Last historical value
    last_value = data.iloc[-1]
    last_date = data.index[-1]
    
    print(f"Last Historical Value: {last_value:.2f} (Date: {last_date.strftime('%Y-%m')})")
    print()
    
    for name, forecast, conf_int in zip(model_names, forecasts, conf_intervals):
        print(f"--- {name} MODEL FORECAST ---")
        
        # Create forecast DataFrame
        future_dates = pd.date_range(start=last_date + pd.DateOffset(months=1), 
                                   periods=len(forecast), freq='M')
        
        forecast_df = pd.DataFrame({
            'Date': future_dates,
            'Forecast': forecast.values,
            'Lower_CI': conf_int.iloc[:, 0].values,
            'Upper_CI': conf_int.iloc[:, 1].values
        })
        
        print(forecast_df.round(2))
        
        # Calculate growth rates
        growth_rate = ((forecast.iloc[-1] - last_value) / last_value) * 100
        print(f"\nProjected Growth Rate (12 months): {growth_rate:.2f}%")
        print(f"Average Monthly Growth: {growth_rate/12:.2f}%")
        print()

# Create forecast summary
create_forecast_summary(
    airline_df['passengers'],
    [arima_model, sarima_model],
    [arima_forecast, sarima_forecast],
    [arima_conf_int, sarima_conf_int],
    ['ARIMA', 'SARIMA']
)

In [None]:
def walk_forward_validation(data, model_order, seasonal_order=None, n_test=12):
    """
    Perform walk-forward validation
    """
    print(f"Performing walk-forward validation with {n_test} test periods...")
    
    # Split data
    train_size = len(data) - n_test
    train_data = data.iloc[:train_size]
    test_data = data.iloc[train_size:]
    
    predictions = []
    actual_values = []
    
    for i in range(n_test):
        # Extend training data
        current_train = data.iloc[:train_size + i]
        current_actual = data.iloc[train_size + i]
        
        try:
            # Fit model
            if seasonal_order is not None:
                model = SARIMAX(current_train, order=model_order, seasonal_order=seasonal_order)
            else:
                model = ARIMA(current_train, order=model_order)
            
            fitted_model = model.fit(disp=False)
            
            # Make one-step ahead forecast
            forecast = fitted_model.get_forecast(steps=1)
            pred_value = forecast.predicted_mean.iloc[0]
            
            predictions.append(pred_value)
            actual_values.append(current_actual)
            
        except Exception as e:
            print(f"Error at step {i}: {e}")
            predictions.append(np.nan)
            actual_values.append(current_actual)
    
    # Calculate validation metrics
    predictions = np.array(predictions)
    actual_values = np.array(actual_values)
    
    # Remove NaN values
    valid_mask = ~np.isnan(predictions)
    predictions = predictions[valid_mask]
    actual_values = actual_values[valid_mask]
    
    mae = mean_absolute_error(actual_values, predictions)
    mse = mean_squared_error(actual_values, predictions)
    rmse = np.sqrt(mse)
    
    print(f"Walk-Forward Validation Results:")
    print(f"MAE: {mae:.4f}")
    print(f"MSE: {mse:.4f}")
    print(f"RMSE: {rmse:.4f}")
    
    return predictions, actual_values, {'MAE': mae, 'MSE': mse, 'RMSE': rmse}

# Perform walk-forward validation for SARIMA model
sarima_pred, sarima_actual, sarima_metrics = walk_forward_validation(
    airline_df['passengers'], 
    sarima_auto.order, 
    sarima_auto.seasonal_order
)

In [None]:
def plot_validation_results(predictions, actual_values, title):
    """
    Plot validation results
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    
    # Time series plot
    axes[0].plot(range(len(actual_values)), actual_values, 
                label='Actual', marker='o', color='blue')
    axes[0].plot(range(len(predictions)), predictions, 
                label='Predicted', marker='s', color='red')
    axes[0].set_title(f'{title} - Actual vs Predicted')
    axes[0].set_xlabel('Time Period')
    axes[0].set_ylabel('Value')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Scatter plot
    axes[1].scatter(actual_values, predictions, alpha=0.7, color='green')
    axes[1].plot([min(actual_values), max(actual_values)], 
                [min(actual_values), max(actual_values)], 
                'r--', label='Perfect Prediction')
    axes[1].set_title(f'{title} - Prediction Accuracy')
    axes[1].set_xlabel('Actual Values')
    axes[1].set_ylabel('Predicted Values')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# Plot validation results
plot_validation_results(sarima_pred, sarima_actual, 'SARIMA Model Validation')

In [None]:
def generate_business_insights(data, forecast, model_name):
    """
    Generate business insights from forecasting results
    """
    print(f"=" * 50)
    print(f"BUSINESS INSIGHTS - {model_name}")
    print(f"=" * 50)
    
    # Current performance
    current_value = data.iloc[-1]
    forecast_12m = forecast.iloc[-1]
    
    # Growth analysis
    total_growth = ((forecast_12m - current_value) / current_value) * 100
    monthly_growth = total_growth / 12
    
    print(f"Current Value: {current_value:.2f}")
    print(f"12-Month Forecast: {forecast_12m:.2f}")
    print(f"Expected Growth: {total_growth:.2f}%")
    print(f"Average Monthly Growth: {monthly_growth:.2f}%")
    
    # Trend analysis
    if total_growth > 5:
        trend = "Strong Upward Trend"
        recommendation = "Consider capacity expansion and resource planning"
    elif total_growth > 0:
        trend = "Moderate Growth"
        recommendation = "Monitor performance and prepare for gradual scaling"
    elif total_growth > -5:
        trend = "Stable/Slight Decline"
        recommendation = "Focus on efficiency improvements and cost optimization"
    else:
        trend = "Declining Trend"
        recommendation = "Investigate causes and implement corrective measures"
    
    print(f"\nTrend Analysis: {trend}")
    print(f"Recommendation: {recommendation}")
    
    # Seasonal insights (if applicable)
    if len(forecast) >= 12:
        seasonal_pattern = forecast.values
        peak_month = np.argmax(seasonal_pattern) + 1
        low_month = np.argmin(seasonal_pattern) + 1
        
        print(f"\nSeasonal Insights:")
        print(f"Peak expected in month {peak_month}")
        print(f"Lowest point expected in month {low_month}")
        print(f"Seasonal variation: {(np.max(seasonal_pattern) - np.min(seasonal_pattern)):.2f}")

# Generate insights for SARIMA model
generate_business_insights(airline_df['passengers'], sarima_forecast, 'SARIMA')