In [None]:
'''GARCH'''
from typing import Sequence

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.optimize
import scipy.stats

def b1_square(x_view: np.ndarray) -> np.ndarray:
    return np.square(x_view[-1, :])


def b1(sigma_view: np.ndarray) -> np.ndarray:
    return sigma_view[-1, :]


def loglik(ret: np.ndarray, params: Sequence, vola_ret_features=b1_square, vola_sigma_features=b1):
    """Calculate the likelihood of a process path"""
    ret = ret.reshape((-1, 1))
    gamma_0, gamma_1, lambda_1 = params
    sigma_squared = np.repeat(gamma_0, len(ret)).reshape(ret.shape)
    sigma_squared[0, 0] = gamma_0

    for s in range(1, len(ret)):
        sigma_squared[s, :] = (
            gamma_0
            + np.dot(gamma_1, vola_ret_features(ret[0:s, :]))
            + np.dot(lambda_1, vola_sigma_features(sigma_squared[0:s, :]))
        )

    return np.sum(scipy.stats.norm.logpdf(ret[1:, 0], loc=0.0, scale=np.sqrt(sigma_squared[1:, 0])))


def mle(ret: np.ndarray, start_params: Sequence):
    """Maximum-likelihood estimator"""

    def error_fuc(theta):
        return -loglik(ret, theta)

    start_params = np.array(start_params)
    result = scipy.optimize.minimize(
        error_fuc,
        start_params,
        method='L-BFGS-B',
        bounds=[(1e-8, 1.0), (1e-8, 1.0), (1e-8, 1.0)],
        options={'maxiter': 250, 'disp': False},
    )
    return result


def path(
    no_paths: int, t: int, params: Sequence, vola_ret_features=b1_square, vola_sigma_features=b1
):
    """Simulate process paths"""
    assert no_paths > 0 and t > 0
    assert len(params) == 3

    ret = np.random.randn(t, no_paths)
    gamma_0, gamma_1, lambda_1 = params
    gamma_0 = np.repeat(gamma_0, no_paths).reshape((1, no_paths))
    sigma_squared = np.zeros((t, no_paths))
    sigma_squared[0, :] = gamma_0
    ret[0, :] = 0.0

    for s in range(1, t):
        sigma_squared[s, :] = (
            gamma_0
            + np.dot(gamma_1, vola_ret_features(ret[0:s, :]))
            + np.dot(lambda_1, vola_sigma_features(sigma_squared[0:s, :]))
        )
        ret[s, :] = ret[s, :] * np.sqrt(sigma_squared[s, :])

    return ret, np.sqrt(sigma_squared)


def noise_from_path(
    ret: np.ndarray, params: Sequence, vola_ret_features=b1_square, vola_sigma_features=b1
) -> np.ndarray:
    """Extract the noise process path from a GARCH path given a parameter set"""
    ret = ret.reshape((-1, 1))
    gamma_0, gamma_1, lambda_1 = params
    sigma_squared = np.repeat(gamma_0, len(ret)).reshape(ret.shape)
    sigma_squared[0, 0] = gamma_0
    noise = np.zeros(ret.shape)

    noise[0, :] = ret[0, :] / np.sqrt(sigma_squared[0, :])
    for s in range(1, len(ret)):
        sigma_squared[s, :] = (
            gamma_0
            + np.dot(gamma_1, vola_ret_features(ret[0:s, :]))
            + np.dot(lambda_1, vola_sigma_features(sigma_squared[0:s, :]))
        )
        noise[s, :] = ret[s, :] / np.sqrt(sigma_squared[s, :])
    return noise


if __name__ == '__main__':
    plt.style.use('ggplot')
 

In [20]:
'''EGARCH'''
from typing import Sequence

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.optimize
import scipy.stats

def b1_square(x_view: np.ndarray) -> np.ndarray:
    return np.square(x_view[-1, :])


def b1(sigma_view: np.ndarray) -> np.ndarray:
    return sigma_view[-1, :]

E_ABS_Z = np.sqrt(2 / np.pi)

def loglik(ret: np.ndarray, params: Sequence, vola_ret_features=b1_square, vola_sigma_features=b1):
    """Calculate the likelihood of a process path"""
    ret = ret.reshape((-1, 1))
    omega, alpha, gamma, beta = params
    log_sigma_squared = np.zeros_like(ret)
    log_sigma_squared[0, 0] = omega

    for s in range(1, len(ret)):
        sigma_prev = np.sqrt(np.exp(log_sigma_squared[s - 1, 0]))
        z_prev = ret[s - 1, 0] / sigma_prev
        log_sigma_squared[s, :] = (
            omega
            + beta * np.log(sigma_prev**2)
            + alpha * (np.abs(z_prev) - E_ABS_Z)
            + gamma * z_prev
        )

    sigma_squared = np.exp(log_sigma_squared)

    return np.sum(scipy.stats.norm.logpdf(ret[1:, 0], loc=0.0, scale=np.sqrt(sigma_squared[1:, 0])))


