# Analisi statistica intervalli di ricorrenza extreme returns

Questo notebook contiene le analisi statistiche fatte per il paper.

Il flusso è il seguente:

- [x] utilizzo del dataset *Dow Jones* con la massima ampiezza storica disponibile (1985 - 2019)
- [x] calcolo dei log returns
- [x] plot degli intervalli di ricorrenza per il Dow Jones, con metodi diversi:
    - [x] quantile threshold al
        - [x] 95%
        - [x] 97.5%
        - [x] 99%
        - [x] verifica della relazione $\tau_Q = \frac{Q}{1 - Q}$ dove $Q$ è il quantile scelto (0.95, 0.975, 0.99), $\tau_Q$ l'intervallo di ricorrenza medio, e confronto con l'evidenza dei dati
    - [x] peak-over-threshold definito come $pot = \mu \pm m \cdot \sigma$: ricavare analiticamente m come $m = \frac{q_x - \mu}{\sigma}$ dove $q_x$ è il quantile di ordine $x$
        
Infine, tutto dovrà essere rifatto per una azione dell'*S&P500*, non per il Dow Jones come di seguito.

In [None]:
import os
import time
import datetime
from typing import List
import itertools
import pickle
import math
import copy

import numpy as np
import pandas as pd
import scipy.stats
import scipy.special as sfun
from scipy.stats import genextreme as gev
import sklearn.metrics as sm

from statsmodels.tsa import stattools
from statsmodels.graphics import tsaplots

import matplotlib.pyplot as pl
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import seaborn as sns

from tqdm import tqdm
import ipdb

import numba

# import dello stimatore di Hill e del @timeit
import tail_estimation
from my_timeit import timeit

In [None]:
%pdb on

In [None]:
%load_ext line_profiler
%load_ext autoreload

In [None]:
%autoreload 5

In [None]:
return_type = ['pos', 'neg', 'abs']
quantile_type = ['95', '97.5', '99', 'evt']
distribution_type = ['weibull', 's-exp', 'q-exp']

colors = {
    'pos': 'seagreen',
    'neg': 'darkred',
    'abs': 'royalblue',
}

legend_labels = {
    'pos': r'$r$',
    'neg': r'$-r$',
    'abs': r'$|r|$',
}

## 1. Caricamento dei dati e divisione train-test

In [None]:
data_path = "/Users/pietro/Google Drive/OptiRisk Thesis/data"
djia_path = os.path.join(data_path, 'DJIA.csv')

Conversione delle date e settaggio dell'index del dataframe

In [None]:
djia = pd.read_csv(djia_path)
djia.loc[:, 'Date'] = pd.to_datetime(djia['Date'], format="%Y-%m-%d")
djia.index = djia['Date']
djia.drop(columns=['Date'], inplace=True)
djia.head()

Ora dichiariamo le date in cui bisogna dividere i dati, visto che ci sono state crisi nei mesi/anni successivi:

In [None]:
split_dates = {
    'insurance': datetime.datetime(1987, 1, 1), # insurance companies crisis
    'dot-com': datetime.datetime(2000, 1, 1), # dot-com bubble explodes
    'subprime-crisis': datetime.datetime(2007, 1, 1), # subprime crisis
    'eu-debt': datetime.datetime(2011, 1, 1), # EU sovereign debt crisis
}

## 2. Calcolo log-returns

In [None]:
log_returns = np.log(djia.loc[:, ['Adj Close']]).diff(periods=1).iloc[1:, :]
log_returns.head()

In [None]:
fig, ax = pl.subplots(nrows=1, ncols=2, figsize=(18, 7))

ax[0].plot(djia['Adj Close'])
ax[0].set(xlabel='Date', ylabel='DJIA', title='Dow Jones index value')
sns.despine()

ax[1].plot(log_returns, label='Log Returns')
ax[1].set(xlabel='Date', ylabel='Log Returns', title='Dow Jones Log Returns')
sns.despine()

Creo una funzione per dividere il dataset prima e dopo gli eventi critici:

In [None]:
def divide(data: pd.DataFrame, before_date: datetime.datetime):
    """Split the data before and after the before_date."""
    before = data[data.index < before_date]
    after = data[data.index >= before_date]
    
    return before, after

In [None]:
returns_before_after = {
    event: divide(log_returns, split_dates[event])
    for event in split_dates.keys()
}

Ora scelgo quale sia la data di splitting e faccio le analisi con quella:

In [None]:
split_key = 'subprime-crisis'

# divisione dataset in training (in-sample) e testing (out-of-sample), qui prima e dopo la crisi finanziaria 2007-2008
lr_before, lr_after = returns_before_after[split_key]

# returns contiene il training set
returns = {
    'pos': lr_before['Adj Close'][lr_before['Adj Close'] > 0.0],
    'neg': lr_before['Adj Close'][lr_before['Adj Close'] < 0.0],
    'abs': lr_before['Adj Close'].abs(),
}

returns_test = {
    'pos': lr_after['Adj Close'][lr_after['Adj Close'] > 0.0],
    'neg': lr_after['Adj Close'][lr_after['Adj Close'] < 0.0],
    'abs': lr_after['Adj Close'].abs(),
}

## 3. Stima dei parametri della distribuzione GEV

In questa sezione si replica la sezione 4.1 del paper.

