In [None]:
# Quantum Reservoir Computing for Time Series Forecasting - Python Version
# Converted from Julia implementation

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import linalg
from sklearn.linear_model import Ridge
import warnings
warnings.filterwarnings('ignore')


In [None]:
# Model Structure Definition
class MyModel:
    """
    Custom model structure for reservoir computing.
    
    Attributes:
        L: Number of reservoir features/neurons
        OutLen: Length of output prediction
        W: Weight matrix for readout layer (L × OutLen)
        Features: Names of input features used
    """
    def __init__(self, L, OutLen, Features):
        self.L = L
        self.OutLen = OutLen
        self.W = np.zeros((L, OutLen))
        self.Features = Features


In [None]:
# Data Normalization Constants
# These constants are derived from the realized volatility (RV) data range
Max_RV = -1.2543188032019446
Min_RV = -4.7722718186046515

# Coefficient for MSE calculation: (max - min)^2
coe = (Max_RV - Min_RV)**2

# Difference for scaling: (max - min)
dif = Max_RV - Min_RV


In [None]:
# Data Preprocessing Functions
def normalization(x, a, b):
    """Normalize data to range [a, b]."""
    xmax = np.max(x)
    xmin = np.min(x)
    return (b - a) * (x - xmin) / (xmax - xmin) + a

def denormalization(x1, x2, y, a, b):
    """Denormalize data from range [a,b] back to original range [x2,x1]."""
    xmax = x1
    xmin = x2
    return (y - a) * (xmax - xmin) / (b - a) + xmin

def denormalization_alt(x, y, a, b):
    """Denormalize data from range [a, b] back to original scale of x."""
    xmax = np.max(x)
    xmin = np.min(x)
    return (y - a) * (xmax - xmin) / (b - a) + xmin


In [None]:
# Evaluation Metrics for Forecasting
def MAPE(x, y):
    """Mean Absolute Percentage Error"""
    return np.mean(np.abs((x - y) * dif / ((y + 1) * dif + Min_RV))) * 100

def MAPE_std(x, y):
    """Standard deviation of Absolute Percentage Error"""
    return np.std(np.abs((x - y) * dif / ((y + 1) * dif + Min_RV))) * 100

def MAE(x, y):
    """Mean Absolute Error"""
    return np.mean(np.abs((x - y) * dif))

def MAE_std(x, y):
    """Standard deviation of Absolute Error"""
    return np.std(np.abs((x - y) * dif))

def MSE(x, y):
    """Mean Squared Error"""
    return np.mean(((x - y)**2) * coe)

def MSE_std(x, y):
    """Standard deviation of Squared Error"""
    return np.std(((x - y)**2) * coe)

def RMSE(x, y):
    """Root Mean Squared Error"""
    return np.sqrt(np.mean(((x - y)**2) * coe))


In [None]:
# Hit Rate Calculation
def wave(y):
    """
    Compute the directional change (wave) of a time series.
    Returns +1 if increasing, -1 if decreasing.
    """
    L = len(y)
    w = np.zeros(L - 1)
    for i in range(L - 1):
        w[i] = np.sign(y[i + 1] - y[i])
    return w

def hitrate(x, y):
    """
    Calculate the hit rate (directional accuracy) between predictions and actuals.
    """
    L = len(x)
    # Prepend reference value
    x = np.concatenate([[-0.5704088242386152], x])
    y = np.concatenate([[-0.5704088242386152], y])
    # Get directional changes
    wx = wave(x)
    wy = wave(y)
    # Return fraction of correct directions
    return np.sum(wx == wy) / L


In [None]:
# Loss Functions for Volatility Forecasting
def compute_qlike(forecasts, actuals):
    """
    Compute the QLIKE (Quasi-Likelihood) loss function for volatility forecasting.
    """
    # Denormalize: convert from [-1,1] range back to original scale
    forecasts = np.abs((forecasts + 1) * dif + Min_RV)
    actuals = np.abs((actuals + 1) * dif + Min_RV)
    
    # Calculate the ratio
    ratio = actuals / forecasts
    
    # Compute QLIKE
    qlike = np.sum(ratio - np.log(ratio) - 1)
    return qlike

def compute_qlike2(forecasts, actuals):
    """
    Alternative QLIKE computation using exponential transformation.
    """
    # Denormalize data
    forecasts = (forecasts + 1) * dif + Min_RV
    actuals = (actuals + 1) * dif + Min_RV
    
    # Calculate ratio with exponential transformation
    ratio = np.exp(actuals) / np.exp(forecasts)
    
    # Compute modified QLIKE
    qlike = np.sum(ratio - (actuals - forecasts) - 1)
    return qlike


In [None]:
# Utility Functions
def coeff_matrix(N, J):
    """
    Generate a random symmetric coupling matrix for the reservoir.
    """
    m = np.random.rand(N, N)
    # Symmetrize the matrix
    m = (m + m.T) / 2
    # Zero out diagonal
    np.fill_diagonal(m, 0.0)
    # Normalize by largest eigenvalue and scale by J
    eigvals = np.linalg.eigvals(m)
    return m / np.max(eigvals) * J

