<a href="https://colab.research.google.com/github/ReginaldAboagye/365datascience/blob/master/SVM-BSVI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step 1: SVM with Adaptive Kernel
First, let's define an SVM with a Gaussian kernel where the width parameter
𝜎
σ is adapted based on volatility estimates.

In [None]:
import numpy as np

def gaussian_kernel(x, y, sigma):
    return np.exp(-np.linalg.norm(x-y)**2 / (2 * sigma**2))

class SVM:
    def __init__(self, C=1.0):
        self.C = C
        self.alpha = None
        self.b = 0
        self.support_vectors = None
        self.y_sv = None

    def fit(self, X, y, sigma_func):
        n_samples, n_features = X.shape
        K = np.zeros((n_samples, n_samples))
        for i in range(n_samples):
            for j in range(n_samples):
                K[i, j] = gaussian_kernel(X[i], X[j], sigma_func(i, j))

        P = np.outer(y, y) * K
        q = -np.ones(n_samples)
        G = np.vstack((-np.eye(n_samples), np.eye(n_samples)))
        h = np.hstack((np.zeros(n_samples), np.ones(n_samples) * self.C))
        A = y.reshape(1, -1)
        b = np.zeros(1)

        alpha = self.solve_qp(P, q, G, h, A, b)
        sv = alpha > 1e-5
        self.alpha = alpha[sv]
        self.support_vectors = X[sv]
        self.y_sv = y[sv]
        self.b = np.mean(self.y_sv - np.sum(self.alpha * self.y_sv * K[sv][:, sv], axis=1))

    def predict(self, X, sigma_func):
        y_pred = []
        for x in X:
            prediction = sum(self.alpha[i] * self.y_sv[i] * gaussian_kernel(x, self.support_vectors[i], sigma_func(i, i))
                             for i in range(len(self.alpha))) + self.b
            y_pred.append(np.sign(prediction))
        return np.array(y_pred)

    def solve_qp(self, P, q, G, h, A, b):
        # Placeholder for QP solver implementation
        return np.random.rand(P.shape[0])  # Dummy implementation

# Example usage
X = np.array([[1, 2], [2, 3], [3, 3], [2, 1], [3, 2]])
y = np.array([1, 1, 1, -1, -1])
sigma_func = lambda i, j: 1.0  # Dummy sigma function

svm = SVM()
svm.fit(X, y, sigma_func)
predictions = svm.predict(X, sigma_func)
print(predictions)


[ 1.  1.  1. -1. -1.]


# **Hamiltonian Monte Carlo (HMC)**
HMC leverages the gradient of the log-posterior to propose new states in a more informed manner compared to traditional MCMC methods. The key steps in HMC include:

*Initialization:* Set initial values for the parameters.

*Leapfrog Integrator:* Use the leapfrog method to simulate Hamiltonian dynamics.

*Metropolis-Hastings Step:* Accept or reject the proposed move based on the Hamiltonian.

In [None]:
import numpy as np

class EKF:
    def __init__(self, F, H, Q, R):
        self.F = F
        self.H = H
        self.Q = Q
        self.R = R

    def predict(self, x, P):
        x_pred = self.F @ x
        P_pred = self.F @ P @ self.F.T + self.Q
        return x_pred, P_pred

    def update(self, x_pred, P_pred, y):
        y_pred = self.H @ x_pred
        S = self.H @ P_pred @ self.H.T + self.R
        K = P_pred @ self.H.T @ np.linalg.inv(S)
        x_upd = x_pred + K @ (y - y_pred)
        P_upd = P_pred - K @ self.H @ P_pred
        return x_upd, P_upd

    def run(self, y, x0, P0):
        n = len(y)
        x_est = np.zeros((n, len(x0)))
        x_est[0] = x0
        P = P0
        for t in range(1, n):
            x_pred, P_pred = self.predict(x_est[t-1], P)
            x_est[t], P = self.update(x_pred, P_pred, y[t])
        return x_est