Secondo la [Extreme Value Theory](https://en.wikipedia.org/wiki/Extreme_value_theory), trovare gli estremi significa trovare un gruppo di dati $x \geq x_t$ dove $x_t$ è l'*extreme value threshold*, e che soddisfi la GEV ([Generalized Extreme Values Distribution](https://en.wikipedia.org/wiki/Generalized_extreme_value_distribution)) che ha distribuzione cumulativa:

\begin{align*}
    G(x) &= exp\left[- \left(1 + \xi \frac{x - \mu}{\sigma}\right)^{-1/\xi}\right] \; for \; \xi \neq 0\\ 
    G(x) &= exp\left[-exp\left(\frac{x - \mu}{\sigma}\right)\right] \; for \; \xi = 0\\ 
\end{align*}

dove $\xi$ è lo *shape parameter* che determina la forma della coda, $1/\xi$ è il *tail exponent* della distribuzione.

Per trovare il threshold $x_t$, usiamo questo metodo:

1. sort dei dati (tutti i log-returns) in ordine discendente (o non-ascendente) per avere la sequenza $x_1 \geq x_2 \geq \ldots \geq x_n$
2. applicare lo [stimatore di Hill](https://en.wikipedia.org/wiki/Heavy-tailed_distribution#Hill's_tail-index_estimator) dove $n$ è il numero di samples, $k$ l'indice del k-esimo dato più grande (posizione k nella sequenza ordinata) chiamato *k-th order statistic*

$$\hat{\xi}_{k,n} = \frac{1}{\gamma - 1} = \frac{1}{k}\sum_{i=1}^{k}log\left(\frac{x_i}{x_{k+1}}\right)$$

3. calcolare la statistica di [Kolmogorov-Smirnov](https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test#Kolmogorov%E2%80%93Smirnov_statistic) per quantificare il fitting tra la distribuzione GEV così ricavata (con $\hat{\xi}_{k,n}$ come esponente) e quella empirica della coda, dove la coda è rappresentata dai returns che sono $x \geq x_t$, cioé quelli ordinati discendenti con indice $i < k$
4. scegliere $k$ (e di conseguenza $x_t$ che non è altro che il k-esimo elemento dei return ordinati discendenti) come il valore associato alla *minima* statistica di Kolmogorov-Smirnov

Questo flusso viene applicato a tutti e 3 i tipi di returns considerati: positivi, negativi e assoluti.

Inoltre, usiamo una seconda versione di questo flusso, che è:

1. sort dei dati (tutti i log-returns) in ordine discendente (o non-ascendente) per avere la sequenza $x_1 \geq x_2 \geq \ldots \geq x_n$
2. scorrere nei returns ordinati con un indice $k$ che identifica il threshold scelto e fitting di una GEV sui *tail data*, dove per tail data si intendono tutti i returns con indice $i < k$
3. calcolare la statistica di [Kolmogorov-Smirnov](https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test#Kolmogorov%E2%80%93Smirnov_statistic) per quantificare il fitting tra la distribuzione GEV e i dati
4. scegliere $k$ (e di conseguenza $x_t$ che non è altro che il k-esimo elemento dei return ordinati discendenti) come il valore associato alla *minima* statistica di Kolmogorov-Smirnov

Ciò che cambia tra i due flussi è che nel primo, calcoliamo *separatamente* $\gamma$ e gli altri parametri $\mu$ e $\sigma$, mentre nel secondo caso tutti insieme con una maximum-likelihood estimation.

Calcolo i return ordinati dal più grande al più piccolo per i positivi, negativi e assoluti per prima cosa e poi procedo.

In [None]:
# 1. sorting dei returns - va fatto sia per il positivo, che per il negativo, che per gli assoluti
sorted_positive_lr = returns['pos'].sort_values(ascending=False)
sorted_negative_lr = (-returns['neg']).sort_values(ascending=False)
sorted_absolute_lr = returns['abs'].sort_values(ascending=False)

eps = 5e-6

sorted_lr = {
    'pos': sorted_positive_lr[sorted_positive_lr >= eps],
    'neg': sorted_negative_lr[sorted_negative_lr >= eps],
    'abs': sorted_absolute_lr[sorted_absolute_lr >= eps],
}

### 3.1 Primo flusso

Cominciamo stimando $\gamma$.

#### 3.1.1 Stima di $\gamma$ separata

In [None]:
@timeit
def estimate_shape_param(x: np.ndarray, get_optimum=False, n_times=1):
    """Estimate the shape parameter gamma for the GEV using Hill estimator.
    Input MUST be sorted in descending order.
    """
    # xi_estimation[0]: order statistics
    # xi_estimation[1]: tail index estimates (xi su Wikipedia, gamma nel paper)
    # xi_estimation[2]: optimal order statistics (k)
    # xi_estimation[3]: tail index for the optimal order statistics

    # xi_estimation[4]: array of fractions of order statistics used for the 1st bootstrap sample
    # xi_estimation[5]: corresponding AMSE values
    # xi_estimation[6]: fraction of order statistics corresponding to the minimum of AMSE for the 1st bootstrap
    # xi_estimation[7]: index of the 1st bootstrap sample's order statistics array corresponding to the
    #                   minimization boundary set by eps_stop parameter

    # xi_estimation[8]: array of fractions of order statistics used for the 2nd bootstrap sample
    # xi_estimation[9]: corresponding AMSE values
    # xi_estimation[10]: fraction of order statistics corresponding to the minimum of AMSE for the 2nd bootstrap
    # xi_estimation[11]: index of the 2nd bootstrap sample's order statistics array corresponding to the
    #                   minimization boundary set by eps_stop parameter
    assert isinstance(get_optimum, bool)
    
    # check descending
    assert np.all(np.diff(x) <= 0.0)
    
    xis = np.zeros((n_times, x.shape[0] - 1))
    optimal_kappas = np.zeros((n_times, ), dtype=np.int)
    optimal_xis = np.zeros((n_times, ), dtype=np.float64)
    
    for i in range(n_times):
        xi_estimation = tail_estimation.hill_estimator(x, bootstrap=get_optimum)

        kappas = xi_estimation[0]
        xi_hill = xi_estimation[1]
        
        xis[i, :] = xi_hill[:]
        
        if get_optimum:
            optimal_kappas[i] = xi_estimation[2]
            optimal_xis[i] = xi_estimation[3]
            
    xis = np.mean(xis, axis=0)  # per ogni colonna (k) la media delle xi che ha trovato
    
    if get_optimum:
        optimal_k = int(round(np.mean(optimal_kappas)))
        optimal_xi = np.mean(optimal_xis)
        optimal_gamma = 1.0 + (1.0 / optimal_xi)
        
        return {
            'kappas': kappas,
            'xis': xis,
            'gammas': 1.0 + (1.0 / xis),
            'k_opt': optimal_k,
            'xi_opt': optimal_xi,
            'gamma_opt': optimal_gamma,
        }
    else:
        return {
            'kappas': kappas,
            'xis': xis,
            'gammas': 1.0 + (1.0 / xis),
        }

Posso la mia funzione per stimare $\gamma$ esattamente come nel paper, cioè:

$$
\gamma = \frac{1}{k} \sum_{i = 1}{k} \left[ \ln x_{n + 1 - k} - \ln x_k \right]
$$

che però richiede i returns ordinati **discendenti**.

Invece, uso la formula nell'altro paper [Tail index estimation, concentration and adaptivity](http://arxiv.org/abs/1503.05077), cioé

$$
\hat{\gamma}(k) = \frac{1}{k} \sum_{i = 1}^{k} \ln \left( \frac{x_i}{x_{k + 1}} \right)
$$

che richiede i returns ordinati **discendenti**.

In [None]:
def my_hill_estimator(lr: np.ndarray):
    """Get the estimation of gamma."""
    assert np.all(np.diff(lr) <= 0.0)  # ordinati discendenti, lr[i] - lr[i - 1] <= 0
    
    k_max = lr.shape[0]
    kappas = np.arange(1, k_max)
    
    gammas = np.zeros((kappas.shape[0], ), dtype=np.float64)
    
    for index, k in enumerate(kappas):  # index = k - 1
        ssum = np.sum(np.log(lr[:k - 1] / lr[k - 1 + 1]))        
        gammas[index] = (1 / k) * ssum
        
    return {
        'xis': gammas,
        'kappas': kappas
    }

Calcolo la stima di $\xi = \frac{1}{\gamma - 1}$ con lo stimatore di Hill per i tre returns:

In [None]:
hill_estimation = {
    ret_type: estimate_shape_param(sorted_lr[ret_type].values, get_optimum=True, n_times=30)
    for ret_type in return_type
}

print("")
title_format = "{:>15}"*5
row_format = "{:>15}" + "{:>15.3f}" * 2 + "{:>15}{:>15f}"
print(title_format.format('Return type', 'Xi', 'gamma = -c', 'k', 'r_opt'))
print("-" * 15 * 5)
for ret_type in return_type:
    est = hill_estimation[ret_type]
    print(row_format.format(ret_type, est['xi_opt'], est['gamma_opt'], est['k_opt'], sorted_lr[ret_type].values[est['k_opt']]))

Ora, calcoliamo $\xi = \gamma = -c$ con la mia funzione

In [None]:
my_hill_estimation = {
    ret_type: my_hill_estimator(sorted_lr[ret_type].values)
    for ret_type in return_type
}

#### 3.1.2 Plot di $\xi_{k, n}$ e $\gamma$ al variare di k e dei returns

Plot di $\xi$ e $\gamma$ al variare di $k$, cioè del threshold, e plot di $\xi$ e $\gamma$ al variare dei return, per vedere come cambia a seconda di quale return si prenda come threshold:

In [None]:
# plot delle stime di xi e gamma al variare di k
truncation = 100

# sulle righe in funzione di k o dei returns
# sulle colonne lo xi stimato con il codice trovato su GitHub, lo xi stimato da me, e la conversione gamma = 1 + 1/xi
fig, ax = pl.subplots(nrows=2, ncols=2, figsize=(18, 14))

# Xi - GitHub
for ret_type in return_type:
    est = hill_estimation[ret_type]
    x = est['kappas'][truncation:]
    y = est['xis'][truncation:]

    ax[0][0].plot(x, y, color=colors[ret_type], label=legend_labels[ret_type])
    ax[0][0].plot(est['k_opt'], est['xi_opt'], marker='s', markersize=5, color=colors[ret_type])

ax[0][0].set(
    xlabel=r'$k$',
    ylabel=r'$\hat{\xi}_{k,n} = \frac{1}{\hat{\gamma} - 1}$',
    title=r'Hill estimation of $\hat{\xi}_{k,n}$ as function of $k$'
)

# Xi - mio codice
for ret_type in return_type:
    est = my_hill_estimation[ret_type]
    x = est['kappas'][truncation:]
    y = est['xis'][truncation:]

    ax[0][1].plot(x, y, color=colors[ret_type], label=legend_labels[ret_type])

ax[0][1].set(
    xlabel=r'$k$',
    ylabel=r'$\hat{\xi}_{k,n} = \gamma(k)$',
    title=r'My Hill estimation of $\hat{\xi}_{k,n}$ as function of $k$'
)

###########################################
# seconda riga, in funzione dei log-returns
# Xi - GitHub
for ret_type in return_type:
    est = hill_estimation[ret_type]
    x = sorted_lr[ret_type].values[1:][::-1][truncation:]
    y = est['xis'][truncation:]

    ax[1][0].semilogx(x, y, color=colors[ret_type], label=legend_labels[ret_type])

ax[1][0].set(
    xlabel=r'$r$, $-r$, $|r|$',
    ylabel=r'$\hat{\xi}_{k,n} = \frac{1}{\hat{\gamma} - 1}$',
    title=r'Hill estimation of $\hat{\xi}_{k,n}$ as function of the returns'
)

# Xi - mio codice
for ret_type in return_type:
    est = my_hill_estimation[ret_type]
    x = sorted_lr[ret_type].values[1:][::-1][truncation:]
    y = est['xis'][truncation:]

    ax[1][1].semilogx(x, y, color=colors[ret_type], label=legend_labels[ret_type])

ax[1][1].set(
    xlabel=r'$r$, $-r$, $|r|$',
    ylabel=r'$\hat{\xi}_{k,n} = \gamma(k)$',
    title=r'My Hill estimation of $\hat{\xi}_{k,n}$ as function of the returns'
)

m, n = ax.shape
for i in range(m):
    for j in range(n):
        ax[i, j].legend()

sns.despine()

C'è differenza tra la mia stima e quella del file che ho trovato?

In [None]:
fig, ax = pl.subplots(nrows=1, ncols=1, figsize=(14, 8))


for ret_type in return_type:
    est1 = hill_estimation[ret_type]
    x1 = est1['kappas'][truncation:]
    y1 = est1['xis'][truncation:]
    
    est2 = my_hill_estimation[ret_type]
    x2 = est2['kappas'][truncation:]
    y2 = est2['xis'][truncation:]
    
    assert np.all(x1 == x2)

    ax.plot(x1, y1 - y2, color=colors[ret_type], label=legend_labels[ret_type])

ax.set(
    xlabel=r'$k$',
    ylabel=r'Difference between the $\xi = \gamma$ estimations',
)
ax.legend()
sns.despine()

Ok, non c'è differenza tra la stima che ho trovato su GitHub e la mia, bon.

#### 3.1.3 Fitting della GEV

Ora biogna passare a fittare la GEV e calcolare la statistica KS per ogni valore di $k$ (e quindi del return che funge da threshold, $x_t$).

Andremo poi a scegliere il valore di $k$ che minimizza la KS.

Creiamo allora una funzione apposita:

In [None]:
@timeit
@numba.jit(nopython=False, parallel=True, nogil=True)
def fit_gev(lr, xi, k, size=None, n_tries=1):
    """Find the best fitting GEV to the tail distribution of data in lr.
    lr MUST BE in descending order
    """
#     assert isinstance(xi, np.ndarray)
#     assert isinstance(k, np.ndarray)
#     assert xi.shape == k.shape
    
    # check descending
#     assert np.all(np.diff(lr) <= 0.0)
    nk = k.shape[0]
    n_xi = xi.shape[0]
    
    ks = np.zeros((n_xi, ))
    pvals = np.zeros((n_xi, ))
    fits = []
    
    print("Start fitting")
    for i in numba.prange(nk):
        current_k = k[i]
        current_xi = xi[i]
        
        threshold = lr[current_k]
        tail_data = lr[:current_k]
        
        if size:
            ss = size
        else:
            ss = current_k
        
        fit = gev.fit(tail_data, fix_c=-current_xi)  # convenzioni diverse in SciPy
        fits.append(fit)
        
        k_stat_temp = np.zeros((n_tries, ))
        pval_temp = np.zeros((n_tries, ))
        
        c = fit[0]
        loc = fit[1]
        scale = fit[2]
        
        for j in range(n_tries):
            rvs = gev.rvs(c, loc, scale, ss)

            kk, pv = scipy.stats.ks_2samp(tail_data, rvs)
            k_stat_temp[j] = kk
            pval_temp[j] = pv
        
        ks[i] = np.mean(k_stat_temp)
        pvals[i] = np.mean(pval_temp)
        
    print("Fitting done")
    
    return {
        'ks': ks,
        'p': pvals,
        'fits': fits,
    }

ed applichiamola:

In [None]:
load = True
npz_filename = f"ks_stat-pvals_{split_key}.npz"
pickle_filename = f"./fits_{split_key}.pickle"

if load:  # just load the already computed KS and p-values
    loaded = np.load(npz_filename)
    
    with open(pickle_filename, 'rb') as infile:
        fits = pickle.load(infile)
    
    kolmog_smirn = {
        'pos': {
            'ks': loaded['ks_pos'],
            'p': loaded['pvals_pos'],
            'fits': fits['pos'],
        },
        'neg': {
            'ks': loaded['ks_neg'],
            'p': loaded['pvals_neg'],
            'fits': fits['neg'],
        },
        'abs': {
            'ks': loaded['ks_abs'],
            'p': loaded['pvals_abs'],
            'fits': fits['abs'],
        }
    }
    
    
else:  # compute them and save them
    size = 10000
    n_tries = 10
    print("\nFitting returns")

    kolmog_smirn = {
        ret_type: fit_gev(
            sorted_lr[ret_type].values,
            hill_estimation[ret_type]['xis'],
            hill_estimation[ret_type]['kappas'],
            size=size,
            n_tries=n_tries,
        )
        for ret_type in return_type
    }
    
    np.savez_compressed(npz_filename, **{
        'ks_pos': kolmog_smirn['pos']['ks'],
        'pvals_pos': kolmog_smirn['pos']['p'],
        'ks_neg': kolmog_smirn['neg']['ks'],
        'pvals_neg': kolmog_smirn['neg']['p'],
        'ks_abs': kolmog_smirn['abs']['ks'],
        'pvals_abs': kolmog_smirn['abs']['p'],
    })
    
    fits = {
        ret_type: kolmog_smirn[ret_type]['fits']
        for ret_type in return_type
    }
    
    with open(pickle_filename, 'wb') as outfile:
        pickle.dump(fits, outfile)

### 3.2 Secondo flusso

Proviamo invece a fittare tutti e 3 i parametri in un colpo solo

#### 3.2.1 Fitting della GEV

In [None]:
@timeit
@numba.jit(nopython=False, parallel=True, nogil=True)
def fit_gev_one_shot(lr, size=None, n_tries=1):
    """Find the best fitting GEV to the tail distribution of data in lr."""
#     assert isinstance(xi, np.ndarray)
#     assert isinstance(k, np.ndarray)
#     assert xi.shape == k.shape
    
    # check descending
#     assert np.all(np.diff(lr) <= 0.0)
    n = lr.shape[0]
    
    ks_dist = np.zeros((n - 1, ))
    pvals = np.zeros((n - 1, ))
    fits = []
    
    print("Start fitting")
    for k in numba.prange(1, n):
        threshold = lr[k]
        tail_data = lr[:k]
        
        if size:
            ss = size
        else:
            ss = k
        
        fit = gev.fit(tail_data)
        fits.append(fit)
        
        k_stat_temp = np.zeros((n_tries, ))
        pval_temp = np.zeros((n_tries, ))
        
        c = fit[0]
        loc = fit[1]
        scale = fit[2]
        
        for j in range(n_tries):
            rvs = gev.rvs(c, loc, scale, ss)

            kk, pv = scipy.stats.ks_2samp(tail_data, rvs)
            k_stat_temp[j] = kk
            pval_temp[j] = pv
        
        ks_dist[k - 1] = np.mean(k_stat_temp)
        pvals[k - 1] = np.mean(pval_temp)
        
    print("Fitting done")
    
    return {
        'ks': ks_dist,
        'p': pvals,
        'fits': fits,
    }

In [None]:
load = True
npz_filename = f"ks_stat-pvals_{split_key}_one_step.npz"
pickle_filename = f"./fits_{split_key}_one_step.pickle"

if load:  # just load the already computed KS and p-values
    loaded_2 = np.load(npz_filename)
    
    with open(pickle_filename, 'rb') as infile:
        fits_2 = pickle.load(infile)
    
    kolmog_smirn_2 = {
        'pos': {
            'ks': loaded_2['ks_pos'],
            'p': loaded_2['pvals_pos'],
            'fits': fits_2['pos'],
        },
        'neg': {
            'ks': loaded_2['ks_neg'],
            'p': loaded_2['pvals_neg'],
            'fits': fits_2['neg'],
        },
        'abs': {
            'ks': loaded_2['ks_abs'],
            'p': loaded_2['pvals_abs'],
            'fits': fits_2['abs'],
        }
    }
    
else:  # compute them and save them
    size = 10000
    n_tries = 10
    print("\n\nFitting returns")

    kolmog_smirn_2 = {
        ret_type: fit_gev_one_shot(
            sorted_lr[ret_type].values,
            size=size,
            n_tries=n_tries,
        )
        for ret_type in return_type
    }
    
    np.savez_compressed(npz_filename, **{
        'ks_pos': kolmog_smirn_2['pos']['ks'],
        'pvals_pos': kolmog_smirn_2['pos']['p'],
        'ks_neg': kolmog_smirn_2['neg']['ks'],
        'pvals_neg': kolmog_smirn_2['neg']['p'],
        'ks_abs': kolmog_smirn_2['abs']['ks'],
        'pvals_abs': kolmog_smirn_2['abs']['p'],
    })
    
    fits_2 = {
        ret_type: kolmog_smirn_2[ret_type]['fits']
        for ret_type in return_type
    }
    
    with open(pickle_filename, 'wb') as outfile:
        pickle.dump(fits_2, outfile)

### 3.3 Confronto tra i due flussi

Ora che ho la statistica KS per ogni valore di k e del return, plot delle statistiche rispetto ai returns e a $k$.

In [None]:
min_pval = 0.05

valid_pvals = {
    ret_type: kolmog_smirn[ret_type]['p'] <= min_pval
    for ret_type in return_type
}

valid_pvals_2 = {
    ret_type: kolmog_smirn_2[ret_type]['p'] <= min_pval
    for ret_type in return_type
}

# trovo gli indici minimi
i_min_ks = dict()
i_min_ks_2 = dict()

for ret_type in return_type:
    # indici booleani di validità del p-value
    mask = valid_pvals[ret_type]
    mask_2 = valid_pvals_2[ret_type]
    
    # copio la KS per quel return
    y = copy.deepcopy(kolmog_smirn[ret_type]['ks'])
    y_2 = copy.deepcopy(kolmog_smirn_2[ret_type]['ks'])
    
    # dove il p-value > 0.05, setto la KS al massimo così non viene considerata
    y[np.logical_not(mask)] = np.max(y)
    y_2[np.logical_not(mask_2)] = np.max(y_2)
    
    # trovo gli indici di minima distanza KS per questo return
    i_min_ks[ret_type] = np.argmin(y)
    i_min_ks_2[ret_type] = np.argmin(y_2)

Le figure seguenti mostrano l'andamento di $d_{KS}$ in funzione di k e dei returns ordinati, su scala $x$ semilogaritmica, per entrambi i modi di calcolare il fitting della GEV

In [None]:
# plot della statistica KS al variare del sorted return e di k
truncation = 100
end = -170
p_labels = {
    'pos': r'valid $p$ for $r$',
    'neg': r'valid $p$ for $-r$',
    'abs': r'valid $p$ for $|r|$',
}

p_height = {
    'pos': 0.0,
    'neg': -0.01,
    'abs': -0.02
}

fig, ax = pl.subplots(nrows=2, ncols=2, figsize=(22, 14))

# prima colonna: d_KS in funzione dei returns
for ret_type in return_type:
    # prendo le x complete e le tronco per il plot
    x = sorted_lr[ret_type].values[1:]
    x_trunc = x#[truncation:end]
    
    # prendo le x con i p-value validi nei due casi
    mask = valid_pvals[ret_type]#[truncation:end]
    x_ok = x_trunc[mask]
    
    mask_2 = valid_pvals_2[ret_type]#[truncation:end]
    x_ok_2 = x_trunc[mask_2]
    
    # prendo le y complete e le tronco per il plot nei due casi
    y = kolmog_smirn[ret_type]['ks']
    y_trunc = y#[truncation:end]
    
    y_2 = kolmog_smirn_2[ret_type]['ks']
    y_trunc_2 = y_2#[truncation:end]

    ### primo flusso: GEV in due steps
    ax[0, 0].semilogx(
        x_trunc,
        y_trunc,
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.5,
        label=legend_labels[ret_type]
    )
    
    ax[0, 0].semilogx(
        x_ok,
        p_height[ret_type] * np.ones((len(x_ok, ))),
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.7,
#         label=p_labels[ret_type]
    )
    
    ax[0, 0].axvline(
        x[i_min_ks[ret_type]],
        linestyle='-.',
        color=colors[ret_type],
        alpha=0.7
    )
    
    ### secondo flusso: GEV in un colpo solo
    ax[1, 0].semilogx(
        x_trunc,
        y_trunc_2,
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.5,
        label=legend_labels[ret_type])
    
    ax[1, 0].semilogx(
        x_ok_2,
        p_height[ret_type] * np.ones((len(x_ok_2, ))),
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.7,
#         label=p_labels[ret_type]
    )
    
    ax[1, 0].axvline(
        x[i_min_ks_2[ret_type]],
        linestyle='-.',
        color=colors[ret_type],
        alpha=0.7
    )

for i in range(2):
    ax[i, 0].set(
        xlabel=r'$r$, $-r$, $|r|$',
        ylabel=r'$d_{KS}$',
        title=r'$d_{KS} ( r )$, method ' + str(i)
    )
    ax[i, 0].legend()

    
########################################################################
# seconda colonna: d_KS in funzione di k
for ret_type in return_type:
    # prendo le x complete e le tronco per il plot
    x = hill_estimation[ret_type]['kappas']
    x_trunc = x#[truncation:end]
    
   # prendo le x con i p-value validi nei due casi
    mask = valid_pvals[ret_type]#[truncation:end]
    x_ok = x_trunc[mask]
    
    mask_2 = valid_pvals_2[ret_type]#[truncation:end]
    x_ok_2 = x_trunc[mask_2]
    
    # prendo le y complete e le tronco per il plot nei due casi
    y = kolmog_smirn[ret_type]['ks']
    y_trunc = y#[truncation:end]
    
    y_2 = kolmog_smirn_2[ret_type]['ks']
    y_trunc_2 = y_2#[truncation:end]

    ### primo flusso: GEV in due steps
    ax[0, 1].semilogx(
        x_trunc,
        y_trunc,
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.5,
        label=legend_labels[ret_type]
    )
    
    ax[0, 1].semilogx(
        x_ok,
        p_height[ret_type] * np.ones((len(x_ok, ))),
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.7,
        label=p_labels[ret_type]
    )
    
    ax[0, 1].axvline(
        x[i_min_ks[ret_type]],
        linestyle='-.',
        color=colors[ret_type],
        alpha=0.7
    )

    ### secondo flusso: GEV in un colpo solo
    ax[1, 1].semilogx(
        x_trunc,
        y_trunc_2,
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.5,
        label=legend_labels[ret_type]
    )
    
    ax[1, 1].semilogx(
        x_ok_2,
        p_height[ret_type] * np.ones((len(x_ok_2, ))),
        color=colors[ret_type],
        linestyle='',
        marker='.',
        markersize=0.7,
#         label=p_labels[ret_type]
    )
    
    ax[1, 1].axvline(
        x[i_min_ks_2[ret_type]],
        linestyle='-.',
        color=colors[ret_type],
        alpha=0.7
    )

for i in range(2):
    ax[i, 1].set(
        xlabel=r'$k$',
        ylabel=r'$d_{KS}$',
        title=r'$d_{KS} ( k )$, method ' + str(i)
    )
    ax[i, 1].legend()


sns.despine()

# ax[0, 0].get_shared_x_axes().join(ax[0, 0], ax[1, 0])
# ax[0, 1].get_shared_x_axes().join(ax[0, 1], ax[1, 1])
# ax[0, 1].set_ylim([-0.03, 0.125])
# ax[0, 0].set_ylim([-0.03, 0.125])

Le linee sottostanti le figure, composte in realtà da punti, identificano quei valori per cui il test di Kolmogorov-Smirnov ha dato un _p_-value $p \leq 0.05$, ed è quindi ritenuto statisticamente valido. Si nota come agli estremi di $k$ e dei returns il *p*-value non sia significativo e ci siano grosse oscillazioni, probabilmente dovute a instabilità numeriche nel calcolo della maximum likelihood.

### 3.4 Minima $d_{KS}$ per trovare il miglior fit della GEV

Ottimo, ora bisogna selezionare il minimo valore della statistica KS $d_{KS}$ che abbia un p-value valido ($p < 0.05$).

In [None]:
def find_min_dks(d_ks, pvals, min_pval=min_pval):
    dks = d_ks.copy()
    invalid = pvals > min_pval
    
    dks[invalid] = np.min(dks) + 1
    i_min = np.argmin(dks)
    
    return dks[i_min], i_min

In [None]:
min_kolmog_smirn = {
    ret_type: find_min_dks(kolmog_smirn[ret_type]['ks'], kolmog_smirn[ret_type]['p'])
    for ret_type in return_type
}

threshold_evt = {
    'pos': sorted_lr['pos'][min_kolmog_smirn['pos'][1]],
    'neg': sorted_lr['neg'][min_kolmog_smirn['neg'][1]],
    'abs': sorted_lr['abs'][min_kolmog_smirn['abs'][1]],
}

min_kolmog_smirn_2 = {
    ret_type: find_min_dks(kolmog_smirn_2[ret_type]['ks'], kolmog_smirn_2[ret_type]['p'])
    for ret_type in return_type
}

threshold_evt_2 = {
    'pos': sorted_lr['pos'][min_kolmog_smirn_2['pos'][1]],
    'neg': sorted_lr['neg'][min_kolmog_smirn_2['neg'][1]],
    'abs': sorted_lr['abs'][min_kolmog_smirn_2['abs'][1]],
}

print("Primo flusso: stima di gamma separata")
title_format = "{:>15}"*4
row_format = "{:>15}{:>15.4f}{:>15}{:>15.6f}"
print(title_format.format('Return type', 'Min d_KS', 'i', 'Return'))
print("-"*60)
for ret_type in return_type:
    print(row_format.format(
        ret_type,
        min_kolmog_smirn[ret_type][0],
        min_kolmog_smirn[ret_type][1],
        threshold_evt[ret_type]
    ))
    
print("\n\nSecondo flusso: stima con MLE di tutti i parametri")
title_format = "{:>15}"*4
row_format = "{:>15}{:>15.4f}{:>15}{:>15.6f}"
print(title_format.format('Return type', 'Min d_KS', 'i', 'Return'))
print("-"*60)
for ret_type in return_type:
    print(row_format.format(
        ret_type,
        min_kolmog_smirn_2[ret_type][0],
        min_kolmog_smirn_2[ret_type][1],
        threshold_evt_2[ret_type]
    ))

Sembrerebbe che la stima con MLE in un colpo solo sia **molto peggiore** di quella in due steps, vediamo graficamente se le distribuzioni fittano bene i dati allora

In [None]:
# preparo i dati per plottare
titles = {
    'pos': 'Positive extreme returns',
    'neg': 'Negative extreme returns',
    'abs': 'Absolute extreme returns',
}

xlabels = {
    'pos': r'Positive extreme returns $r$',
    'neg': r'Negative extreme returns $-r$',
    'abs': r'Absolute extreme returns  $|r|$',
}

labels = {
    'pos': r'$r$',
    'neg': r'$-r$',
    'abs': r'$|r|$',
}

# plot
fig, ax = pl.subplots(nrows=2, ncols=len(return_type), figsize=(18, 10), sharex=False, sharey=True)

# sulle righe i due flussi
for i in range(ax.shape[0]):
    
    # sulle colonne i 3 tipi di returns e le loro GEV fittate
    for j, ret_type in enumerate(return_type):
        if i == 0: # primo flusso
            i_min = min_kolmog_smirn[ret_type][1]
            
            data = sorted_lr[ret_type].values[:i_min]
            best_fit = fits[ret_type][i_min]
            
            title = 'Separate GEV fitting'
        elif i == 1: # secondo flusso
            i_min = min_kolmog_smirn_2[ret_type][1]
            
            data = sorted_lr[ret_type].values[:i_min]
            best_fit = fits_2[ret_type][i_min]
            
            title = 'One shot GEV fitting'
        
        print(f"method: {i}, ret_type: {ret_type}, n_extremes: {data.shape[0]}")
        
        sns.distplot(
            data,
            color=colors[ret_type],
            label=legend_labels[ret_type],
            kde=False,
            norm_hist=True,
            ax=ax[i, j]
        )
        
        _, b = ax[i, j].xaxis.get_data_interval()
        x = np.linspace(0, b, 1000)
        pdf = gev.pdf(x, *best_fit)
        ax[i, j].plot(x, pdf, color=colors[ret_type], label='GEV pdf')
        
        ax[i, j].set_title(title)
        ax[i, j].set_xlabel(xlabels[ret_type])
        ax[i, j].legend()

sns.despine()

In [None]:
i_min = min_kolmog_smirn['neg'][1]
i_min_2 = min_kolmog_smirn_2['neg'][1]

data = sorted_lr['neg'].values[:i_min]
data_2 = sorted_lr['neg'].values[:i_min_2]

fig, ax = pl.subplots(nrows=1, ncols=1, figsize=(14, 7))

ax.plot(data_2, color='steelblue', label='One-step GEV')
ax.plot(data, color='indianred', label='Two-steps GEV')

ax.fill_between(np.arange(len(data_2)), 0, data_2, color='steelblue', alpha=0.5)
ax.fill_between(np.arange(len(data)), 0, data, color='indianred', alpha=0.5)

ax.set_xlabel(r"$k$", fontsize=16)
ax.set_ylabel(r"$-r$", fontsize=16)
ax.legend()

sns.despine()

In effetti sembra proprio che separare in due steps la ricerca del tail exponent e poi degli altri parametri sia benefico, visto che riduce il numero di estremi.
Se faccio il fit in un colpo solo infatti, viene un threshold ancora peggiore, quindi meglio non deviare dal seminato del paper.

Finora abbiamo quindi ottenuto:

- le distribuzioni di probabilità degli extreme returns (positivi, negativi, assoluti)
- i threshold che massimizzano il fitting della distribuzione *GEV* sugli extreme returns. Tali threshold possono essere quindi usati per determinare quali movimenti siano estremi e quali no

Concludiamo quindi confrontando i threshold così ottenuti con i threshold del 95% percentile che abbiamo utilizzato finora per le azioni S&P500, e con quello che si otterrebbe ad utilizzare il $k^*$ calcolato con lo stimatore di Hill.

In [None]:
# calcolo dei threshold
# quantili: attenzione che sono TUTTI POSITIVI
thresholds = {
    ret_type: {
        q_type: returns[ret_type].abs().quantile(float(q_type) / 100)
        for q_type in quantile_type[:-1]
    }
    for ret_type in return_type
}

for ret_type in return_type:
    thresholds[ret_type]['evt'] = abs(threshold_evt[ret_type])

extremes = {
    ret_type: {
        q_type: (returns[ret_type].abs() >= thresholds[ret_type][q_type]).astype(np.int8)
        for q_type in quantile_type
    }
    for ret_type in return_type
}

extremes_test = {
    ret_type: {
        q_type: (returns_test[ret_type].abs() >= thresholds[ret_type][q_type]).astype(np.int8)
        for q_type in quantile_type
    }
    for ret_type in return_type
}

In [None]:
# calcolo i threshold di Hill solamente
threshold_hill = {
    'neg': sorted_lr['neg'].values[hill_estimation['neg']['k_opt']],
    'pos': sorted_lr['pos'].values[hill_estimation['pos']['k_opt']]   
}

# plot dei threshold
fig, ax = pl.subplots(nrows=1, ncols=1, figsize=(18, 9))

dates = log_returns.index
# training set
ax.plot(
    returns_before_after[split_key][0]['Adj Close'],
    label='Log Returns - train',
    color='slategrey',
    alpha=0.25,
    linestyle='',
    marker='.'
)
# testing set
ax.plot(
    returns_before_after[split_key][1]['Adj Close'],
    label='Log Returns - test',
    color='navy',
    alpha=0.25,
    linestyle='',
    marker='.'
)

# percentili
ax.plot(dates, -thresholds['neg']['95'] * np.ones(len(dates)), color=colors['neg'], linestyle=':', label=r'95% percentile, $-r$')
ax.plot(dates, thresholds['pos']['95'] * np.ones(len(dates)), color=colors['pos'], linestyle=':', label=r'95% percentile, $r$')

# EVT, flusso 1
ax.plot(dates, -thresholds['neg']['evt'] * np.ones(len(dates)), color=colors['neg'], linestyle='--', label=r'$-x_t$, EVT')
ax.plot(dates, thresholds['pos']['evt'] * np.ones(len(dates)), color=colors['pos'], linestyle='--', label=r'$x_t$, EVT')

# Hill
ax.plot(dates, -threshold_hill['neg'] * np.ones(len(dates)), color=colors['neg'], linestyle='-.', label=r'$-x_t$, Hill')
ax.plot(dates, threshold_hill['pos'] * np.ones(len(dates)), color=colors['pos'], linestyle='-.', label=r'$x_t$, Hill')

ax.set(xlabel='Date', ylabel='Log Returns', title='Dow Jones Log Returns and thresholds', ylim=[-0.12, 0.12])
ax.legend(loc='lower right')

sns.despine()

Come si può vedere dal plot, i percentili sono molto più stringenti rispetto al valore che massimizza il fitting della *GEV*, mentre i threshold calcolati con il solo stimatore di Hill sono più stringenti dei percentili.

Vediamo anche di confermarlo con un po' di numeri:

In [None]:
def get_percent(data, thresh_low, thresh_up):
    indexes = np.logical_or(data.values <= thresh_low, data.values >= thresh_up)
    num = len(data[indexes])
    denom = len(data)
    
    return num / denom

perc = log_returns['Adj Close'].quantile(q=[0.05, 0.95])

extreme_percent_with_percentiles = get_percent(log_returns['Adj Close'], perc[0.05], perc[0.95])
extreme_percent_with_evt = get_percent(log_returns['Adj Close'], -thresholds['neg']['evt'], thresholds['pos']['evt'])
extreme_percent_with_evt_2 = get_percent(log_returns['Adj Close'], -threshold_evt_2['neg'], threshold_evt_2['pos'])

print("{:>20}{:>15}".format('Threshold type', 'Extremes %'))
print("-"*35)
print("{:>20}{:>15.3f}".format('percentile 5-95 %', extreme_percent_with_percentiles))
print("{:>20}{:>15.3f}".format('EVT', extreme_percent_with_evt))
print("{:>20}{:>15.3f}".format('EVT2', extreme_percent_with_evt_2))

In pratica, vuol dire che se usassimo i valori di threshold ricavati dalla *EVT* avremmo un dataset sicuramente più bilanciato, ma c'è da chiedersi se si possano effettivamente allora considerare "estremi". Non è troppo "inclusivo" un tale threshold?

### 3.5 Calcolo dei $\tau_Q$ e delle $Q$

Calcoliamoli per poi usarli nella maximum likelihood estimation.

In [None]:
# creo i tau e calcolo i Q
tau_q_95 = 1.0 / (1.0 - 0.95)
tau_q_975 = 1.0 / (1.0 - 0.975)
tau_q_99 = 1.0 / (1.0 - 0.99)

# calcolo i quantili equivalenti ai threshold della EVT
Q = {
    ret_type: 1.0 - (sum(returns[ret_type].abs() >= abs(threshold_evt[ret_type])) / len(returns[ret_type]))
    for ret_type in return_type
}

tau_q = {
    ret_type: {
        '95': tau_q_95,
        '97.5': tau_q_975,
        '99': tau_q_99,
        'evt': 1.0 / (1.0 - Q[ret_type])
    }
    for ret_type in return_type
}

## 4. Calcolo degli intervalli di ricorrenza e plot della loro distribuzione

Ora che abbiamo i threshold possiamo calcolare gli intervalli di ricorrenza e vederne la distribuzione.

In [None]:
def get_recurrence_intervals(is_extreme: pd.DataFrame):
    """Get the recurrence intervals durations between extremes.
    
    Parameters
    ----------
    is_extreme: pd.DataFrame
        a DataFrame with the date on the index and 1 if the return at time t is extreme,
        0 otherwise. Must contain a single column named 'extreme'
    """
    assert isinstance(is_extreme.index, pd.DatetimeIndex)
    assert len(is_extreme.columns) == 1
    
    # convert to int
    data = is_extreme.astype(np.int8)
    data.loc[:, 'date'] = data.index
    data.index = pd.RangeIndex(len(is_extreme))
    
    data_is_extreme = data[data[data.columns[0]] == 1]
    
    intervals = []
    for i in range(1, len(data_is_extreme)):
        last_time = data_is_extreme.date.iloc[i - 1]
        current_time = data_is_extreme.date.iloc[i]
        
        n_days = data_is_extreme.index[i] - data_is_extreme.index[i - 1]
        
        intervals.append((last_time, current_time, n_days))
        
    return intervals

I quantili vanno presi al 95%, 97.5%, 99%. Creo quindi il `dict` che contiene gli intervalli di ricorrenza, organizzati secondo il tipo di return ed il tipo di threshold (quantile o evt):

In [None]:
# calcolo intervalli di ricorrenza - training set
tmp_rec_int = {
    ret_type: {
        q_type: get_recurrence_intervals(pd.DataFrame(extremes[ret_type][q_type]))
        for q_type in quantile_type
    }
    for ret_type in return_type
}

recurrence_intervals = {
    ret_type: {
        q_type: pd.DataFrame(data={
            'last_extreme': [x[0] for x in tmp_rec_int[ret_type][q_type]],
            'current_extreme': [x[1] for x in tmp_rec_int[ret_type][q_type]],
            'n_days': [x[2] for x in tmp_rec_int[ret_type][q_type]],
        })
        for q_type in quantile_type
    }
    for ret_type in return_type
}

# calcolo intervalli di ricorrenza - testing set
tmp_rec_int_test = {
    ret_type: {
        q_type: get_recurrence_intervals(pd.DataFrame(extremes_test[ret_type][q_type]))
        for q_type in quantile_type
    }
    for ret_type in return_type
}

recurrence_intervals_test = {
    ret_type: {
        q_type: pd.DataFrame(data={
            'last_extreme': [x[0] for x in tmp_rec_int_test[ret_type][q_type]],
            'current_extreme': [x[1] for x in tmp_rec_int_test[ret_type][q_type]],
            'n_days': [x[2] for x in tmp_rec_int_test[ret_type][q_type]],
        })
        for q_type in quantile_type
    }
    for ret_type in return_type
}

### 4.1 Plot istogrammi intervalli di ricorrenza

Ora visualizzo graficamente la lunghezza degli intervalli di ricorrenza con degli istogrammi, rispettivamente per i returns positivi, negativi ed assoluti.

In [None]:
hist_labels = {
    '95': '95%',
    '97.5': '97.5%',
    '99': '99%',
    'evt': 'EVT',
}

titles = {
    'pos': r'Positive $r$',
    'neg': r'Negative $-r$',
    'abs': r'Absolute $|r|$',
}

y_lims = {
    'pos': [0.0, 0.15],
    'neg': [0.0, 0.15],
    'abs': [0.0, 0.08],
}

fig, ax = pl.subplots(nrows=2, ncols=3, figsize=(18, 12))

# riga 1: training set
for i, ret_type in enumerate(return_type):
    for q_type in quantile_type:
        curr = recurrence_intervals[ret_type][q_type].n_days
        sns.distplot(curr, kde=False, norm_hist=True, label=hist_labels[q_type], ax=ax[0, i])

    ax[0, i].legend()
    ax[0, i].set(title=titles[ret_type] + " | training set", ylim=y_lims[ret_type])

# riga 2: testing set
for i, ret_type in enumerate(return_type):
    for q_type in quantile_type:
        curr = recurrence_intervals_test[ret_type][q_type].n_days
        sns.distplot(curr, kde=False, norm_hist=True, label=hist_labels[q_type], ax=ax[1, i])

    ax[1, i].legend()
    ax[1, i].set(title=titles[ret_type] + " | testing set", ylim=y_lims[ret_type])

sns.despine()

### 4.2 Creazione delle tabelle come nel paper

Ora creo le tabelle riassuntive come a pagina 9 del paper di [Jiang et al](https://doi.org/10.1080/14697688.2017.1373843).

Prima mi creo due funzioncine e poi le chiamo.

In [None]:
def get_single_table(intervals: pd.DataFrame, returns: pd.DataFrame, ret_type: str, thresh: float, col_name='perc'):
    """Get a single panel sub-table."""
    obsv = int(intervals.shape[0])
    mean = intervals['n_days'].mean()
    median = intervals['n_days'].median()
    std_dev = intervals['n_days'].std()
    skewness = intervals['n_days'].skew()
    kurtosis = intervals['n_days'].kurt()
    
    ret_mean = returns.mean()
    ret_std_dev = returns.std()
    
    m = (thresh - ret_mean) / ret_std_dev
#     print(f"\nRet_type: {ret_type}, q_type: {col_name}")
#     print(f"Threshold: {thresh:5.4f}, Mean: {ret_mean:5.4f}, m: {m:5.4f}")
        
    acf, qstat, pvals = stattools.acf(intervals['n_days'].values, qstat=True, nlags=30)
    rho1 = acf[1]
    _, p_rho1 = scipy.stats.pearsonr(
        intervals['n_days'].values[1:],
        intervals['n_days'].shift(periods=1).values[1:],
    )
    
    rho5 = acf[5]
    _, p_rho5 = scipy.stats.pearsonr(
        intervals['n_days'].values[5:],
        intervals['n_days'].shift(periods=5).values[5:],
    )

    Q30 = qstat[-1]
    p_Q30 = pvals[-1]
    
    index = pd.Index(data=[
        'm',
        'obsv',
        'mean',
        'median',
        'stdev',
        'skew',
        'kurt',
        'rho(1)',
        'p-value(rho1)',
        'rho(5)',
        'p-value(rho5)',
        'Q(30)',
        'p-value(Q30)',
    ])
    
    result = pd.DataFrame(data=[
        [m],
        [obsv],
        [mean],
        [median],
        [std_dev],
        [skewness],
        [kurtosis],
        [rho1],
        [p_rho1],
        [rho5],
        [p_rho5],
        [Q30],
        [p_Q30],
    ],
    index=index,
    columns=[col_name])
    
    return result

Il titolo è la sezione della tabella (Negative/Positive/Absolute), i quantili sono quelli che mi interessano e finiranno sulle colonne della tabella ed il risultato è un `dict` che ha come chiavi i titoli.

In [None]:
tables = {
    ret_type: {
        q_type: get_single_table(recurrence_intervals[ret_type][q_type],
                                 returns[ret_type],
                                 ret_type,
                                 thresh=thresholds[ret_type][q_type],
                                 col_name=q_type)
        for q_type in quantile_type
    }
    for ret_type in return_type
}

panels = {
    ret_type: pd.concat([tables[ret_type][q_type] for q_type in quantile_type], axis='columns')
    for ret_type in return_type
}

Visualizziamo le tabelle:

In [None]:
panels['pos']

In [None]:
panels['neg']

In [None]:
panels['abs']

### 4.3 Plot degli autocorrelogrammi

Vediamo con gli [autocorrelogammi](https://en.wikipedia.org/wiki/Correlogram) se c'è autocorrelazione nelle serie dei *recurrence interval*.

In [None]:
# sulle righe il tipo di threshold, sulle colonne il tipo di return
fig, ax = pl.subplots(nrows=3, ncols=3, figsize=(27, 18))
fig.suptitle("Autocorrelation plots", fontsize=16)

tsaplots.plot_acf(recurrence_intervals['neg']['95']['n_days'].values, lags=30, ax=ax[0][0], title=r'$-r$, 95%')
tsaplots.plot_acf(recurrence_intervals['pos']['95']['n_days'].values, lags=30, ax=ax[0][1], title=r'$r$, 95%')
tsaplots.plot_acf(recurrence_intervals['abs']['95']['n_days'].values, lags=30, ax=ax[0][2], title=r'$|r|$, 95%')
ax[0, 0].set_ylabel("Pearson $R$")

tsaplots.plot_acf(recurrence_intervals['neg']['97.5']['n_days'].values, lags=30, ax=ax[1][0], title=r'$-r$, 97.5%')
tsaplots.plot_acf(recurrence_intervals['pos']['97.5']['n_days'].values, lags=30, ax=ax[1][1], title=r'$r$, 97.5%')
tsaplots.plot_acf(recurrence_intervals['abs']['97.5']['n_days'].values, lags=30, ax=ax[1][2], title=r'$|r|$, 97.5%')
ax[1, 0].set_ylabel("Pearson $R$")

# WARNING: non ci sono abbastanza dati
# tsaplots.plot_acf(recurrence_intervals['neg']['99']['n_days'].values, lags=30, ax=ax[2][0], title=r'$-r$, 99%')
# tsaplots.plot_acf(recurrence_intervals['pos']['99']['n_days'].values, lags=30, ax=ax[2][1], title=r'$r$, 99%')
# tsaplots.plot_acf(recurrence_intervals['abs']['99']['n_days'].values, lags=30, ax=ax[2][2], title=r'$|r|$, 99%')
# ax[2, 0].set_ylabel("Pearson $R$")

tsaplots.plot_acf(recurrence_intervals['neg']['evt']['n_days'].values, lags=30, ax=ax[2][0], title=r'$-r$, EVT')
tsaplots.plot_acf(recurrence_intervals['pos']['evt']['n_days'].values, lags=30, ax=ax[2][1], title=r'$r$, EVT')
tsaplots.plot_acf(recurrence_intervals['abs']['evt']['n_days'].values, lags=30, ax=ax[2][2], title=r'$|r|$, EVT')
ax[2, 0].set_ylabel("Pearson $R$")

for a in ax[2]:
    a.set_xlabel('lag in the recurrence interval array')

sns.despine()

Per interpretare i plot, bisogna ricordare che:

- sulle $x$ c'è il lag della serie temporale relativa ai giorni tra i movimenti estremi, cioè quella degli intervalli di ricorrenza. Vuol dire che $x=22$ significa il 22° intervallo di ricorrenza visto nel passato, prima di quello attuale, non 22 giorni prima di oggi. La distanza in giorni potrebbe anche essere un anno o più.
- sulle y c'è la correlazione di Pearson $R$

Apparentemente c'è autocorrelazione nei recurrence intervals selezionati con il quantile $q_{0.95}$ fino a 6 per positivi e negativi, 2 per gli assoluti.

Per i recurrence intervals con $q_{0.975}$ solo a 1 giorno per positivi e negativi, nessuna per gli assoluti.

Per i recurrence intervals con $q_{0.99}$ non c'è autocorrelazione.

### 4.4 Verifica relazione empirica

Verifichiamo ora la relazione empirica $\tau_Q = \frac{Q}{1 - Q}$ dove $Q$ è il quantile scelto (0.95, 0.975, 0.99), $\tau_Q$ l'intervallo di ricorrenza medio

In [None]:
title_format = "{:>15}"*5
row_format = "{:>15.3f}{:>15}{:>15.3f}{:>15.3f}" + "{:>14.3f}%"
print(title_format.format('Quantile', 'Return type', 'tau_q', 'True mean', 'Error %'))
print(title_format.format('-'*15, '-'*15, '-'*15, '-'*15, '-'*15))

for q_type in quantile_type[:-1]:
    for i, name in enumerate(return_type):
        data_mean = recurrence_intervals[name][q_type]['n_days'].mean()
        
        q = float(q_type) / 100.0
        tau = 1.0 / (1.0 - q)
        
        perc_diff = (tau - data_mean) / data_mean
        
        print(row_format.format(q, name, tau, data_mean, perc_diff * 100))
        
        if i == len(return_type) - 1:
            print("")

In effetti, la relazione è valida con un margine di errore massimo di circa il $5\%$.

# 5. Determinazione della *Hazard Probability*

Gli autori definiscono la *hazard probability* come

\begin{equation}
    W(\Delta t | t) = \frac{\int_t^{t + \Delta t} p(\tau)d\tau}{\int_t^{\infty}p(\tau)d\tau}
\end{equation}

dove $p(\tau)$ è la distribuzione di probabilità (`pdf` per scipy).

La hazard probability definisce la probabilità che, dato che si è verificato un evento estremo $t$ giorni nel passato, ci sia un tempo di attesa $\Delta t$ ulteriore prima di un altro evento estremo.
Se consideriamo $W(1 | t)$ è simile al problema che abbiamo affrontato con la rete neurale.

Ora, nota la ditribuzione $p(\tau)$, si può derivare analiticamente l'integrale. Il problema è quindi: come trovare $p(\tau)$, e che forma ha?

Gli autori utilizzano una [stretched exponential distribution](https://en.wikipedia.org/wiki/Stretched_exponential_function), una [*q*-exponential distribution](https://en.wikipedia.org/wiki/Q-exponential_distribution) ed una [Weibull distribution](https://it.wikipedia.org/wiki/Distribuzione_di_Weibull). I parametri delle 3 distribuzioni vengono stimati tramite MLE.

Il flusso è il seguente:

1. scegli una distribuzione (s-exp, q-exp, Weibull)
2. riformula la parametrizzazione in funzione solo  dello *shape parameter*
3. calcola la log-likelihood utilizzando una semplice ricerca a griglia sui parametri liberi
4. i parametri che forniscono la massima log-likelihood sono quelli cercati, e trova la formula teorica della *hazard probability* con le equazioni del paper

Cominciamo con la Weibull, ma prima creiamo una funzione che calcoli la *hazard probability* empirica, con la formula

$$
W_{emp}(\Delta t | t) = \frac{\#(t < \tau \leq t + \Delta t)}{\#(\tau > t)}
$$

dove al numeratore c'è il numero di recurrence intervals con valore compreso in $(t, t + \Delta t]$, al denominatore il numero di recurrence intervals con valore maggiore di $t$, cioè nel range $(t, +\infty)$.

In [None]:
def get_empirical_hazard_prob(rec_ints: np.ndarray, t, delta_t):
    """Compute the empirical hazard probability."""
    assert isinstance(rec_ints, np.ndarray)
    num = np.sum(np.logical_and(rec_ints > t, rec_ints <= t + delta_t))
    denom = np.sum(rec_ints > t)
    
    return num / denom

## 5.1 Fitting della Weibull

In `scipy.stats` è definita come

\begin{eqnarray}
&f(x, c) = c x^{c - 1} e^{-x^{c}} \\
&f(x, c, loc, scale) = \frac{1}{scale}f\left(\frac{x - loc}{scale}, c\right)
\end{eqnarray}

dove $c$ è lo *shape parameter*. Nel paper invece è

\begin{equation}
f(x, \beta, \alpha) = \frac{\alpha}{\beta} \left( \frac{\tau}{\beta} \right)^{\alpha - 1} e^{-\left( \frac{\tau}{\beta} \right)^{\alpha}}
\end{equation}

quindi la corrispondenza è

\begin{eqnarray}
&loc = 0 \\
&\beta = scale \\
&shape = c = \alpha
\end{eqnarray}

Ora dobbiamo stimare i parametri della Weibull con una maximum log-likelihood estimation (*MLE*). Riscrivendoli in funzione di $\tau_Q$ e $\beta = scale$ abbiamo

\begin{eqnarray}
&\beta = \frac{\tau_Q}{\Gamma \left( 1 + \frac{1}{\alpha} \right)} \\
cioè \\
&\beta = scale = \frac{\tau_Q}{\Gamma \left( 1 + \frac{1}{c} \right)}
\end{eqnarray}

Ricordiamo che $\tau_Q = \frac{1}{1 - Q}$ dove $Q$ è il quantile.

A questo punto la MLE ha formula:

$$ ln(L_w) = n \cdot ln\left( \frac{c}{\beta} \right) + \sum_{i=1}^{n} \left[ (c - 1) ln\left( \frac{\tau_i}{\beta} \right) - \left( \frac{\tau_i}{\beta} \right)^c \right] $$

dove $n$ è il numero di recurrence intervals, $t_i$ il corrispondente valore dell'intervallo di ricorrenza (es: 14 giorni, 4 giorni...).

Il flow è quindi, in questo caso:

1. a seconda del percentile (95% o EVT) calcolare $Q$ e quindi $\tau_Q$
2. utilizzare una ricerca con step $1e-6$ sul parametro $c = \alpha$, il quale risulta in un certo valore di $\beta$
3. utilizzare quei valori di $c$ e di $\beta$ nella MLE
4. trovare il massimo della MLE ed i corrispondenti valori di $c$ e $\beta$
5. urrà! Ora possiamo usarli nella *pdf* della distribuzione Weibull per ottenere l'hazard $W(\Delta t | t)$

In [None]:
@timeit
@numba.jit(nopython=True, parallel=True, nogil=True)
def mle_weibull(rec_ints: np.ndarray, c: np.ndarray, beta: np.ndarray):
    """MLE estimation for weibull distribution, given an array of c shape parameters and the tau_q,
    with the recurrence intervals rec_ints.
    """       
    m = beta.shape[0]
    n = rec_ints.shape[0]

    # log-likelihood    
    log_likelihoods = np.zeros_like(beta)
    
    # precompute matrices for tau_beta and ln_tau_beta
    tau_beta = np.zeros((n, m), dtype=np.float64)
    for i in range(n):
        for j in range(m):
            tau_beta[i, j] = rec_ints[i] / beta[j]
            
    ln_tau_beta = np.log(tau_beta)
    
    c_beta = c / beta
    n_ln_c_beta = n * np.log(c_beta)
    c_1 = c - 1.0

    for j in numba.prange(m):  # no progress indication, it's a parallel for loop
        summ = 0
        
        for i in range(n):
            summ += c_1[j] * ln_tau_beta[i, j] - tau_beta[i, j] ** c[j]
            
        log_likelihoods[j] = n_ln_c_beta[j] + summ
        
    return log_likelihoods

Ora usiamo la funzione per calcolarci il fitting della Weibull per i returns positivi, negativi ed assoluti:

In [None]:
c = np.arange(0.25, 2, 1e-3)
sfg = sfun.gamma(1.0 + (1.0 / c))

beta = {
    ret_type: {
        q_type: tau_q[ret_type][q_type] / sfg
        for q_type in quantile_type
    }
    for ret_type in return_type
}

i_ok = {
    ret_type: {
        q_type: np.argwhere(beta[ret_type][q_type] > 1e-6).flatten()
        for q_type in quantile_type
    }
    for ret_type in return_type
}

beta_ok = {
    ret_type: {
        q_type: beta[ret_type][q_type][i_ok[ret_type][q_type]]
        for q_type in quantile_type
    }
    for ret_type in return_type
}

c_ok = {
    ret_type: {
        q_type: c[i_ok[ret_type][q_type]]
        for q_type in quantile_type
    }
    for ret_type in return_type
}

In [None]:
load_log_like = True

ll_weib_file = f"./log-like-weib_{split_key}.pickle"

if load_log_like:
    with open(ll_weib_file, 'rb') as infile:
        log_like_weib = pickle.load(infile)
else:
    log_like_weib = dict()

    for ret_type in return_type:
        log_like_weib[ret_type] = dict()
        print(f"\nReturn type: {ret_type}")

        for q_type in quantile_type:
            x = recurrence_intervals[ret_type][q_type]['n_days'].values
            print(f"Computing Weibull MLE on quantile: {q_type}, c={c_ok[ret_type][q_type].shape}, beta={beta_ok[ret_type][q_type].shape}")

            ll = mle_weibull(x, c_ok[ret_type][q_type], beta_ok[ret_type][q_type])

            log_like_weib[ret_type][q_type] = ll
          
    with open(ll_weib_file, 'wb') as outfile:
          pickle.dump(log_like_weib, outfile)

In [None]:
colors_mle = {
    'evt': 'lightskyblue',
    '95': 'palegreen',
    '97.5': 'limegreen',
    '99': 'darkgreen',
}

legend_labels_mle = {
    '95': '95%',
    '97.5': '97.5%',
    '99': '99%',
    'evt': 'EVT',
}

titles = {
    'pos': r'Positive $log(r)$',
    'neg': r'Negative $log(r)$',
    'abs': r'Absolute $log(r)$',
}

fig, ax = pl.subplots(nrows=3, ncols=1, figsize=(14, 15))

# positive log-returns
for i, ret_type in enumerate(return_type):
    for q_type in quantile_type:
        ax[i].plot(
            c_ok[ret_type][q_type],
            log_like_weib[ret_type][q_type],
            color=colors_mle[q_type],
            label=legend_labels_mle[q_type])

        i_max = np.argmax(log_like_weib[ret_type][q_type])
        
        ax[i].plot(
            c_ok[ret_type][q_type][i_max],
            log_like_weib[ret_type][q_type][i_max],
            marker='o',
            color=colors_mle[q_type]
        )
    
    ax[i].set(title=titles[ret_type])
    
for a in ax:
    a.set(xlabel=r'$c = \alpha$', ylabel=r'$log(L_W)$')
    a.legend(loc='lower right')

sns.despine()

Ok, ora che abbiamo le MLE per i tre tipi di returns e i minimi, possiamo fittare la Weibull sui recurrence intervals:

In [None]:
i_min = {
    ret_type: {
        q_type: np.argmax(log_like_weib[ret_type][q_type])
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_shape = {
    'weibull': {
        ret_type: {
            q_type: c_ok[ret_type][q_type][i_min[ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
}

best_scale = {
    'weibull': {
        ret_type: {
            q_type: tau_q[ret_type][q_type] / sfun.gamma(1.0 + 1.0 / best_shape['weibull'][ret_type][q_type])
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
}

best_params = {
    'weibull': {
        ret_type: {
            q_type: {
                'shape': best_shape['weibull'][ret_type][q_type],
                'scale': best_scale['weibull'][ret_type][q_type],
                'loc': 0.0,
            }
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
}

## 5.2 Fitting della stretched-exponential (s-exp)

Creiamo ora la classe per la s-exp, che ha *pdf*:

$$
p(x, c, a, b) = a e^{-\left( bx \right)^c}
$$

dove $c$, $a$ e $b$ sono *shape parameters*, con $0 < c < 1$, $b \geq 0$ e $a > 0$.

Creo allora la funzione che minimizza la log-likelihood della s-exp, che è

$$
ln(L_{s-exp}) = n \cdot \ln(a) - \sum_{i=1}^{n} (b \cdot x_i)^c
$$

dove $n$ è il numero di recurrence intervals, $a = \frac{c \Gamma \left( \frac{2}{c} \right)}{\left[ \Gamma \left( \frac{1}{c} \right) \right]^2 \tau_Q}$ e $b = \frac{ \Gamma \left( \frac{2}{c} \right)}{\Gamma \left( \frac{1}{c} \right) \tau_Q}$.

In [None]:
def get_a_b_sexp(c, tau_q):
    """Get the a and b params."""
    gamma_2_c = sfun.gamma(2.0 / c)
    gamma_1_c = sfun.gamma(1.0 / c)
    
    b_all = gamma_2_c / (gamma_1_c * tau_q)
    a_all = b_all * c / gamma_1_c
    
    return a_all, b_all

@timeit
def mle_sexp(rec_ints: np.ndarray, c: np.ndarray, tau_q: float):
    """MLE estimation for s-exponential distribution, given an array of c shape parameters and the tau_q,
    with the recurrence intervals rec_ints.
    """
    n = rec_ints.shape[0]
    
    a_all, b_all = get_a_b_sexp(c, tau_q)
    
    ln_a_all = np.log(a_all)
    
    ll = np.zeros((c.shape[0], ), dtype=np.float64)
    
    for j, c in enumerate(c):
        ssum = 0
        a = a_all[j]
        b = b_all[j]
        
        for i in range(n):
            ssum += np.power((b * rec_ints[i]), c)
            
        ll[j] = n * ln_a_all[j] - ssum
    
    ll[np.isnan(ll)] = -np.inf
        
    return ll

Ora usiamo la funzione per calcolarci il fitting della s-exp per i returns positivi, negativi ed assoluti:

In [None]:
c_sexp = np.arange(1e-3, 1.0, 1e-3)

In [None]:
load_log_like = True

ll_sexp_file = f"./log-like-sexp_{split_key}.pickle"

if load_log_like:
    with open(ll_sexp_file, 'rb') as infile:
        log_like_sexp = pickle.load(infile)
else:
    log_like_sexp = dict()

    for ret_type in return_type:
        log_like_sexp[ret_type] = dict()
        print(f"\nReturn type: {ret_type}")

        for q_type in quantile_type:
            x = recurrence_intervals[ret_type][q_type]['n_days'].values
            print(f"Computing s-exp MLE on quantile: {q_type}, c={c_sexp.shape}")

            ll = mle_sexp(x, c_sexp, tau_q[ret_type][q_type])

            log_like_sexp[ret_type][q_type] = ll
            
    log_like_sexp['c'] = c_sexp

    with open(ll_sexp_file, 'wb') as outfile:
          pickle.dump(log_like_sexp, outfile)

Ora prendiamo la massima log-likelihoood:

In [None]:
i_max_sexp = {
    ret_type: {
        q_type: np.argmax(log_like_sexp[ret_type][q_type])
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_shape['s-exp'] = {
    ret_type: {
        q_type: c_sexp[i_max_sexp[ret_type][q_type]]
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_a_sexp = {
    ret_type: {
        q_type: get_a_b_sexp(best_shape['s-exp'][ret_type][q_type], tau_q[ret_type][q_type])[0]
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_b_sexp = {
    ret_type: {
        q_type: get_a_b_sexp(best_shape['s-exp'][ret_type][q_type], tau_q[ret_type][q_type])[1]
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_params['s-exp'] = {
    ret_type: {
        q_type: {
            'shape': best_shape['s-exp'][ret_type][q_type],
            'a': best_a_sexp[ret_type][q_type],
            'b': best_b_sexp[ret_type][q_type],
            'loc': 0.0,
        }
        for q_type in quantile_type
    }
    for ret_type in return_type
}

Plottiamo quindi i risultati della MLE:

In [None]:
colors_mle = {
    'evt': 'lightskyblue',
    '95': 'palegreen',
    '97.5': 'limegreen',
    '99': 'darkgreen',
}

legend_labels_mle = {
    '95': '95%',
    '97.5': '97.5%',
    '99': '99%',
    'evt': 'EVT',
}

titles = {
    'pos': r'Positive $log(r)$',
    'neg': r'Negative $log(r)$',
    'abs': r'Absolute $log(r)$',
}

fig, ax = pl.subplots(nrows=3, ncols=1, figsize=(14, 15))

# positive log-returns
truncation = 100
for i, ret_type in enumerate(return_type):
    for q_type in quantile_type:
        ax[i].plot(
            c_sexp[truncation:],
            log_like_sexp[ret_type][q_type][truncation:],
            color=colors_mle[q_type],
            label=legend_labels_mle[q_type])
        
        ax[i].plot(
            c_sexp[i_max_sexp[ret_type][q_type]],
            log_like_sexp[ret_type][q_type][i_max_sexp[ret_type][q_type]],
            marker='o',
            color=colors_mle[q_type]
        )
    
    ax[i].set(title=titles[ret_type])
    
for a in ax:
    a.set(xlabel=r'$c = \alpha$', ylabel=r'$log(L_{s-exp})$')
    a.legend(loc='lower right')

sns.despine()

## 5.3 Fitting della q-exponential

La terza distribuzione è la [q-exponential](https://en.wikipedia.org/wiki/Q-exponential_distribution).

Creo allora la funzione che minimizza la log-likelihood della q-exp, che è

$$
ln(L_{q-exp}) = n \cdot \ln[\lambda (2 - q)] - \frac{1}{q - 1} \sum_{i=1}^{n} \ln[1 + (q - 1) \lambda \tau_i]
$$

dove $n$ è il numero di recurrence intervals, $\tau_i$ il valore dell'i-esimo recurrence interval, e il parametro $\lambda$ si stima così:

$$
\lambda = \frac{1}{\tau_Q(3 - 2q)}
$$

il parametro libero $q$ ha il range $\left( 0, \frac{3}{2} \right)$.

In [None]:
@timeit
def mle_qexp(rec_ints: np.ndarray, q: np.ndarray, tau_q: float):
    """MLE estimation for q-exponential distribution, given an array of q shape parameters and the tau_q,
    with the recurrence intervals rec_ints.
    """
    assert np.all(q < 1.5)
    
    n = rec_ints.shape[0]
    m = q.shape[0]
    
    lam = 1.0 / (tau_q * (3 - 2 * q))
    
    ll = np.zeros((q.shape[0], ), dtype=np.float64)
    
    for j in range(m):
        ssum = 0
        
        for i in range(n):
            ssum += np.log(1 + (q[j] - 1) * lam[j] * rec_ints[i])
            
        ll[j] = n * np.log(lam[j] * (2 - q[j])) - (1 / (q[j] - 1)) * ssum
        
    return ll

Ora usiamo la funzione per calcolarci il fitting della s-exp per i returns positivi, negativi ed assoluti:

In [None]:
q_qexp = np.arange(1.0, 1.5, 1e-3)

In [None]:
load_log_like = True

ll_qexp_file = f"./log-like-qexp_{split_key}.pickle"

if load_log_like:
    with open(ll_qexp_file, 'rb') as infile:
        log_like_qexp = pickle.load(infile)
else:
    log_like_qexp = dict()

    for ret_type in return_type:
        log_like_qexp[ret_type] = dict()
        print(f"\nReturn type: {ret_type}")

        for q_type in quantile_type:
            x = recurrence_intervals[ret_type][q_type]['n_days'].values
            print(f"Computing q-exp MLE on quantile: {q_type}, c={q_qexp.shape}")

            ll = mle_qexp(x, q_qexp, tau_q[ret_type][q_type])
            ll[np.isnan(ll)] = -np.inf
            
            log_like_qexp[ret_type][q_type] = ll
            
    log_like_qexp['c'] = q_qexp

    with open(ll_qexp_file, 'wb') as outfile:
          pickle.dump(log_like_qexp, outfile)

Ora prendiamo la massima log-likelihoood:

In [None]:
i_max_qexp = {
    ret_type: {
        q_type: np.argmax(log_like_qexp[ret_type][q_type])
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_shape['q-exp'] = {
    ret_type: {
        q_type: q_qexp[i_max_qexp[ret_type][q_type]]
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_lambda_qexp = {
    ret_type: {
        q_type: 1.0 / (tau_q[ret_type][q_type] * (3 - 2 * best_shape['q-exp'][ret_type][q_type]))
        for q_type in quantile_type
    }
    for ret_type in return_type
}

best_params['q-exp'] = {
    ret_type: {
        q_type: {
            'shape': best_shape['q-exp'][ret_type][q_type],
            'lambda': best_lambda_qexp[ret_type][q_type],
            'loc': 0.0,
        }
        for q_type in quantile_type
    }
    for ret_type in return_type
}

Plottiamo quindi i risultati della MLE:

In [None]:
colors_mle = {
    'evt': 'lightskyblue',
    '95': 'palegreen',
    '97.5': 'limegreen',
    '99': 'darkgreen',
}

legend_labels_mle = {
    '95': '95%',
    '97.5': '97.5%',
    '99': '99%',
    'evt': 'EVT',
}

titles = {
    'pos': r'Positive $log(r)$',
    'neg': r'Negative $log(r)$',
    'abs': r'Absolute $log(r)$',
}

fig, ax = pl.subplots(nrows=3, ncols=1, figsize=(14, 15))

# positive log-returns
truncation = 100
for i, ret_type in enumerate(return_type):
    for q_type in quantile_type:
        ax[i].plot(
            q_qexp,
            log_like_qexp[ret_type][q_type],
            color=colors_mle[q_type],
            label=legend_labels_mle[q_type])
        
        ax[i].plot(
            q_qexp[i_max_qexp[ret_type][q_type]],
            log_like_qexp[ret_type][q_type][i_max_qexp[ret_type][q_type]],
            marker='o',
            color=colors_mle[q_type]
        )
    
    ax[i].set(title=titles[ret_type])
    
for a in ax:
    a.set_ylim([-1500, 0.0])
    a.set(xlabel=r'$c = q$', ylabel=r'$log(L_{q-exp})$')
    a.legend(loc='lower left')

sns.despine()

## 5.4 Calcolo Hazard Probability

Perfetto, ora ho i parametri della Weibull, della s-exp e della q-exp per ogni tipo di return e di threshold. Posso quindi ottenere la curva teorica per il fitting dei recurrence intervals.

Per la Weibull è

$$
W_W(\Delta t | t) = 1 - e^{\left[ \left( \frac{t}{\beta} \right)^\alpha - \left( \frac{t + \Delta t}{\beta} \right)^\alpha \right]}
$$

dove $\alpha = c^*$ lo *shape* ottimale, e $\beta = \frac{\tau_Q}{\Gamma \left( 1 + \frac{1}{\alpha} \right)}$ lo *scale* ottimale.

Per la s-exp è:

$$
W_{s-exp}(\Delta t | t) = \frac{\frac{bc}{a} - \Gamma_l \left( \frac{1}{c}, (bt)^c \right) - \Gamma_u \left( \frac{1}{c}, [b(t + \Delta t)]^c \right)}{\Gamma_u \left( \frac{1}{c}, (bt)^c \right)}
$$

dove

$$
\Gamma_u (a, x) = \Gamma (a, x, +\infty) = \int_{x}^{+\infty} t^{a - 1} e^{-t} dt
$$

è la *upper incomplete Gamma function* e

$$
\Gamma_l (a, x) = \Gamma (a, 0, x) = \int_{0}^{x} t^{a - 1} e^{-t} dt
$$

è la *lower incomplete Gamma function*. Nel nostro caso, abbiamo quindi che $\Gamma_l \left( \frac{1}{c}, (bt)^c \right)$ si traduce in $a = 1/c$ e $x = (bt)^c$, mentre $\Gamma_u \left( \frac{1}{c}, [b(t + \Delta t)]^c \right)$ in $a = 1/c$ e $x = [b(t + \Delta t)]^c$.

Per la q-exp è:

$$
W_{q-exp}(\Delta t | t) = 1 - \left[ 1 + \frac{(q - 1)\lambda \Delta t}{1 + (q - 1)\lambda t} \right]^{1 - \frac{1}{q - 1}}
$$

Mi creo allora le funzioni che le calcolano

In [None]:
def weibull_hazard(t, shape, scale, delta_t=1):
    part_1 = np.power((t / scale), shape)
    part_2 = np.power(((t + delta_t) / scale), shape)
    
    hazard = 1 - np.exp(part_1 - part_2)
    
    return hazard

def sexp_hazard(t, c, a, b, delta_t=1):
    num1 = (b * c / a)
    num2 = sfun.gammainc((1.0 / c), np.power((b * t), c)) * sfun.gamma(1.0 / c)
    num3 = sfun.gammaincc((1.0 / c), np.power(b * (t + delta_t), c)) * sfun.gamma(1.0 / c)
    
    num = num1 - num2 - num3
    
    denom = sfun.gammaincc((1.0 / c), np.power(b * t, c))
    
    hazard = num / denom
    
    return hazard

def qexp_hazard(t, q, lam, delta_t=1):
    num = (q - 1) * lam * delta_t
    denom = 1 + (q - 1) * lam * t
    exponent = 1 - (1 / (q - 1))
    
    hazard = 1 - np.power((1 + num / denom), exponent)
    
    return hazard

In [None]:
max_t = 60
x = np.arange(max_t + 1)

ret_type = 'neg'
q_type = '99'

theoretical_hazard = {
    'weibull': weibull_hazard(
        x,
        best_params['weibull'][ret_type][q_type]['shape'],
        best_params['weibull'][ret_type][q_type]['scale']
    ),
    's-exp': sexp_hazard(
        x,
        best_params['s-exp'][ret_type][q_type]['shape'],
        best_params['s-exp'][ret_type][q_type]['a'],
        best_params['s-exp'][ret_type][q_type]['b'],
    ),
    'q-exp': qexp_hazard(
        x,
        best_params['q-exp'][ret_type][q_type]['shape'],
        best_params['q-exp'][ret_type][q_type]['lambda'],
    )
}

empirical_hazard = np.array([
    get_empirical_hazard_prob(recurrence_intervals[ret_type][q_type]['n_days'].values, t, 1)
    for t in x
])

In [None]:
fig, ax = pl.subplots(nrows=1, ncols=1, figsize=(16, 9))

dist_colors = {
    'weibull': 'orchid',
    's-exp': 'orangered',
    'q-exp': 'mediumblue'
}

dist_labels = {
    'weibull': r'$W_W$',
    's-exp': r'$W_{s-exp}$',
    'q-exp': r'$W_{q-exp}$',
}

for dist_type in distribution_type:
    ax.plot(
        x,
        theoretical_hazard[dist_type],
        color=dist_colors[dist_type],
        label=dist_labels[dist_type])
    
ax.plot(
    x,
    empirical_hazard,
    label=r'$W_{emp}$',
    color='black',
    linestyle='-',
    marker='o',
    markersize=1,
    linewidth=0.5
)

ax.legend(fontsize=14)
ax.set_xlabel(r'$t$', fontsize=16)
ax.set_ylabel(r'$W(1 | t)$', fontsize=16)
ax.set_title(r'Hazard probability for $q = 0.99$', fontsize=16)

sns.despine()

# 6. Calcolo del miglior threshold $w_t$ massimizzando la *utility* $U(\theta)$

Ora dobbiamo calcolare il miglior threshold $w_t$ oltre il quale si dà il warning, cioè:

- se $W(1|t) \geq w_t$ --> warning --> 1
- se $W(1|t) < w_t$ --> no warning --> 0

Bisogna definire un peso $\theta$ che si attribuisce alla Recall o al FPR, dove un valore di $\theta$ maggiore dà più peso alla Recall. Inoltre, si definiscono due funzioni:

- la *loss function*, che utilizza il peso:
$$
L(\theta) = \theta (1 - Recall) + (1 - \theta)FPR
$$
- la *utility function*, che dipende dalla *loss*:
$$
U(\theta) = \min(\theta, 1 - \theta) - L(\theta)
$$
che deve essere $U > 0$ per essere utile, e va massimizzata sul *training set*

I passi per farlo sono i seguenti, utilizzando le distribuzioni fittate con i parametri migliori `best_params`:

1. [x] fissare un valore $\hat{\theta}$ per il parametro $\theta$
2. [x] visto che la $Recall = f(w_t)$ e $FPR = g(w_t)$, far variare $w_t \in [0, 1]$ per ottenere tutti i possibili valori di $U(\theta^*)$, cioé:
    1. [x] scegliere un $w_t \in [0, 1] = w_t^i$
    2. [x] calcolare le hazard probability delle tre distribuzioni $W_W(\Delta t|t)$, $W_{s-exp}(\Delta t|t)$ e $W_{q-exp}(\Delta t|t)$ e trasformarle nel target binario $[0, 1]$ a seconda che siano minori o maggiori di $w_t$. Il $\Delta t$ è il periodo tra un intervallo di ricorrenza e l'altro a questo punto
    3. [x] calcolare le metriche $Recall(w_t^i)$, $FPR(w_t^i)$, $KSS(w_t^i)$ dove
    $$
    KSS = Recall - FPR
    $$
    4. [x] calcolare quindi la loss $L(\hat{\theta}, w_t^i) = (1 - Recall(w_t^i))\hat{\theta} + (1 - \hat{\theta})FPR(w_t^i)$
    5. [x] calcolare di conseguenza il valore della utility $U(\hat{\theta})|_{w_t^i}$
3. [x] dopo aver svolto il punto 2 per ogni valore di $w_t \in [0, 1]$, selezionare il massimo $w_t$ con $argmax_{w_t} U(\hat{\theta})$
4. [ ] plot della ROC curve per il training ed il testing set
    1. [x] training set
    2. [ ] testing set

Cominciamo col definire $\theta$, il range di $w_t$ e una funzione per la recall, l'FPR e il KSS score:

In [None]:
# scelta di theta
theta = 0.5

# w_t
n_points = 1000
w_t = np.linspace(0, 1, n_points + 1)

Ora definisco la *loss* e la *utility*

In [None]:
def loss_function(theta, recall, fpr):
    """The loss function L = theta * (1 - recall) + (1 - theta) * fpr"""
    assert theta >= 0.0 and theta <= 1.0
    
    return theta * (1 - recall) + (1 - theta) * fpr

def utility_function(theta, loss):
    """The utility function U = min(theta, 1 - theta) - loss"""
    return min(theta, 1 - theta) - loss

e una funzione che, data la funzione di *hazard* teorica, calcola la probabilità in ogni giorno che ci sia un estremo

In [None]:
# funzione 
def hazard_prob(hazard_fn, extremes: np.ndarray):
    """
    For every time t, predict the hazard probability using the supplied hazard function.
    
    Parameters
    ----------
    hazard_fn: Callable[[int], float]
        function that returns the hazard probability W(1|t), where t is the time passed
        since the last extreme event
    
    extremes: np.ndarray
        binary array of extremes, where 1 means extreme and 0 means normal, tim-ordered
        
    Returns
    -------
    prob: np.ndarray
        hazard probability, at every time t, that there will be an extreme event
    """
    assert isinstance(extremes, np.ndarray)
    
    ext_ind = np.argwhere(extremes == 1).flatten()
    assert ext_ind.shape[0] >= 2  # ci sono almeno 2 estremi, sennò tutto questo non ha senso
    
    n = extremes.shape[0]
    probs = np.zeros((n, ), dtype=np.float64)
    
    # curr_extreme_ind è sempre sull'ultimo estremo visto, a partire dall'inizio, fino al penultimo
    # next_extreme_ind è sempre sul prossimo estremo, fino all'ultimo
    for curr_extreme_ind, next_extreme_ind in zip(ext_ind[:-1], ext_ind[1:]):
        t = 1
        
        while curr_extreme_ind + t <= next_extreme_ind:
            probs[curr_extreme_ind + t] = hazard_fn(t)
            if probs[curr_extreme_ind + t] < 0.0:
                ipdb.set_trace()
            t = t + 1
            
    # ora gli ultimi, può capitare che l'ultimo estremo non sia l'ultimo elemento di extremes
    curr_extreme_ind = ext_ind[-1]
    if curr_extreme_ind < n - 1:
        t = 1
        
        while curr_extreme_ind + t < n:
            probs[curr_extreme_ind + t] = hazard_fn(t)
            t = t + 1
    
    return probs

Bene, ora calcolo le probabilità teoriche per ogni tipo di distribuzione, di return e di quantile.

Ogni volta è la migliore versione per quel tipo di return, di quantile e distribuzione

In [None]:
def get_all_hazard_probabilities(best_params, extremes, verbose=False):
    """Get all the hazard probabilities, for every distribution, return and threshold type."""
    # per ogni distribuzione, tipo di ritorno e quantile calcola l'hazard probability
    hazard_probabilities = dict()

    for dist_type in distribution_type:
        hazard_probabilities[dist_type] = dict()

        for ret_type in return_type:
            hazard_probabilities[dist_type][ret_type] = dict()

            for q_type in quantile_type:
                if verbose:
                    print(f"\nDist: {dist_type}\t ret_type: {ret_type}\t q_type: {q_type}")
                bp = best_params[dist_type][ret_type][q_type]
                ext = extremes[ret_type][q_type].values

                if dist_type == 'weibull':
                    shape = bp['shape']
                    scale = bp['scale']
                    
                    if verbose:
                        print(f"Using Weibull with params shape: {shape:4.3f}, scale: {scale:4.3f}")

                    def f(x):
                        return weibull_hazard(x, shape, scale)


                    hazard_probabilities[dist_type][ret_type][q_type] = hazard_prob(f, ext)

                elif dist_type == 's-exp':
                    a = bp['a']
                    b = bp['b']
                    c = bp['shape']
                    
                    if verbose:
                        print(f"Using s-exp with params a: {a:4.3f}, b: {b:4.3f}, c: {c:4.3f}")

                    def f(x):
                        return sexp_hazard(x, c, a, b)

                    hazard_probabilities[dist_type][ret_type][q_type] = hazard_prob(f, ext)

                elif dist_type == 'q-exp':
                    q = bp['shape']
                    lam = bp['lambda']

                    if verbose:
                        print(f"Using q-exp with params q: {q:4.3f}, lambda: {lam:4.3f}")

                    def f(x):
                        return qexp_hazard(x, q, lam)

                    hazard_probabilities[dist_type][ret_type][q_type] = hazard_prob(f, ext)
                else:
                    raise ValueError(f"unrecognized distribution name {dist_type}")
                    
    return hazard_probabilities

In [None]:
hazard_probabilities = get_all_hazard_probabilities(best_params, extremes)

Verifichiamo che non ci siano elementi negativi (quindi calcoli spurii):

In [None]:
for dist_type in distribution_type:
    for ret_type in return_type:
        for q_type in quantile_type:
            if np.any(hazard_probabilities[dist_type][ret_type][q_type] < 0.0):
                print(f"Errori in dist: {dist_type}, ret_type: {ret_type}, q_type: {q_type}")

            if np.any(hazard_probabilities[dist_type][ret_type][q_type] > 1.0):
                print(f"Probabilità > 1 in dist: {dist_type}, ret_type: {ret_type}, q_type: {q_type}")

Ok, prendiamoli come errori numerici.

Ora, a seconda del threshold $w_t$ convertiamo le probabilità in un target binario e calcoliamo le performance, la loss e la utility per ogni valore di $w_t$

In [None]:
def to_binary(prob: np.ndarray, thresh: float):
    assert thresh <= 1.0 and thresh >= 0.0
    
    return (prob >= thresh).astype(np.int8)


def recall_fpr_kss(y_true, y_pred):
    """Compute recall, fpr and KSS score."""
    tp = np.sum(np.logical_and(y_true, y_pred))
    tn = np.sum(np.logical_and(
        np.logical_not(y_true),
        np.logical_not(y_pred)
    ))
    fp = np.sum(np.logical_and(
        np.logical_not(y_true),
        y_pred
    ))
    fn = np.sum(np.logical_and(
        y_true,
        np.logical_not(y_pred)
    ))
    
    recall = tp / (tp + fn)  # TP / (TP + FN)
    fpr = fp / (fp + tn)  # FP / (FP + TN)
    
    kss = recall - fpr
    
    return recall, fpr, kss

In [None]:
# per tutti i valori possibili del threshold
def optimize_wt(w, haz_probs, extremes, theta):
    """Find the best threshold and the performance with the supplied w."""
    # devo salvarmi le recall e gli fpr per ogni valore di w_t, dist, return, quantile
    recalls = {
        dist_type: {
            ret_type: {
                q_type: np.zeros((len(w_t), ), dtype=np.float64)
                for q_type in quantile_type
            }
            for ret_type in return_type
        }
        for dist_type in distribution_type
    }

    fprs = copy.deepcopy(recalls)
    ksss = copy.deepcopy(recalls)
    losses = copy.deepcopy(recalls)
    utilities = copy.deepcopy(recalls)

    # per tutti i w_t = thresh
    for i, thresh in enumerate(w):
        if i % 200 == 0:
            print(f"iteration: {i}/{len(w)}")
        # per tutte le distribuzioni
        for dist_type in distribution_type:

            # per tutti i tipi di returns
            for ret_type in return_type:

                # per tutti i tipi di threshold/quantile
                for q_type in quantile_type:
                    y_pred = to_binary(haz_probs[dist_type][ret_type][q_type], thresh)
                    y_true = extremes[ret_type][q_type]

                    recall, fpr, kss = recall_fpr_kss(y_true, y_pred)
                    loss = loss_function(theta, recall, fpr)
                    utility = utility_function(theta, loss)

                    recalls[dist_type][ret_type][q_type][i] = recall
                    fprs[dist_type][ret_type][q_type][i] = fpr
                    ksss[dist_type][ret_type][q_type][i] = kss
                    losses[dist_type][ret_type][q_type][i] = loss
                    utilities[dist_type][ret_type][q_type][i] = utility
             
    # gli indici in w_t dove c'è la combinazione migliore di distribuzione, return e quantili
    best_indexes = {
        dist_type: {
            ret_type: {
                q_type: np.argmax(utilities[dist_type][ret_type][q_type])
                for q_type in quantile_type
            }
            for ret_type in return_type
        }
        for dist_type in distribution_type
    }
    
    return recalls, fprs, ksss, losses, utilities, best_indexes

Vediamo le curve ROC per ogni return, threshold e distribuzione

In [None]:
recalls, fprs, ksss, losses, utilities, best_indexes = optimize_wt(w_t, hazard_probabilities, extremes, theta)

In [None]:
size = (7 * len(quantile_type), 7 * len(return_type))

fig, ax = pl.subplots(nrows=len(quantile_type), ncols=(len(ret_type)), figsize=size)
fig.suptitle("ROC curves for the training set", fontsize=16)

# sulle righe i quantili
for i, q_type in enumerate(quantile_type):
    
    # sulle colonne i return
    for j, ret_type in enumerate(return_type):
        
        # in ogni grafico, le 3 distribuzioni
        for dist_type in distribution_type:
            i_sorted = np.argsort(fprs[dist_type][ret_type][q_type])

            x = fprs[dist_type][ret_type][q_type][i_sorted]
            y = recalls[dist_type][ret_type][q_type][i_sorted]

            ax[i, j].plot(x, y, color=dist_colors[dist_type], alpha=0.75, label=str.title(dist_type))

        ax[i, j].plot([0, 1], [0, 1], color='black', linewidth=0.5)

        ax[i, j].legend(loc='lower right', fontsize=11)
        
        ax[i, j].set_xlim([0, 1])
        ax[i, j].set_ylim([0, 1])
        
        ax[i, j].set_title(f"returns = {ret_type} | threshold = {q_type}")
        
for a in ax[-1, :]:
    a.set_xlabel('FPR', fontsize=16)
    
for a in ax[:, 0]:
    a.set_ylabel('Recall', fontsize=16)

sns.despine()

## 7. Predizione nel periodo out-of-sample (test set)

Ora bisogna utilizzare le 3 distribuzioni, già fittate sul training set, per predire sul test set.

Riutilizzo il codice appena usato:

In [None]:
hazard_probabilities_test = get_all_hazard_probabilities(best_params, extremes_test)

In [None]:
recalls_test, fprs_test, ksss_test, losses_test, utilities_test, best_indexes_test = optimize_wt(w_t, hazard_probabilities_test, extremes_test, theta)

In [None]:
size = (7 * len(quantile_type), 7 * len(return_type))

fig, ax = pl.subplots(nrows=len(quantile_type), ncols=(len(ret_type)), figsize=size)
fig.suptitle("ROC curves for the test set", fontsize=16)

# sulle righe i quantili
for i, q_type in enumerate(quantile_type):
    
    # sulle colonne i return
    for j, ret_type in enumerate(return_type):
        
        # in ogni grafico, le 3 distribuzioni
        for dist_type in distribution_type:
            i_sorted = np.argsort(fprs_test[dist_type][ret_type][q_type])

            x = fprs_test[dist_type][ret_type][q_type][i_sorted]
            y = recalls_test[dist_type][ret_type][q_type][i_sorted]

            ax[i, j].plot(x, y, color=dist_colors[dist_type], alpha=0.75, label=str.title(dist_type))

        ax[i, j].plot([0, 1], [0, 1], color='black', linewidth=0.5)

        ax[i, j].legend(loc='lower right', fontsize=11)
        
        ax[i, j].set_xlim([0, 1])
        ax[i, j].set_ylim([0, 1])
        
        ax[i, j].set_title(f"returns = {ret_type} | threshold = {q_type}")
        
for a in ax[-1, :]:
    a.set_xlabel('FPR', fontsize=16)
    
for a in ax[:, 0]:
    a.set_ylabel('Recall', fontsize=16)

sns.despine()

## 8. Confronto in-sample vs out-of-sample

Confrontiamo allora le performance su training e test set.

Bisogna tenere a mente, però, che il training set si ferma **prima** della crisi finanziaria del 2007-2008, quindi è possibile che il modello così sviluppato abbia performance migliori sul testing set piuttosto che sul training set, poiché in quel periodo ci sono molti estremi.

### 8.1 Plot di $\tau$ vs $p(\tau)$

Confrontiamo il fitting delle tre distribuzioni sui ritorni negativi con threshold al 99%. Nel plot ci sarà:

- la *PDF* empirica
- la *PDF* della Weibull
- la *PDF* della s-exponential
- la *PDF* della q-exponential

In [None]:
# TODO - forse

### 8.2 Plot di $W(1|t)$ e $W_{emp}(1|t)$

Vediamo le hazard probability empiriche e fittate, sia sul training che sul testing set, coi returns negativi e threshold 99%.

In [None]:
max_t = 60
x = np.arange(max_t + 1)

ret_type = 'neg'
q_type = '99'

theoretical_hazard_test = {
    'weibull': weibull_hazard(
        x,
        best_params['weibull'][ret_type][q_type]['shape'],
        best_params['weibull'][ret_type][q_type]['scale']
    ),
    's-exp': sexp_hazard(
        x,
        best_params['s-exp'][ret_type][q_type]['shape'],
        best_params['s-exp'][ret_type][q_type]['a'],
        best_params['s-exp'][ret_type][q_type]['b'],
    ),
    'q-exp': qexp_hazard(
        x,
        best_params['q-exp'][ret_type][q_type]['shape'],
        best_params['q-exp'][ret_type][q_type]['lambda'],
    )
}

empirical_hazard_test = np.array([
    get_empirical_hazard_prob(recurrence_intervals_test[ret_type][q_type]['n_days'].values, t, 1)
    for t in x
])

In [None]:
fig, ax = pl.subplots(nrows=1, ncols=2, figsize=(20, 7))

dist_colors = {
    'weibull': 'orchid',
    's-exp': 'orangered',
    'q-exp': 'mediumblue'
}

dist_labels = {
    'weibull': r'$W_W$',
    's-exp': r'$W_{s-exp}$',
    'q-exp': r'$W_{q-exp}$',
}

for dist_type in distribution_type:
    ax[0].plot(
        x,
        theoretical_hazard[dist_type],
        color=dist_colors[dist_type],
        label=dist_labels[dist_type]
    )
    
    ax[1].plot(
        x,
        theoretical_hazard_test[dist_type],
        color=dist_colors[dist_type],
        label=dist_labels[dist_type]
    )
    
ax[0].plot(
    x,
    empirical_hazard,
    label=r'$W_{emp}$',
    color='black',
    linestyle='-',
    marker='o',
    markersize=1,
    linewidth=0.5
)

ax[1].plot(
    x,
    empirical_hazard_test,
    label=r'$W_{emp}$',
    color='black',
    linestyle='-',
    marker='o',
    markersize=1,
    linewidth=0.5
)

for a in ax:
    a.legend(fontsize=14)
    a.set_xlabel(r'$t$', fontsize=16)
    a.set_ylabel(r'$W(1 | t)$', fontsize=16)

ax[0].set_title(r'Hazard probability for $q = 0.99$ | training set', fontsize=16)
ax[1].set_title(r'Hazard probability for $q = 0.99$ | test set', fontsize=16)

sns.despine()

### 8.3 Recall, FPR, KSS score al variare di $w_t$

Vediamo ora come i valori di performance sul training e testing set, per il threshold ottimale $w_t^*$

In [None]:
# i migliori threshold per w_t, calcolati sul trainin set
best_thresholds = {
    dist_type: {
        ret_type: {
            q_type: w_t[best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

#-----------------------------------#
# recall per w_t ottimale
best_recalls = {
    dist_type: {
        ret_type: {
            q_type: recalls[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

best_recalls_test = {
    dist_type: {
        ret_type: {
            q_type: recalls_test[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

#-----------------------------------#
# fpr per w_t ottimale
best_fprs = {
    dist_type: {
        ret_type: {
            q_type: fprs[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

best_fprs_test = {
    dist_type: {
        ret_type: {
            q_type: fprs_test[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

#-----------------------------------#
# kss per w_t ottimale
best_ksss = {
    dist_type: {
        ret_type: {
            q_type: ksss[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

best_ksss_test = {
    dist_type: {
        ret_type: {
            q_type: ksss_test[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

#-----------------------------------#
# utility per w_t ottimale
best_utilities = {
    dist_type: {
        ret_type: {
            q_type: utilities[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

best_utilities_test = {
    dist_type: {
        ret_type: {
            q_type: utilities_test[dist_type][ret_type][q_type][best_indexes[dist_type][ret_type][q_type]]
            for q_type in quantile_type
        }
        for ret_type in return_type
    }
    for dist_type in distribution_type
}

In [None]:
best_utilities

## COSA NON FUNZIONA

- [ ] La EVT ha come shape parameter un numero maggiore di 1, non va bene perché la curva qui sopra viene al contrario.