In [2]:
import pandas as pd

In [3]:
from sklearn.metrics import mean_absolute_error

def time_series_mae(y_true, y_pred):
    """
    Calculate Mean Absolute Error (MAE) for time series cross validation.

    Parameters:
    y_true (array-like): Original/actual values.
    y_pred (array-like): Model predicted values.

    Returns:
    float: MAE value.
    """
    return mean_absolute_error(y_true, y_pred)

In [4]:
data = pd.read_csv(r'C:\Users\hp\Documents\GitHub\Forecast_Treasury_Curve\Dataset\final_feature_library_all_features.csv')

In [5]:
data['Spread'] = data['USGG10YR_mean'] - data['USGG2YR_mean']

In [6]:
# Split the time series into training and test sets (e.g., last 12 months as test)
test_size = 36
train = data['Spread'][:-test_size]
test = data['Spread'][-test_size:]

print(f"Training set length: {len(train)}")
print(f"Test set length: {len(test)}")

Training set length: 267
Test set length: 36


# Naive method

In [7]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error

data = data[['Date', 'USGG10YR_mean', 'USGG2YR_mean', 'Spread']].copy()
# Assuming 'data' is your DataFrame with 'Spread' column
# Define test size
test_size = 36

# Split data into train and test BEFORE creating lagged features
train_data = data[:-test_size].copy()
test_data = data[-test_size:].copy()

print(f"Original training set length: {len(train_data)}")
print(f"Original test set length: {len(test_data)}")

Original training set length: 267
Original test set length: 36


In [8]:
# Create lagged features for training data
train_data['Spread_lag1'] = train_data['Spread'].shift(1)
# Remove the first row with NaN after shift
train_data = train_data.dropna()

print(f"Training set length after creating lag and dropping NaN: {len(train_data)}")

Training set length after creating lag and dropping NaN: 266


In [9]:
# For test set, we need to be careful about the lag feature
# The first prediction in test set should use the last value from training set
test_data['Spread_lag1'] = test_data['Spread'].shift(1)

# Fill the first NaN with the last value from training data
test_data.iloc[0, test_data.columns.get_loc('Spread_lag1')] = train_data['Spread'].iloc[-1]

print(f"Test set length: {len(test_data)}")

Test set length: 36


In [10]:
# Calculate MAE on test set
test_actual = test_data['Spread']
test_pred = test_data['Spread_lag1']
mae_test = mean_absolute_error(test_actual, test_pred)
print(f"Test Set Mean Absolute Error (MAE): {mae_test}")

# Time Series Cross-Validation on Test Set
def time_series_cv_on_test(test_data, min_train_size=5, step=1):
    """
    Perform time series cross-validation on the test set
    """
    cv_scores = []
    n_test = len(test_data)
    
    for i in range(min_train_size, n_test, step):
        # Use first i observations as "training" within test set
        cv_train = test_data.iloc[:i]
        # Use next observation as "validation"
        if i < n_test:
            cv_val_actual = test_data.iloc[i]['Spread']
            cv_val_pred = test_data.iloc[i-1]['Spread']  # Naive forecast
            
            mae_fold = abs(cv_val_actual - cv_val_pred)
            cv_scores.append(mae_fold)
    
    return cv_scores

# Perform cross-validation on test set
cv_scores = time_series_cv_on_test(test_data, min_train_size=5, step=1)
mean_cv_mae = np.mean(cv_scores)

print(f"\nTime Series Cross-Validation Results:")
print(f"Number of CV folds: {len(cv_scores)}")
print(f"Mean CV MAE: {mean_cv_mae}")
print(f"CV MAE Standard Deviation: {np.std(cv_scores)}")

# Summary
print(f"\n{'='*50}")
print("SUMMARY OF RESULTS:")
print(f"{'='*50}")
print(f"Single Test Set MAE: {mae_test:.4f}")
print(f"Time Series CV MAE: {mean_cv_mae:.4f}")

Test Set Mean Absolute Error (MAE): 0.11473869141727826