class BayesianStochasticVolatilityEKF_HMC:
    def __init__(self, F, H, Q, R, mu_prior, phi_prior, sigma_eta_prior, n_iter=1000, epsilon=0.01, L=10):
        self.ekf = EKF(F, H, Q, R)
        self.mu_prior = mu_prior
        self.phi_prior = phi_prior
        self.sigma_eta_prior = sigma_eta_prior
        self.n_iter = n_iter
        self.epsilon = epsilon  # Step size
        self.L = L  # Number of leapfrog steps

    def potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = sigma_eta if np.isscalar(sigma_eta) else sigma_eta.item()  # Ensure sigma_eta is a scalar
        if sigma_eta <= 0:
            return np.inf
        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])  # Assume observation noise is 1 for simplicity

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        likelihood = -0.5 * np.sum((y - x_est.flatten()) ** 2)
        prior = -0.5 * (mu - self.mu_prior[0]) ** 2 / self.mu_prior[1] ** 2 \
                -0.5 * (phi - self.phi_prior[0]) ** 2 / self.phi_prior[1] ** 2 \
                -0.5 * (sigma_eta - self.sigma_eta_prior[0]) ** 2 / self.sigma_eta_prior[1] ** 2
        return -(likelihood + prior)

    def gradient_potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = np.atleast_1d(sigma_eta)[0]  # Convert sigma_eta to a scalar

        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        grad_mu = 0
        grad_phi = 0
        grad_sigma_eta = 0
        for t in range(1, len(y)):
            grad_mu += (y[t] - x_est[t]) / np.exp(x_est[t])
            grad_phi += (y[t] - x_est[t]) * (x_est[t-1] - mu) / np.exp(x_est[t])
            grad_sigma_eta += (y[t] - mu - phi * y[t-1])**2 / (2 * sigma_eta**3)

        grad_mu -= (mu - self.mu_prior[0]) / self.mu_prior[1] ** 2
        grad_phi -= (phi - self.phi_prior[0]) / self.phi_prior[1] ** 2
        grad_sigma_eta -= (sigma_eta - self.sigma_eta_prior[0]) / self.sigma_eta_prior[1] ** 2

        return np.array([grad_mu, grad_phi, grad_sigma_eta])

    def leapfrog(self, params, momentum, grad, epsilon):
        momentum_half_step = momentum - 0.5 * epsilon * grad
        params_full_step = params + epsilon * momentum_half_step
        grad_new = self.gradient_potential_energy(params_full_step, log_returns)
        momentum_full_step = momentum_half_step - 0.5 * epsilon * grad_new
        return params_full_step, momentum_full_step, grad_new

    def hmc_step(self, params, y):
        momentum = np.random.normal(size=params.shape)
        current_params = params.copy()
        current_momentum = momentum.copy()
        current_grad = self.gradient_potential_energy(params, y)
        proposed_params = params.copy()
        proposed_momentum = momentum.copy()

        for _ in range(self.L):
            proposed_params, proposed_momentum, current_grad = self.leapfrog(proposed_params, proposed_momentum, current_grad, self.epsilon)

        current_potential = self.potential_energy(current_params, y)
        current_kinetic = 0.5 * np.sum(current_momentum ** 2)
        proposed_potential = self.potential_energy(proposed_params, y)
        proposed_kinetic = 0.5 * np.sum(proposed_momentum ** 2)

        if np.random.uniform() < np.exp(current_potential - proposed_potential + current_kinetic - proposed_kinetic):
            return proposed_params
        else:
            return current_params

    def fit(self, y):
        params = np.array([self.mu_prior[0], self.phi_prior[0], self.sigma_eta_prior[0]])
        samples = np.zeros((self.n_iter, 3))
        for i in range(self.n_iter):
            params = self.hmc_step(params, y)
            samples[i] = params
        return samples

# Example usage
log_returns = np.random.normal(0, 0.1, 100)
F = np.array([[0.9]])  # Initial transition matrix
H = np.array([[1]])    # Observation matrix
Q = np.array([[0.01]]) # Process noise covariance
R = np.array([[1]])    # Observation noise covariance

hmc_ekf_model = BayesianStochasticVolatilityEKF_HMC(F, H, Q, R, mu_prior=(0, 1), phi_prior=(0.9, 0.1), sigma_eta_prior=(0.1, 0.01), n_iter=1000, epsilon=0.01, L=10)
samples = hmc_ekf_model.fit(log_returns)