def mle(ret: np.ndarray, start_params: Sequence):
    """Maximum-likelihood estimator"""

    def error_fuc(theta):
        return -loglik(ret, theta)

    start_params = np.array(start_params)
    result = scipy.optimize.minimize(
        error_fuc,
        start_params,
        method='L-BFGS-B',
        bounds = [
        (None, None),   # omega
        (0.0, 1.0),     # alpha
        (None, None),   # gamma
        (0.0, 1.0)],    # beta]
        options={'maxiter': 250, 'disp': False},
    )
    return result


def path(
    no_paths: int, t: int, params: Sequence, vola_ret_features=b1_square, vola_sigma_features=b1
):
    """Simulate process paths"""
    assert no_paths > 0 and t > 0
    assert len(params) == 3

    ret = np.random.randn(t, no_paths)
    omega, alpha, gamma, beta = params
    log_sigma_squared = np.zeros((t, no_paths))
    log_sigma_squared[0, :] = omega
    z = np.random.randn(t, no_paths)
    ret[0, :] = 0.0

    for s in range(1, t):
        sigma_prev = np.sqrt(np.exp(log_sigma_squared[s - 1, :]))
        z_prev = ret[s - 1, :] / sigma_prev
        log_sigma_squared[s, :] = (
            omega
            + beta * np.log(sigma_prev**2)
            + alpha * (np.abs(z_prev) - E_ABS_Z)
            + gamma * z_prev
        )
        ret[s, :] = z[s, :] * np.sqrt(log_sigma_squared[s, :])

    return ret, np.sqrt(log_sigma_squared)


def noise_from_path(ret: np.ndarray, params: Sequence):
    """
    Extract standardized residuals z_t from EGARCH path
    """

    ret = ret.reshape((-1, 1))
    omega, alpha, gamma, beta = params

    log_sigma_sq = np.zeros_like(ret)
    log_sigma_sq[0, 0] = omega
    z = np.zeros_like(ret)

    for t in range(1, len(ret)):
        sigma_prev = np.sqrt(np.exp(log_sigma_sq[t - 1, 0]))
        z_prev = ret[t - 1, 0] / sigma_prev

        log_sigma_sq[t, 0] = (
            omega
            + beta * log_sigma_sq[t - 1, 0]
            + alpha * (np.abs(z_prev) - E_ABS_Z)
            + gamma * z_prev
        )

        z[t, 0] = ret[t, 0] / np.sqrt(np.exp(log_sigma_sq[t, 0]))

    return z


if __name__ == '__main__':
    plt.style.use('ggplot')
 

In [4]:
"""GRU"""
import torch
from torch import nn 
import torch.nn.functional as F

class GRUCell(nn.Module):

    def __init__(self, input_dim, hidden_dim) -> None:
        super(GRUCell, self).__init__()
        self.input_dim, self.hidden_dim = input_dim, hidden_dim
        self.relevance_whh, self.relevance_wxh, self.relevance_b = self.create_gate_parameters()
        self.update_whh, self.update_wxh, self.update_b = self.create_gate_parameters()
        self.candidate_whh, self.candidate_wxh, self.candidate_b = self.create_gate_parameters()

    def create_gate_parameters(self):
        input_weights = nn.Parameter(torch.zeros(self.input_dim, self.hidden_dim))
        hidden_weights = nn.Parameter(torch.zeros(self.hidden_dim, self.hidden_dim))
        nn.init.xavier_uniform_(input_weights)
        nn.init.xavier_uniform_(hidden_weights)
        bias = nn.Parameter(torch.zeros(self.hidden_dim))
        return hidden_weights, input_weights, bias
    
    def forward(self, x, h):
        output_hiddens = []
        for i in range(x.shape[1]):
            relevance_gate = F.sigmoid((h @ self.relevance_whh) + (x[:, i] @ self.relevance_wxh) + self.relevance_b)
            update_gate = F.sigmoid((h @ self.update_whh) + (x[:, i] @ self.update_wxh) + self.update_b)
            candidate_hidden = F.tanh(((relevance_gate * h) @ self.candidate_whh) + (x[:, i] @ self.candidate_wxh) + self.candidate_b)
            h = (update_gate * candidate_hidden) + ((1 - update_gate) * h)
            output_hiddens.append(h.unsqueeze(1))
        return torch.concat(output_hiddens, dim=1)