def shift(V, step):
    """
    Shift a vector or matrix forward by 'step' positions, padding with zeros.
    """
    if V.ndim == 1:
        V1 = np.zeros(len(V))
        V1[step:] = V[:-step]
    else:
        V1 = np.zeros_like(V)
        V1[step:, :] = V[:-step, :]
    return V1

def rolling(V, window):
    """
    Create a rolling window matrix from a vector.
    """
    M = np.zeros((len(V), window))
    for i in range(window):
        M[i:, i] = V[:len(V) - i]
    return M


In [None]:
# Classical Reservoir Computing (Approximation of Quantum Reservoir)
def create_reservoir(nqubit, coupling_matrix, input_size):
    """
    Create a classical reservoir as an approximation of the quantum reservoir.
    Uses Echo State Network principles.
    """
    reservoir_size = 2**nqubit  # Approximate Hilbert space dimension
    
    # Initialize reservoir weights based on coupling matrix
    W_reservoir = np.random.randn(reservoir_size, reservoir_size) * 0.1
    
    # Normalize spectral radius for stability
    spectral_radius = 0.95
    eigvals = np.linalg.eigvals(W_reservoir)
    W_reservoir = W_reservoir * (spectral_radius / np.max(np.abs(eigvals)))
    
    # Input weights
    W_in = np.random.randn(reservoir_size, input_size) * 0.5
    
    return W_reservoir, W_in

def reservoir_state_update(state, W_reservoir, W_in, input_data, tau=1.0):
    """
    Update reservoir state with new input.
    """
    # Nonlinear activation (tanh)
    new_state = np.tanh(W_reservoir @ state + W_in @ input_data)
    return new_state