print("Posterior mean estimates")
print("mu:", np.mean(samples[:, 0]))
print("phi:", np.mean(samples[:, 1]))
print("sigma_eta:", np.mean(samples[:, 2]))


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (3,) + inhomogeneous part.

In [None]:
import numpy as np

class EKF:
    def __init__(self, F, H, Q, R):
        self.F = F
        self.H = H
        self.Q = Q
        self.R = R

    def predict(self, x, P):
        x_pred = self.F @ x
        P_pred = self.F @ P @ self.F.T + self.Q
        return x_pred, P_pred

    def update(self, x_pred, P_pred, y):
        y_pred = self.H @ x_pred
        S = self.H @ P_pred @ self.H.T + self.R
        K = P_pred @ self.H.T @ np.linalg.inv(S)
        x_upd = x_pred + K @ (y - y_pred)
        P_upd = P_pred - K @ self.H @ P_pred
        return x_upd, P_upd

    def run(self, y, x0, P0):
        n = len(y)
        x_est = np.zeros((n, len(x0)))
        x_est[0] = x0
        P = P0
        for t in range(1, n):
            x_pred, P_pred = self.predict(x_est[t-1], P)
            x_est[t], P = self.update(x_pred, P_pred, y[t])
        return x_est

class BayesianStochasticVolatilityEKF_HMC:
    def __init__(self, F, H, Q, R, mu_prior, phi_prior, sigma_eta_prior, n_iter=1000, epsilon=0.01, L=10):
        self.ekf = EKF(F, H, Q, R)
        self.mu_prior = mu_prior
        self.phi_prior = phi_prior
        self.sigma_eta_prior = sigma_eta_prior
        self.n_iter = n_iter
        self.epsilon = epsilon  # Step size
        self.L = L  # Number of leapfrog steps

    def potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = np.asscalar(sigma_eta) if isinstance(sigma_eta, np.ndarray) else sigma_eta  # Ensure sigma_eta is a scalar
        if sigma_eta <= 0:
            return np.inf
        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])  # Assume observation noise is 1 for simplicity

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        likelihood = -0.5 * np.sum((y - x_est.flatten()) ** 2)
        prior = -0.5 * (mu - self.mu_prior[0]) ** 2 / self.mu_prior[1] ** 2 \
                -0.5 * (phi - self.phi_prior[0]) ** 2 / self.phi_prior[1] ** 2 \
                -0.5 * (sigma_eta - self.sigma_eta_prior[0]) ** 2 / self.sigma_eta_prior[1] ** 2
        return -(likelihood + prior)

    def gradient_potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = np.asscalar(sigma_eta) if isinstance(sigma_eta, np.ndarray) else sigma_eta  # Ensure sigma_eta is a scalar
        if sigma_eta <= 0:
            return np.array([0, 0, 0])
        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        grad_mu = grad_phi = grad_sigma_eta = 0
        for t in range(1, len(y)):
            grad_mu += (y[t] - x_est[t]) / np.exp(x_est[t])
            grad_phi += (y[t] - x_est[t]) * (x_est[t-1] - mu) / np.exp(x_est[t])
            grad_sigma_eta += (y[t] - x_est[t]) * np.random.normal(0, sigma_eta) / np.exp(x_est[t])

        grad_mu -= (mu - self.mu_prior[0]) / self.mu_prior[1] ** 2
        grad_phi -= (phi - self.phi_prior[0]) / self.phi_prior[1] ** 2
        grad_sigma_eta -= (sigma_eta - self.sigma_eta_prior[0]) / self.sigma_eta_prior[1] ** 2

        return np.array([-grad_mu, -grad_phi, -grad_sigma_eta])

    def leapfrog(self, params, momentum, grad, epsilon):
        momentum_half_step = momentum - 0.5 * epsilon * grad
        params_full_step = params + epsilon * momentum_half_step
        grad_new = self.gradient_potential_energy(params_full_step, log_returns)
        momentum_full_step = momentum_half_step - 0.5 * epsilon * grad_new

        # Ensure parameters stay within valid ranges
        params_full_step[2] = max(params_full_step[2], 1e-6)  # sigma_eta must be positive

        return params_full_step, momentum_full_step, grad_new

    def hmc_step(self, params, y):
        momentum = np.random.normal(size=params.shape)
        current_params = params.copy()
        current_momentum = momentum.copy()
        current_grad = self.gradient_potential
        current_grad = self.gradient_potential_energy(params, y)
        proposed_params = params.copy()
        proposed_momentum = momentum.copy()

        for _ in range(self.L):
            proposed_params, proposed_momentum, current_grad = self.leapfrog(proposed_params, proposed_momentum, current_grad, self.epsilon)

        current_potential = self.potential_energy(current_params, y)
        current_kinetic = 0.5 * np.sum(current_momentum ** 2)
        proposed_potential = self.potential_energy(proposed_params, y)
        proposed_kinetic = 0.5 * np.sum(proposed_momentum ** 2)

        if np.random.uniform() < np.exp(current_potential - proposed_potential + current_kinetic - proposed_kinetic):
            return proposed_params
        else:
            return current_params

    def fit(self, y):
        params = np.array([self.mu_prior[0], self.phi_prior[0], self.sigma_eta_prior[0]])
        samples = np.zeros((self.n_iter, 3))
        for i in range(self.n_iter):
            params = self.hmc_step(params, y)
            samples[i] = params
        return samples