Time Series Cross-Validation Results:
Number of CV folds: 31
Mean CV MAE: 0.11061027686198786
CV MAE Standard Deviation: 0.08983990502011888

SUMMARY OF RESULTS:
Single Test Set MAE: 0.1147
Time Series CV MAE: 0.1106


# Drift-method

In [11]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error

# Assuming 'data' is your DataFrame with 'Spread' column
# Define test size
test_size = 36

# Split data into train and test
train_data = data[:-test_size].copy()
test_data = data[-test_size:].copy()

print(f"Training set length: {len(train_data)}")
print(f"Test set length: {len(test_data)}")

Training set length: 267
Test set length: 36


In [12]:
def drift_forecast(train_series, h=1):
    """
    Calculate drift forecast for h steps ahead
    Drift method: y_t+h = y_t + h * (y_T - y_1)/(T-1)
    where T is the length of training data
    """
    y_first = train_series.iloc[0]
    y_last = train_series.iloc[-1]
    T = len(train_series)
    
    # Calculate drift (average change per period)
    drift = (y_last - y_first) / (T - 1)
    
    # Forecast h steps ahead
    forecast = y_last + h * drift
    
    return forecast, drift

# Calculate drift forecasts for test set
train_spread = train_data['Spread']
test_predictions = []
drift_values = []

# Create results DataFrame
test_results = test_data.copy()

# Method 2: Rolling Drift Forecasts (updating drift as we get new data)
def rolling_drift_forecast(train_data, test_data):
    """
    Perform rolling drift forecasts where we update our drift estimate
    as we move through the test set
    """
    rolling_predictions = []
    current_train = train_data['Spread'].copy()
    
    for i in range(len(test_data)):
        # Calculate drift forecast 1-step ahead
        forecast, drift = drift_forecast(current_train, h=1)
        rolling_predictions.append(forecast)
        
        # Add actual value to training data for next iteration
        if i < len(test_data) - 1:  # Don't add the last value
            current_train = pd.concat([current_train, test_data['Spread'].iloc[i:i+1]])
    
    return rolling_predictions

# Rolling drift forecasts
rolling_drift_preds = rolling_drift_forecast(train_data, test_data)
test_results['Rolling_Drift_Forecast'] = rolling_drift_preds


In [13]:
# Calculate MAE for rolling drift method
mae_rolling_drift = mean_absolute_error(test_results['Spread'], test_results['Rolling_Drift_Forecast'])
print(f"Rolling Drift Method - Test Set MAE: {mae_rolling_drift:.4f}")

Rolling Drift Method - Test Set MAE: 0.1146


In [14]:
# Time Series Cross-Validation for Drift Method
def drift_cv_on_test(test_data, train_data, min_train_size=5):
    """
    Perform time series cross-validation on test set for drift method
    """
    cv_scores = []
    n_test = len(test_data)
    
    # Combine train and test for progressive training
    full_data = pd.concat([train_data, test_data])
    train_end_idx = len(train_data)
    
    for i in range(min_train_size, n_test):
        # Use training data + first i test observations
        cv_train_series = full_data['Spread'].iloc[:train_end_idx + i]
        
        # Predict next test observation
        cv_val_actual = test_data['Spread'].iloc[i]
        cv_val_pred, _ = drift_forecast(cv_train_series, h=1)
        
        mae_fold = abs(cv_val_actual - cv_val_pred)
        cv_scores.append(mae_fold)
    
    return cv_scores

# CV on test set for drift method
drift_test_cv_scores = drift_cv_on_test(test_data, train_data, min_train_size=5)
mean_drift_test_cv_mae = np.mean(drift_test_cv_scores)

print(f"\nDrift Method - CV on Test Set Results:")
print(f"Number of CV folds: {len(drift_test_cv_scores)}")
print(f"Mean CV MAE: {mean_drift_test_cv_mae:.4f}")
print(f"CV MAE Standard Deviation: {np.std(drift_test_cv_scores):.4f}")


Drift Method - CV on Test Set Results:
Number of CV folds: 31
Mean CV MAE: 0.1104
CV MAE Standard Deviation: 0.0907