class MultiLayerGRU(nn.Module):

    def __init__(self, input_dim, hidden_dim, num_layers, dropout):
        super(MultiLayerGRU, self).__init__()
        self.input_dim, self.hidden_dim, self.num_layers = input_dim, hidden_dim, num_layers
        self.layers = nn.ModuleList()
        self.layers.append(GRUCell(input_dim, hidden_dim))
        for _ in range(num_layers - 1):
            self.layers.append(GRUCell(hidden_dim, hidden_dim))
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(hidden_dim, input_dim)
        nn.init.xavier_uniform_(self.linear.weight.data)
        self.linear.bias.data.fill_(0.0)

    def forward(self, x, h):
        output_hidden = self.layers[0](x, h[0])
        new_hidden = [output_hidden[:, -1].unsqueeze(0)]
        for i in range(1, self.num_layers):
            output_hidden = self.layers[i](self.dropout(output_hidden), h[i])
            new_hidden.append(output_hidden[:, -1].unsqueeze(0))
        return self.linear(self.dropout(output_hidden)), torch.concat(new_hidden, dim=0)

In [5]:
"LIGRU"
import torch
from torch import nn 
import torch.nn.functional as F

class GRUCell(nn.Module):

    def __init__(self, input_dim, hidden_dim) -> None:
        super(GRUCell, self).__init__()
        self.input_dim, self.hidden_dim = input_dim, hidden_dim
        self.update_whh, self.update_wxh, self.update_b = self.create_gate_parameters()
        self.candidate_whh, self.candidate_wxh, self.candidate_b = self.create_gate_parameters()

    def create_gate_parameters(self):
        input_weights = nn.Parameter(torch.zeros(self.input_dim, self.hidden_dim))
        hidden_weights = nn.Parameter(torch.zeros(self.hidden_dim, self.hidden_dim))
        nn.init.xavier_uniform_(input_weights)
        nn.init.xavier_uniform_(hidden_weights)
        bias = nn.Parameter(torch.zeros(self.hidden_dim))
        return hidden_weights, input_weights, bias
    
    def forward(self, x, h):
        output_hiddens = []
        for i in range(x.shape[1]):
            update_gate = F.sigmoid((h @ self.update_whh) + (x[:, i] @ self.update_wxh) + self.update_b)
            candidate_hidden = F.relu((h @ self.candidate_whh) + ((x[:, i] @ self.candidate_wxh) + self.candidate_b))
            h = (update_gate * candidate_hidden) + ((1 - update_gate) * h)
            output_hiddens.append(h.unsqueeze(1))
        return torch.concat(output_hiddens, dim=1)


class MultiLayerGRU(nn.Module):

    def __init__(self, input_dim, hidden_dim, num_layers, dropout):
        super(MultiLayerGRU, self).__init__()
        self.input_dim, self.hidden_dim, self.num_layers = input_dim, hidden_dim, num_layers
        self.layers = nn.ModuleList()
        self.layers.append(GRUCell(input_dim, hidden_dim))
        for _ in range(num_layers - 1):
            self.layers.append(GRUCell(hidden_dim, hidden_dim))
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(hidden_dim, input_dim)
        nn.init.xavier_uniform_(self.linear.weight.data)
        self.linear.bias.data.fill_(0.0)

    def forward(self, x, h):
        output_hidden = self.layers[0](x, h[0])
        new_hidden = [output_hidden[:, -1].unsqueeze(0)]
        for i in range(1, self.num_layers):
            output_hidden = self.layers[i](self.dropout(output_hidden), h[i])
            new_hidden.append(output_hidden[:, -1].unsqueeze(0))
        return self.linear(self.dropout(output_hidden)), torch.concat(new_hidden, dim=0)

In [None]:
"Example Usage"
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
dataset = pd.read_csv("airline-passengers.csv")

# parse time
dataset["Month"] = pd.to_datetime(dataset["Month"])
dataset = dataset.sort_values("Month")

values = dataset["Passengers"].values.astype("float32")
log_values = np.log(values)

returns = np.diff(log_values)
returns = returns - returns.mean()
start_params = [-0.1, 0.1, 0.0, 0.9]
result = mle(returns, start_params)
params_hat = result.x

print("Estimated parameters:", params_hat)

ll_value = loglik(returns, params_hat)
print("Log-likelihood:", ll_value)



Estimated parameters: [-0.83507327  0.          0.05807386  0.81580395]
Log-likelihood: 110.93286401599653