# Example usage
log_returns = np.random.normal(0, 0.1, 100)
F = np.array([[0.9]])  # Initial transition matrix
H = np.array([[1]])    # Observation matrix
Q = np.array([[0.01]]) # Process noise covariance
R = np.array([[1]])    # Observation noise covariance

hmc_ekf_model = BayesianStochasticVolatilityEKF_HMC(F, H, Q, R, mu_prior=(0, 1), phi_prior=(0.9, 0.1), sigma_eta_prior=(0.1, 0.01), n_iter=1000, epsilon=0.01, L=10)
samples = hmc_ekf_model.fit(log_returns)

print("Posterior mean estimates")
print("mu:", np.mean(samples[:, 0]))
print("phi:", np.mean(samples[:, 1]))
print("sigma_eta:", np.mean(samples[:, 2]))

AttributeError: 'BayesianStochasticVolatilityEKF_HMC' object has no attribute 'gradient_potential'

In [None]:
import numpy as np

class EKF:
    def __init__(self, F, H, Q, R):
        self.F = F
        self.H = H
        self.Q = Q
        self.R = R

    def predict(self, x, P):
        x_pred = self.F @ x
        P_pred = self.F @ P @ self.F.T + self.Q
        return x_pred, P_pred

    def update(self, x_pred, P_pred, y):
        y_pred = self.H @ x_pred
        S = self.H @ P_pred @ self.H.T + self.R
        K = P_pred @ self.H.T @ np.linalg.inv(S)
        x_upd = x_pred + K @ (y - y_pred)
        P_upd = P_pred - K @ self.H @ P_pred
        return x_upd, P_upd

    def run(self, y, x0, P0):
        n = len(y)
        x_est = np.zeros((n, len(x0)))
        x_est[0] = x0
        P = P0
        for t in range(1, n):
            x_pred, P_pred = self.predict(x_est[t-1], P)
            x_est[t], P = self.update(x_pred, P_pred, y[t])
        return x_est