# Moving Averages

In [15]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error

# Assuming 'data' is your DataFrame with 'Spread' column
# Define test size
test_size = 36

# Split data into train and test
train_data = data[:-test_size].copy()
test_data = data[-test_size:].copy()

print(f"Training set length: {len(train_data)}")
print(f"Test set length: {len(test_data)}")

def moving_average_forecast(series, window_length):
    """
    Calculate moving average forecast
    Returns the average of the last 'window_length' values
    """
    if len(series) < window_length:
        # If not enough data, use all available data
        return series.mean()
    
    return series.iloc[-window_length:].mean()

def moving_average_cv_on_test(test_data, train_data, window_length, min_train_size=5):
    """
    Perform time series cross-validation on test set for moving average method
    This progressively uses more data from the test set for training
    """
    cv_scores = []
    n_test = len(test_data)
    
    # Combine train and test data for progressive training
    full_data = pd.concat([train_data, test_data])
    train_end_idx = len(train_data)
    
    for i in range(min_train_size, n_test):
        # Use training data + first i test observations for CV training
        cv_train_series = full_data['Spread'].iloc[:train_end_idx + i]
        
        # Predict next test observation using moving average
        cv_val_actual = test_data['Spread'].iloc[i]
        cv_val_pred = moving_average_forecast(cv_train_series, window_length)
        
        mae_fold = abs(cv_val_actual - cv_val_pred)
        cv_scores.append(mae_fold)
    
    return cv_scores

# Test different window lengths and find optimal
window_lengths = [2, 3, 4, 5, 6, 8, 10, 12, 15, 20, 24, 30]
cv_results = {}

print(f"\n{'='*70}")
print("MOVING AVERAGE - TIME SERIES CROSS-VALIDATION RESULTS:")
print(f"{'='*70}")
print(f"{'Window Length':<15} {'Mean CV MAE':<12} {'Std CV MAE':<12} {'# CV Folds':<10}")
print(f"{'-'*70}")

for window_length in window_lengths:
    # Perform cross-validation
    cv_scores = moving_average_cv_on_test(test_data, train_data, window_length, min_train_size=5)
    
    if len(cv_scores) > 0:
        mean_cv_mae = np.mean(cv_scores)
        std_cv_mae = np.std(cv_scores)
        n_folds = len(cv_scores)
        
        cv_results[window_length] = {
            'mean_mae': mean_cv_mae,
            'std_mae': std_cv_mae,
            'n_folds': n_folds,
            'cv_scores': cv_scores
        }
        
        print(f"{window_length:<15} {mean_cv_mae:<12.4f} {std_cv_mae:<12.4f} {n_folds:<10}")
    else:
        print(f"{window_length:<15} {'No CV folds':<12} {'N/A':<12} {'0':<10}")