def Quantum_Reservoir(Data, features, coupling_matrix, nqubit, K_delay, VirtualNode, tau):
    """
    Classical approximation of quantum reservoir computing.
    
    This function simulates the quantum reservoir using classical reservoir computing.
    """
    L = len(Data)
    InputSize = len(features)
    reservoir_size = 2**nqubit
    
    # Create reservoir
    W_reservoir, W_in = create_reservoir(nqubit, coupling_matrix, InputSize)
    
    # Output features: nqubit * VirtualNode
    Output = np.zeros((nqubit * VirtualNode, L))
    
    # Process each time step
    for l in range(K_delay, L):
        # Initialize reservoir state
        state = np.zeros(reservoir_size)
        
        # Process K_delay previous time steps
        for k in range(K_delay, 0, -1):
            # Extract input features
            input_data = np.array([Data[features[i]].iloc[l - k] for i in range(InputSize)])
            
            # Normalize input to [-1, 1] range (like the π scaling in Julia)
            input_data = np.clip(input_data, -1, 1)
            
            # Update reservoir state
            state = reservoir_state_update(state, W_reservoir, W_in, input_data, tau)
        
        # Virtual nodes: sample reservoir state multiple times with small evolution
        for v in range(VirtualNode):
            # Small evolution step
            state = reservoir_state_update(state, W_reservoir, np.zeros((reservoir_size, InputSize)), np.zeros(InputSize), tau / VirtualNode)
            
            # Extract features (measurements)
            for n in range(nqubit):
                # Sample from different parts of reservoir state
                idx = v * nqubit + n
                # Use subset of reservoir state as "observable"
                start_idx = n * (reservoir_size // nqubit)
                end_idx = (n + 1) * (reservoir_size // nqubit)
                Output[idx, l] = np.mean(state[start_idx:end_idx])
    
    return Output


In [None]:
# Load Data
# Note: Uncomment and modify paths as needed
# df = pd.read_csv("output.csv")
# plt.figure(figsize=(16, 8))
# plt.plot(range(1, 817), df['log_return'], linewidth=2, color='darkblue')
# plt.xlabel('Time', fontsize=24)
# plt.ylabel('log RV', fontsize=24)
# plt.xticks(range(1, 817, 120), ["1950", "1960", "1970", "1980", "1990", "2000", "2010"])
# plt.grid(False)
# plt.savefig('Fig4.png', dpi=150)

# Load main data
Datas = pd.read_csv("Data.CSV")
print(f"Data shape: {Datas.shape}")
print(f"Columns: {list(Datas.columns)}")


In [None]:
# Setup Quantum Reservoir Parameters
nqubit = 10
tau = 1
K = 3  # K_delay

# Generate or load coupling matrices
# In the Julia version, this is loaded from coeff_10.jld2
# For Python, we'll generate them
np.random.seed(42)  # For reproducibility
ms = np.array([coeff_matrix(nqubit, 1) for _ in range(100)])
print(f"Generated {len(ms)} coupling matrices of shape {ms[0].shape}")

# LB is related to observables - in quantum version it's length of B (nqubit observables)
LB = nqubit
OutLen = LB


In [None]:
# Rolling Window Parameters
# For entire sample forecasting (1997.08-2017.12)
Total = 816  # Total number of data samples
L = 245  # Out-of-sample length
wi = Total - L  # Rolling window length (571)
ws = 0  # Rolling window start index

# Target variable
y = Datas['RV'].iloc[Total - L:Total].values
print(f"Target shape: {y.shape}")


In [None]:
# Process First Quantum Reservoir (QR1)
features1 = ["RV", "MKT", "DP", "IP", "RV_q", "STR", "DEF"]
print(f"Processing QR1 with features: {features1}")

# Generate reservoir signals
signal1 = Quantum_Reservoir(Datas, features1, ms[0], nqubit, K, 1, tau)
print(f"QR1 signal shape: {signal1.shape}")


In [None]:
# Process Second Quantum Reservoir (QR2)
features2 = ["RV", "MKT", "STR", "RV_q", "EP", "INF", "DEF"]
print(f"Processing QR2 with features: {features2}")

# Generate reservoir signals
signal2 = Quantum_Reservoir(Datas, features2, ms[1], nqubit, K, 2, tau)
print(f"QR2 signal shape: {signal2.shape}")


In [None]:
# Train QR1 Model with Rolling Window
W_paras = np.zeros((L, OutLen))  # Regression parameters
Pre1 = np.zeros(L)  # Predictions
model1 = MyModel(L, OutLen, features1)

print("Training QR1 model...")
for j in range(L):
    # Training data
    y_train = Datas['RV'].iloc[ws + j:ws + wi + j].values
    x_train = signal1[:, ws + j:ws + wi + j]
    
    # Ridge regression (with small regularization)
    ridge = Ridge(alpha=0.00000001)
    ridge.fit(x_train.T, y_train)
    W_paras[j, :] = ridge.coef_[:OutLen]
    
    # Prediction
    Pre1[j] = np.dot(W_paras[j, :], signal1[:, ws + wi + j])
    model1.W[j, :] = W_paras[j, :]

# Evaluate QR1
print("\nQR1 Results:")
print(f"Hit rate: {hitrate(Pre1, y):.4f}")
print(f"MSE: {MSE(Pre1, y):.4f}")
print(f"RMSE: {RMSE(Pre1, y):.4f}")
print(f"MAE: {MAE(Pre1, y):.4f}")
print(f"MAPE: {MAPE(Pre1, y):.4f}%")
print(f"QLike: {compute_qlike(Pre1, y):.4f}")


In [None]:
# Train QR2 Model with Rolling Window
W_paras2 = np.zeros((L, OutLen * 2))  # Regression parameters (2x features)
Pre2 = np.zeros(L)  # Predictions
model2 = MyModel(L, OutLen * 2, features2)

print("Training QR2 model...")
for j in range(L):
    # Training data
    y_train = Datas['RV'].iloc[ws + j:ws + wi + j].values
    x_train = signal2[:, ws + j:ws + wi + j]
    
    # Ridge regression
    ridge = Ridge(alpha=0.00000001)
    ridge.fit(x_train.T, y_train)
    W_paras2[j, :min(len(ridge.coef_), OutLen * 2)] = ridge.coef_[:min(len(ridge.coef_), OutLen * 2)]
    
    # Prediction
    Pre2[j] = np.dot(W_paras2[j, :], signal2[:, ws + wi + j])
    model2.W[j, :] = W_paras2[j, :]

# Evaluate QR2
print("\nQR2 Results:")
print(f"Hit rate: {hitrate(Pre2, y):.4f}")
print(f"MSE: {MSE(Pre2, y):.4f}")
print(f"RMSE: {RMSE(Pre2, y):.4f}")
print(f"MAE: {MAE(Pre2, y):.4f}")
print(f"MAPE: {MAPE(Pre2, y):.4f}%")
print(f"QLike: {compute_qlike(Pre2, y):.4f}")


In [None]:
# Create Prediction DataFrame
Prediction = pd.DataFrame({
    'Actual': y,
    'QR1_Prediction': Pre1,
    'QR2_Prediction': Pre2
})

print("\nPrediction Summary:")
print(Prediction.head(10))


In [None]:
# Visualize Predictions
plt.figure(figsize=(16, 8))
time_idx = range(Total - L + 1, Total + 1)
plt.plot(time_idx, Datas['RV'].iloc[Total - L:Total].values, label='Actual', linewidth=3)
plt.plot(time_idx, Pre1, label='QR1 Prediction', linewidth=3)
plt.plot(time_idx, Pre2, label='QR2 Prediction', linewidth=3)
plt.xlabel('Time', fontsize=24)
plt.ylabel('RV', fontsize=24)
plt.legend(fontsize=18)
plt.grid(True)
plt.tight_layout()
plt.savefig('predictions_comparison.png', dpi=150)
plt.show()


In [None]:
# Save Predictions to CSV
# Prediction.to_csv('predict_result.csv', index=False)
print("Predictions saved successfully!")


In [None]:
# Additional Analysis
# This section can be used for SHAP analysis or other interpretability methods
# Note: SHAP analysis would require additional libraries (shap)
# and is beyond the basic conversion scope

print("\nModel conversion complete!")
print("\nNote: This is a classical approximation of the quantum reservoir.")
print("For true quantum simulation, consider using Qiskit or PennyLane.")