class BayesianStochasticVolatilityEKF_HMC:
    def __init__(self, F, H, Q, R, mu_prior, phi_prior, sigma_eta_prior, n_iter=1000, epsilon=0.01, L=10):
        self.ekf = EKF(F, H, Q, R)
        self.mu_prior = mu_prior
        self.phi_prior = phi_prior
        self.sigma_eta_prior = sigma_eta_prior
        self.n_iter = n_iter
        self.epsilon = epsilon  # Step size
        self.L = L  # Number of leapfrog steps

    def potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = sigma_eta if np.isscalar(sigma_eta) else sigma_eta.item()  # Ensure sigma_eta is a scalar
        if sigma_eta <= 0:
            return np.inf
        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])  # Assume observation noise is 1 for simplicity

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        likelihood = -0.5 * np.sum((y - x_est.flatten()) ** 2)
        prior = -0.5 * (mu - self.mu_prior[0]) ** 2 / self.mu_prior[1] ** 2 \
                -0.5 * (phi - self.phi_prior[0]) ** 2 / self.phi_prior[1] ** 2 \
                -0.5 * (sigma_eta - self.sigma_eta_prior[0]) ** 2 / self.sigma_eta_prior[1] ** 2
        return -(likelihood + prior)

    def gradient_potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = np.atleast_1d(sigma_eta)[0]  # Convert sigma_eta to a scalar

        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])

        n = len(y)
        gradient_mu = np.zeros(1)
        gradient_phi = np.zeros(1)
        gradient_sigma_eta = np.zeros(1)


        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)


        grad_mu = grad_phi = grad_sigma_eta = 0
        for t in range(1, len(y)):
            grad_mu += (y[t] - x_est[t]) / np.exp(x_est[t])
            grad_phi += (y[t] - x_est[t]) * (x_est[t-1] - mu) / np.exp(x_est[t])
            gradient_sigma_eta += (y[t] - mu - phi * y[t-1])**2 / (2 * sigma_eta**3)

        grad_mu -= (mu - self.mu_prior[0]) / self.mu_prior[1] ** 2
        grad_phi -= (phi - self.phi_prior[0]) / self.phi_prior[1] ** 2
        grad_sigma_eta -= (sigma_eta - self.sigma_eta_prior[0]) / self.sigma_eta_prior[1] ** 2

        return np.array([-grad_mu, -grad_phi, -grad_sigma_eta])

    def leapfrog(self, params, momentum, grad, epsilon):
        momentum_half_step = momentum - 0.5 * epsilon * grad
        params_full_step = params + epsilon * momentum_half_step
        grad_new = self.gradient_potential_energy(np.squeeze(params_full_step), log_returns)
        momentum_full_step = momentum_half_step - 0.5 * epsilon * grad_new
        return params_full_step, momentum_full_step, grad_new

    def hmc_step(self, params, y):
        momentum = np.random.normal(size=params.shape)
        current_params = params.copy()
        current_momentum = momentum.copy()
        current_grad = self.gradient_potential_energy(params, y)
        proposed_params = params.copy()
        proposed_momentum = momentum.copy()

        for _ in range(self.L):
            proposed_params, proposed_momentum, current_grad = self.leapfrog(proposed_params, proposed_momentum, current_grad, self.epsilon)

        current_potential = self.potential_energy(current_params, y)
        current_kinetic = 0.5 * np.sum(current_momentum ** 2)
        proposed_potential = self.potential_energy(proposed_params, y)
        proposed_kinetic = 0.5 * np.sum(proposed_momentum ** 2)

        if np.random.uniform() < np.exp(current_potential - proposed_potential + current_kinetic - proposed_kinetic):
            return proposed_params
        else:
            return current_params

    def fit(self, y):
        params = np.array([self.mu_prior[0], self.phi_prior[0], self.sigma_eta_prior[0]])
        samples = np.zeros((self.n_iter, 3))
        for i in range(self.n_iter):
            params = self.hmc_step(params, y)
            samples[i] = params
        return samples

# Example usage
log_returns = np.random.normal(0, 0.1, 100)
F = np.array([[0.9]])  # Initial transition matrix
H = np.array([[1]])    # Observation matrix
Q = np.array([[0.01]]) # Process noise covariance
R = np.array([[1]])    # Observation noise covariance

hmc_ekf_model = BayesianStochasticVolatilityEKF_HMC(F, H, Q, R, mu_prior=(0, 1), phi_prior=(0.9, 0.1), sigma_eta_prior=(0.1, 0.01), n_iter=1000, epsilon=0.01, L=10)
samples = hmc_ekf_model.fit(log_returns)

print("Posterior mean estimates")
print("mu:", np.mean(samples[:, 0]))
print("phi:", np.mean(samples[:, 1]))
print("sigma_eta:", np.mean(samples[:, 2]))


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (3,) + inhomogeneous part.

In [None]:
import numpy as np

class EKF:
    def __init__(self, F, H, Q, R):
        self.F = F
        self.H = H
        self.Q = Q
        self.R = R

    def predict(self, x, P):
        x_pred = self.F @ x
        P_pred = self.F @ P @ self.F.T + self.Q
        return x_pred, P_pred

    def update(self, x_pred, P_pred, y):
        y_pred = self.H @ x_pred
        S = self.H @ P_pred @ self.H.T + self.R
        K = P_pred @ self.H.T @ np.linalg.inv(S)
        x_upd = x_pred + K @ (y - y_pred)
        P_upd = P_pred - K @ self.H @ P_pred
        return x_upd, P_upd

    def run(self, y, x0, P0):
        n = len(y)
        x_est = np.zeros((n, len(x0)))
        x_est[0] = x0
        P = P0
        for t in range(1, n):
            x_pred, P_pred = self.predict(x_est[t-1], P)
            x_est[t], P = self.update(x_pred, P_pred, y[t])
        return x_est