# Find optimal window length
if cv_results:
    optimal_window = min(cv_results.keys(), key=lambda x: cv_results[x]['mean_mae'])
    optimal_mae = cv_results[optimal_window]['mean_mae']
    
    print(f"\n{'='*70}")
    print("OPTIMAL MOVING AVERAGE CONFIGURATION:")
    print(f"{'='*70}")
    print(f"Optimal window length: {optimal_window}")
    print(f"Optimal CV MAE: {optimal_mae:.4f}")
    print(f"Optimal CV MAE Std: {cv_results[optimal_window]['std_mae']:.4f}")
    print(f"Number of CV folds: {cv_results[optimal_window]['n_folds']}")
    
    # Test final forecast performance with optimal window
    def moving_average_test_forecast(train_data, test_data, window_length):
        """
        Generate moving average forecasts for entire test set
        """
        predictions = []
        
        # Start with training data
        current_series = train_data['Spread'].copy()
        
        for i in range(len(test_data)):
            # Forecast next value using moving average
            pred = moving_average_forecast(current_series, window_length)
            predictions.append(pred)
            
            # Add actual value to series for next prediction (rolling forecast)
            current_series = pd.concat([current_series, test_data['Spread'].iloc[i:i+1]])
        
        return predictions
    
    # Generate forecasts with optimal window
    optimal_predictions = moving_average_test_forecast(train_data, test_data, optimal_window)
    test_mae_optimal = mean_absolute_error(test_data['Spread'], optimal_predictions)
    
    print(f"\nTest Set Performance with Optimal Window:")
    print(f"Test MAE: {test_mae_optimal:.4f}")
    
    # Show detailed CV results for optimal window
    print(f"\nDetailed CV Results for Optimal Window (Length = {optimal_window}):")
    optimal_cv_scores = cv_results[optimal_window]['cv_scores']
    print(f"Individual CV MAE scores: {[f'{score:.4f}' for score in optimal_cv_scores[:10]]}")
    if len(optimal_cv_scores) > 10:
        print(f"... and {len(optimal_cv_scores) - 10} more")
    
    # Performance comparison table
    print(f"\n{'='*70}")
    print("PERFORMANCE RANKING (by CV MAE):")
    print(f"{'='*70}")
    print(f"{'Rank':<5} {'Window':<8} {'CV MAE':<10} {'Std MAE':<10} {'# Folds':<8}")
    print(f"{'-'*70}")
    
    # Sort by CV MAE
    sorted_results = sorted(cv_results.items(), key=lambda x: x[1]['mean_mae'])
    
    for rank, (window, results) in enumerate(sorted_results, 1):
        print(f"{rank:<5} {window:<8} {results['mean_mae']:<10.4f} "
              f"{results['std_mae']:<10.4f} {results['n_folds']:<8}")
    
    # Additional analysis
    best_3_windows = [item[0] for item in sorted_results[:3]]
    worst_3_windows = [item[0] for item in sorted_results[-3:]]
    
    print(f"\n{'='*70}")
    print("ANALYSIS SUMMARY:")
    print(f"{'='*70}")
    print(f"Best 3 window lengths: {best_3_windows}")
    print(f"Worst 3 window lengths: {worst_3_windows}")
    
    best_mae = sorted_results[0][1]['mean_mae']
    worst_mae = sorted_results[-1][1]['mean_mae']
    improvement = ((worst_mae - best_mae) / worst_mae) * 100
    
    print(f"Performance improvement (best vs worst): {improvement:.2f}%")
    print(f"Best CV MAE: {best_mae:.4f}")
    print(f"Worst CV MAE: {worst_mae:.4f}")

else:
    print("No valid CV results obtained. Check data size and parameters.")

# Function to get CV results for a specific window length
def get_ma_cv_performance(window_length):
    """
    Get CV performance for a specific moving average window length
    """
    if window_length in cv_results:
        result = cv_results[window_length]
        print(f"\nMoving Average (Window={window_length}) CV Performance:")
        print(f"Mean CV MAE: {result['mean_mae']:.4f}")
        print(f"Std CV MAE: {result['std_mae']:.4f}")
        print(f"Number of CV folds: {result['n_folds']}")
        return result['mean_mae']
    else:
        print(f"Window length {window_length} not tested. Available: {list(cv_results.keys())}")
        return None

# Example usage:
# ma_performance = get_ma_cv_performance(5)  # Get performance for window=5

Training set length: 267
Test set length: 36

MOVING AVERAGE - TIME SERIES CROSS-VALIDATION RESULTS:
Window Length   Mean CV MAE  Std CV MAE   # CV Folds
----------------------------------------------------------------------
2               0.1307       0.1021       31        
3               0.1476       0.1167       31        
4               0.1733       0.1248       31        
5               0.1974       0.1308       31        
6               0.2167       0.1446       31        
8               0.2671       0.1687       31        
10              0.3228       0.1910       31        
12              0.3802       0.2197       31        
15              0.4651       0.2677       31        
20              0.5724       0.3770       31        
24              0.6290       0.4340       31        
30              0.7052       0.4174       31        

OPTIMAL MOVING AVERAGE CONFIGURATION:
Optimal window length: 2
Optimal CV MAE: 0.1307
Optimal CV MAE Std: 0.1021
Number of CV folds: 31

T