class BayesianStochasticVolatilityEKF_HMC:
    def __init__(self, F, H, Q, R, mu_prior, phi_prior, sigma_eta_prior, n_iter=1000, epsilon=0.01, L=10):
        self.ekf = EKF(F, H, Q, R)
        self.mu_prior = mu_prior
        self.phi_prior = phi_prior
        self.sigma_eta_prior = sigma_eta_prior
        self.n_iter = n_iter
        self.epsilon = epsilon  # Step size
        self.L = L  # Number of leapfrog steps

    def potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = sigma_eta if np.isscalar(sigma_eta) else sigma_eta.item()  # Ensure sigma_eta is a scalar
        if sigma_eta <= 0:
            return np.inf
        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])  # Assume observation noise is 1 for simplicity

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        likelihood = -0.5 * np.sum((y - x_est.flatten()) ** 2)
        prior = -0.5 * (mu - self.mu_prior[0]) ** 2 / self.mu_prior[1] ** 2 \
                -0.5 * (phi - self.phi_prior[0]) ** 2 / self.phi_prior[1] ** 2 \
                -0.5 * (sigma_eta - self.sigma_eta_prior[0]) ** 2 / self.sigma_eta_prior[1] ** 2
        return -(likelihood + prior)

    def gradient_potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        sigma_eta = np.atleast_1d(sigma_eta)[0]  # Convert sigma_eta to a scalar

        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        grad_mu = 0
        grad_phi = 0
        grad_sigma_eta = 0
        for t in range(1, len(y)):
            grad_mu += (y[t] - x_est[t]) / np.exp(x_est[t])
            grad_phi += (y[t] - x_est[t]) * (x_est[t-1] - mu) / np.exp(x_est[t])
            grad_sigma_eta += (y[t] - mu - phi * y[t-1])**2 / (2 * sigma_eta**3)

        grad_mu -= (mu - self.mu_prior[0]) / self.mu_prior[1] ** 2
        grad_phi -= (phi - self.phi_prior[0]) / self.phi_prior[1] ** 2
        grad_sigma_eta -= (sigma_eta - self.sigma_eta_prior[0]) / self.sigma_eta_prior[1] ** 2

        return np.array([grad_mu.item(), grad_phi.item(), grad_sigma_eta.item()])

    def leapfrog(self, params, momentum, grad, epsilon):
        momentum_half_step = momentum - 0.5 * epsilon * grad
        params_full_step = params + epsilon * momentum_half_step
        grad_new = self.gradient_potential_energy(params_full_step, log_returns)
        momentum_full_step = momentum_half_step - 0.5 * epsilon * grad_new
        return params_full_step, momentum_full_step, grad_new

    def hmc_step(self, params, y):
        momentum = np.random.normal(size=params.shape)
        current_params = params.copy()
        current_momentum = momentum.copy()
        current_grad = self.gradient_potential_energy(params, y)
        proposed_params = params.copy()
        proposed_momentum = momentum.copy()

        for _ in range(self.L):
            proposed_params, proposed_momentum, current_grad = self.leapfrog(proposed_params, proposed_momentum, current_grad, self.epsilon)

        current_potential = self.potential_energy(current_params, y)
        current_kinetic = 0.5 * np.sum(current_momentum ** 2)
        proposed_potential = self.potential_energy(proposed_params, y)
        proposed_kinetic = 0.5 * np.sum(proposed_momentum ** 2)

        if np.random.uniform() < np.exp(current_potential - proposed_potential + current_kinetic - proposed_kinetic):
            return proposed_params
        else:
            return current_params

    def fit(self, y):
        params = np.array([self.mu_prior[0], self.phi_prior[0], self.sigma_eta_prior[0]])
        samples = np.zeros((self.n_iter, 3))
        for i in range(self.n_iter):
            params = self.hmc_step(params, y)
            samples[i] = params
        return samples

# Example usage
log_returns = np.random.normal(0, 0.1, 100)
F = np.array([[0.9]])  # Initial transition matrix
H = np.array([[1]])    # Observation matrix
Q = np.array([[0.01]]) # Process noise covariance
R = np.array([[1]])    # Observation noise covariance

hmc_ekf_model = BayesianStochasticVolatilityEKF_HMC(F, H, Q, R, mu_prior=(0, 1), phi_prior=(0.9, 0.1), sigma_eta_prior=(0.1, 0.01), n_iter=1000, epsilon=0.01, L=10)
samples = hmc_ekf_model.fit(log_returns)

print("Posterior mean estimates")
print("mu:", np.mean(samples[:, 0]))
print("phi:", np.mean(samples[:, 1]))
print("sigma_eta:", np.mean(samples[:, 2]))


Posterior mean estimates
mu: 0.0
phi: 0.9000000000000002
sigma_eta: 0.10000000000000002


In [None]:
import numpy as np

class EKF:
    def __init__(self, F, H, Q, R):
        self.F = F
        self.H = H
        self.Q = Q
        self.R = R

    def predict(self, x, P):
        x_pred = self.F @ x
        P_pred = self.F @ P @ self.F.T + self.Q
        return x_pred, P_pred

    def update(self, x_pred, P_pred, y):
        y_pred = self.H @ x_pred
        S = self.H @ P_pred @ self.H.T + self.R
        K = P_pred @ self.H.T @ np.linalg.inv(S)
        x_upd = x_pred + K @ (y - y_pred)
        P_upd = P_pred - K @ self.H @ P_pred
        return x_upd, P_upd

    def run(self, y, x0, P0):
        n = len(y)
        x_est = np.zeros((n, len(x0)))
        x_est[0] = x0
        P = P0
        for t in range(1, n):
            x_pred, P_pred = self.predict(x_est[t-1], P)
            x_est[t], P = self.update(x_pred, P_pred, y[t])
        return x_est

class BayesianStochasticVolatilityEKF_HMC:
    def __init__(self, F, H, Q, R, mu_prior, phi_prior, sigma_eta_prior, n_iter=1000, epsilon=0.01, L=10):
        self.ekf = EKF(F, H, Q, R)
        self.mu_prior = mu_prior
        self.phi_prior = phi_prior
        self.sigma_eta_prior = sigma_eta_prior
        self.n_iter = n_iter
        self.epsilon = epsilon  # Step size
        self.L = L  # Number of leapfrog steps

    def potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        if sigma_eta <= 0:
            return np.inf
        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])  # Assume observation noise is 1 for simplicity

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        likelihood = -0.5 * np.sum((y - x_est.flatten()) ** 2)
        prior = -0.5 * (mu - self.mu_prior[0]) ** 2 / self.mu_prior[1] ** 2 \
                -0.5 * (phi - self.phi_prior[0]) ** 2 / self.phi_prior[1] ** 2 \
                -0.5 * (sigma_eta - self.sigma_eta_prior[0]) ** 2 / self.sigma_eta_prior[1] ** 2
        return -(likelihood + prior)

    def gradient_potential_energy(self, params, y):
        mu, phi, sigma_eta = params
        if sigma_eta <= 0:
            return np.array([0, 0, 0])
        F = np.array([[phi]])
        H = np.array([[1]])
        Q = np.array([[sigma_eta**2]])
        R = np.array([[1]])

        x0 = np.array([mu])
        P0 = np.array([[sigma_eta**2]])

        x_est = self.ekf.run(y, x0, P0)

        grad_mu = grad_phi = grad_sigma_eta = 0
        for t in range(1, len(y)):
            grad_mu += (y[t] - x_est[t]) / np.exp(x_est[t])
            grad_phi += (y[t] - x_est[t]) * (x_est[t-1] - mu) / np.exp(x_est[t])
            grad_sigma_eta += (y[t] - x_est[t]) * np.random.normal(0, sigma_eta) / np.exp(x_est[t])

        grad_mu -= (mu - self.mu_prior[0]) / self.mu_prior[1] ** 2
        grad_phi -= (phi - self.phi_prior[0]) / self.phi_prior[1] ** 2
        grad_sigma_eta -= (sigma_eta - self.sigma_eta_prior[0]) / self.sigma_eta_prior[1] ** 2

        return np.array([-grad_mu, -grad_phi, -grad_sigma_eta])

    def leapfrog(self, params, momentum, grad, epsilon):
        momentum_half_step = momentum - 0.5 * epsilon * grad
        params_full_step = params + epsilon * momentum_half_step
        grad_new = self.gradient_potential_energy(params_full_step, log_returns)
        momentum_full_step = momentum_half_step - 0.5 * epsilon * grad_new

        # Ensure parameters stay within valid ranges
        params_full_step[2] = max(params_full_step[2], 1e-6)  # sigma_eta must be positive

        return params_full_step, momentum_full_step, grad_new

    def hmc_step(self, params, y):
        momentum = np.random.normal(size=params.shape)
        current_params = params.copy()
        current_momentum = momentum.copy()
        current_grad = self.gradient_potential_energy(params, y)
        proposed_params = params.copy()
        proposed_momentum = momentum.copy()

        for _ in range(self.L):
            proposed_params, proposed_momentum, current_grad = self.leapfrog(proposed_params, proposed_momentum, current_grad, self.epsilon)

        current_potential = self.potential_energy(current_params, y)
        current_kinetic = 0.5 * np.sum(current_momentum ** 2)
        proposed_potential = self.potential_energy(proposed_params, y)
        proposed_kinetic = 0.5 * np.sum(proposed_momentum ** 2)

        if np.random.uniform() < np.exp(current_potential - proposed_potential + current_kinetic - proposed_kinetic):
            return proposed_params
        else:
            return current_params

    def fit(self, y):
        params = np.array([self.mu_prior[0], self.phi_prior[0], self.sigma_eta_prior[0]])
        samples = np.zeros((self.n_iter, 3))
        for i in range(self.n_iter):
            params = self.hmc_step(params, y)
            samples[i] = params
        return samples

# Example usage
log_returns = np.random.normal(0, 0.1, 100)
F = np.array([[0.9]])  # Initial transition matrix
H = np.array([[1]])    # Observation matrix
Q = np.array([[0.01]]) # Process noise covariance
R = np.array([[1]])    # Observation noise covariance

hmc_ekf_model = BayesianStochasticVolatilityEKF_HMC(F, H, Q, R, mu_prior=(0, 1), phi_prior=(0.9, 0.1), sigma_eta_prior=(0.1, 0.01), n_iter=1000, epsilon=0.01, L=10)
samples = hmc_ekf_model.fit(log_returns)

print("Posterior mean estimates")
print("mu:", np.mean(samples[:, 0]))
print("phi:", np.mean(samples[:, 1]))
print("sigma_eta:", np.mean(samples[:, 2]))


ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

# **Step 3: Hybrid Model Integration**
Finally, we integrate the SVM with the adaptive kernel and the Bayesian stochastic volatility model.

In [None]:
# Hybrid model combining SVM and Bayesian Stochastic Volatility
class HybridModel:
    def __init__(self, C=1.0, mu=0.0, phi=0.9, sigma_eta=0.1):
        self.svm = SVM(C)
        self.bsv = BayesianStochasticVolatilityEKF_HMC(mu, phi, sigma_eta)

    def fit(self, X, y, log_returns):
        h = self.bsv.fit(log_returns)
        sigma_func = lambda i, j: np.sqrt(np.exp(h[i] + h[j]))
        self.svm.fit(X, y, sigma_func)

    def predict(self, X):
        h = self.bsv.fit(np.zeros(len(X)))  # Dummy h values for prediction
        sigma_func = lambda i, j: np.sqrt(np.exp(h[i] + h[j]))
        return self.svm.predict(X, sigma_func)

# Example usage
log_returns = np.random.normal(0, 0.1, 100)
hybrid_model = HybridModel()
hybrid_model.fit(X, y, log_returns)
hybrid_predictions = hybrid_model.predict(X)
print(hybrid_predictions)


TypeError: BayesianStochasticVolatilityEKF_HMC.__init__() missing 4 required positional arguments: 'R', 'mu_prior', 'phi_prior', and 'sigma_eta_prior'