## Investigate various mean-reversion tests

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
from statsmodels.tsa.stattools import adfuller

In [3]:
%matplotlib widget
# %matplotlib auto

### Simulate mean-reverting paths

In [4]:
def simulate_paths(npaths, ndays, x0, vol, mrspeed, mrlevel = None, seed = 0):
    if mrlevel is None:
        mrlevel = x0

    dt = 1.0/250
    sqrtdt = np.sqrt(dt)

    np.random.seed(seed = seed)
    dW = np.random.normal(size = (npaths,ndays-1))*sqrtdt

    x = np.zeros((npaths, ndays))
    x[:,0] = x0
    for d in np.arange(ndays-1):
        x[:,d+1] = x[:,d] + mrspeed*(mrlevel - x[:,d]) * dt + vol * dW[:,d]

    return x


### Change mr speed to see how simulated paths look

In [None]:
annvol = 0.01
rev = 0.5
npaths_to_use = 10000
nyears = 1
seed = 314
paths_to_plot = simulate_paths(npaths = npaths_to_use, ndays = nyears*250, x0 = 0.0, vol = annvol, mrspeed = rev, seed=seed)

plt.figure()
plt.plot(paths_to_plot[:10,:].T)
plt.show()

### A few (equivalent) ways to calculate the regression coef of dx on x per path

In [6]:
def regress_dx_on_x(paths):
    '''
    calculate regr coeff dx ~ mean + slope x for each path (ie row of the paths matrix)
    annualize the results
    '''
    dt = 1.0/250

    dx = np.diff(paths, axis=1)
    x = paths[:,:-1]

    mean_x = x.mean(axis=1).reshape(-1,1)
    mean_dx = dx.mean(axis=1).reshape(-1,1)

    cov_x_dx = ((x - mean_x)*(dx - mean_dx)).sum(axis=1)
    var_x = ((x-mean_x)**2).sum(axis=1)
    
    slope = (cov_x_dx/var_x)/dt

    # slope_z_score = (slope - slope.mean())/slope.std()
    slope_z_score = (slope - 0)/slope.std()
    return slope, slope_z_score


In [7]:
def regress_dx_on_x_v2(paths):
    '''
    calculate regr coeff dx ~ mean + slope x for each path (ie row of the paths matrix)
    annualize the results
    '''
    dt = 1.0/250

    dx = np.diff(paths, axis=1)
    x = paths[:,:-1]

    npaths = x.shape[0]
    slope = np.zeros(x.shape[0])
    for n in range(npaths):
        cov_x_dx = np.cov(dx[n,:],x[n,:])[0,1]
        var_x = x[n,:].var()
        slope[n] = cov_x_dx/var_x/dt

    # slope_z_score = (slope - slope.mean())/slope.std()
    slope_z_score = (slope - 0)/slope.std()
    return slope, slope_z_score

In [8]:
def regress_dx_on_x_v3(paths):
    '''
    calculate regr coeff dx ~ mean + slope x for each path (ie row of the paths matrix)
    annualize the results
    '''
    dt = 1.0/250

    dx = np.diff(paths, axis=1)
    x = paths[:,:-1]

    npaths = x.shape[0]
    slope = np.zeros(x.shape[0])
    const_coef = np.ones(x.shape[1])
    for n in range(npaths):
        lhs = np.vstack((const_coef, x[n,:])).T
        sol = np.linalg.lstsq(lhs, dx[n,:])
        slope[n] = sol[0][1]/dt

    # slope_z_score = (slope - slope.mean())/slope.std()
    slope_z_score = (slope - 0)/slope.std()
    return slope, slope_z_score

### Plot the regression coef and its z-score (for deviation from 0) per path

In [None]:

# test_res = regress_dx_on_x(paths = paths_to_plot)
# test_res = regress_dx_on_x_v2(paths = paths_to_plot)
test_res = regress_dx_on_x_v3(paths = paths_to_plot)

plt.figure()
plt.plot(test_res[0][:1000], label = 'slope')
plt.legend(loc = 'best')
plt.show()


plt.figure()
plt.plot(test_res[1][:1000], label = 'zscore for slope')
plt.legend(loc = 'best')
plt.show()

    

In [None]:
test_res[0].mean(),test_res[1].mean()

### Compare empirically estimated daily stdev vs thoretical Brownian motion and theoretical mean-rev

In [None]:
dt = 1.0/250
paths_stdev = paths_to_plot.std(axis=0)
ts = np.arange(len(paths_stdev))*dt

plt.figure()
plt.plot(paths_stdev, label = 'actual stdev')
plt.plot(np.sqrt(ts)*annvol, label = 'BM expected stdev')
plt.plot(np.sqrt((1-np.exp(-2*rev*ts))/(2*rev))*annvol, label = 'MR expected stdev')
plt.legend(loc = 'best')
plt.show()

In [None]:
# same but as var not stdev
plt.figure()
plt.plot(paths_stdev**2, label = 'actual var')
plt.plot(ts*annvol**2, label = 'BM expected var')
plt.plot((1-np.exp(-2*rev*ts))/(2*rev)*annvol**2, label = 'MR expected var')
plt.legend(loc = 'best')
plt.show()

In [None]:
# now fitting a power (linear in logs) function to estimate the Hurst exponent
start_idx = 10
poly = np.polyfit(np.log(ts[start_idx:]), np.log(paths_stdev[start_idx:]), 1)
print(poly[0])
powerfit = np.polyval(poly, np.log(0.0001 + ts))

plt.figure()
plt.plot(np.log(ts[start_idx:]), np.log(paths_stdev[start_idx:]), '.', label = 'log log stdev')
plt.plot(np.log(ts[start_idx:]), np.log(np.sqrt((1-np.exp(-2*rev*ts[start_idx:]))/(2*rev))*annvol), label = 'log log MR expected stdev')
plt.plot(np.log(ts[start_idx:]), powerfit[start_idx:], label = f'log log power fit, exp = {poly[0]}')
plt.xlabel('log of time')
plt.ylabel('log of daily stdev')
plt.legend(loc = 'best')
plt.show()

### Mean rev speed to Hurst exponent via averaged expansions

In [None]:
# Mean reversion to Hurst exponent analytically
def mean_rev_to_hurst(mrspeed, lag_expansion_pts = None):
    '''
    Calculate Hurst exponent analytically from mean reversion speed. see my notes. depends
    on the expansion points that could be provided -- we then take a simple average. Otherwise we use a sensible default
    See my notes (L3_...)
    '''
    if abs(mrspeed) < 1e-4:
        mrspeed = 1e-4

    if lag_expansion_pts is None:
        lag_expansion_pts = [1./52, 1./12, 1./4, 1./2, 1.]
    hs = np.zeros(len(lag_expansion_pts))
    for idx, tau0 in enumerate(lag_expansion_pts):
        exptau0 = np.exp(-2*mrspeed*tau0)
        hs[idx] = mrspeed * tau0 * exptau0 / (1-exptau0)
    return np.mean(hs)

# test
show_plot = True
if show_plot:
    revs = np.linspace(-1,1,101, endpoint=True)
    h_revs = [mean_rev_to_hurst(r) for r in revs]
    plt.figure()
    plt.plot(revs, h_revs, label = 'Hurst exponent')
    plt.xlabel('mean reversion speed')
    plt.ylabel('Hurst exponent')
    plt.title('Analytical calc of Hurst exponent from mean reversion speed')
    plt.legend(loc='best')
    plt.show()


### Investigate the ADF method for detecting mean-reversion

In [15]:
nadf = 1000
adf_max_lag = 1
adf_regr_params = 'c' # 'ct'
adf_stat = np.zeros(nadf)
adf_pval = np.zeros(nadf)
for n in range(nadf):
    res = adfuller(paths_to_plot[n,:], maxlag=adf_max_lag, regression=adf_regr_params)
    adf_stat[n] = res[0]
    adf_pval[n] = res[1]

In [None]:
plt.figure()
plt.plot(adf_stat, label = 'adf stat')
plt.title('adf stat')
plt.legend(loc = 'best')
plt.show()

plt.figure()
plt.plot(adf_pval, label = 'adf pval')
plt.plot(np.ones_like(adf_pval[:nadf])*0.05, label = '0.05 treshold')
plt.title('adf pval')
plt.legend(loc = 'best')
plt.show()
print(sum(adf_pval < 0.05)/len(adf_pval))

In [None]:
pathnum = 1
paths = paths_to_plot
dx = np.diff(paths, axis=1)
x = paths[:,:-1]

dxfitcoefs = np.polyfit(x[pathnum,:], dx[pathnum,:],deg = 1)
dxfit = np.polyval(dxfitcoefs, x[pathnum,:])

plt.figure()
plt.plot(x[pathnum,:], dx[pathnum,:], '.', label = 'dx vs x')
plt.plot(x[pathnum,:], dxfit, '-', label = 'fit to dx vs x')
plt.title(f'mr={rev}')
plt.legend(loc = 'best')
plt.show()

print(dxfitcoefs[0]/dt)
print('adf p-value=', adfuller(paths[pathnum,:])[1])

### Cross-sectional (all paths at once for a given time) regression of dx on x
We expect to recover the mean reversion speed we put into simulation, with a high degree of precision -- and we actually do
Thus, this is mostly a consistewncy test. But also demonstrates how much better the cross-sectional regression is vs time-based one
Of course we annualize the results and sign them to be consistent with the conventions of the simuation

In [18]:
def regress_dx_on_x_crosssec_v1(paths):
    '''
    calculate regr coeff dx ~ mean + slope x for each time 
    THIS IS A CROSS-SECTIONAL REGRESSION CALCULATION
    so we expect to recover the mean reversion speed we put into simulation, with a high degree of precision -- and we actually do
    Thus, this is mostly a consistewncy test. But also demonstrates how much better the cross-sectional regression is vs time-based one
    Of course we annualize the results and sign them to be consistent with the conventions of the simuation
    '''
    dt = 1.0/250

    dx = np.diff(paths, axis=1)
    x = paths[:,:-1]

    mean_x = x.mean(axis=0).reshape(1,-1)
    mean_dx = dx.mean(axis=0).reshape(1,-1)

    cov_x_dx = ((x - mean_x)*(dx - mean_dx)).sum(axis=0)
    var_x = ((x-mean_x)**2).sum(axis=0)
    
    slope = -(cov_x_dx/var_x)/dt

    # slope_z_score = (slope - slope.mean())/slope.std()
    slope_z_score = (slope - 0)/slope[1:].std()
    return slope[1:], slope_z_score[1:]

In [None]:
test_res_xsec = regress_dx_on_x_crosssec_v1(paths = paths_to_plot)


In [None]:
plt.figure()
plt.plot(test_res_xsec[0][10:], label = 'mrspeed estimate per time point')
plt.plot(test_res_xsec[0].mean() * np.ones_like(test_res_xsec[0]), label = 'average')
plt.legend(loc = 'best')

print(np.nanmean(test_res_xsec[0]))

In [None]:
dx = np.diff(paths_to_plot, axis=1)
x = paths_to_plot[:,:-1]

timeidx = 100

dxfitcoefs_xf = np.polyfit(x[:,timeidx], dx[:,timeidx], deg = 1)
dxfit_xf = np.polyval(dxfitcoefs_xf, x[:,timeidx])

pltstep = 20
plt.figure()
plt.plot(x[::pltstep,timeidx], dx[::pltstep, timeidx],'.',label = 'simulated')
plt.plot(x[::pltstep,timeidx], dxfit_xf[::pltstep],'_',label='linear fit')
annslope = dxfitcoefs_xf[0]/dt
plt.title(f'Cross-section regression, time={timeidx*dt}y, mrspeed(ann) = {annslope:.2f}')
plt.xlabel('state x')
plt.ylabel('increment dx')
plt.legend(loc='best')
plt.show()
# print(annslope)

### Investigate the Hurst exponent method for MR

In [None]:
pathidx = 0
plot_lags = np.array([1, 5,10,20])
tau = [np.std(np.subtract(paths_to_plot[pathidx,lag:], paths_to_plot[pathidx,:-lag])) for lag in plot_lags]
poly = np.polyfit(np.log(plot_lags), np.log(tau), 1)
tau_fit = np.polyval(poly, np.log(plot_lags))
plt.figure()
plt.plot(np.log(plot_lags), np.log(tau), 'o', label = 'realized vol vs lag actual')
plt.plot(np.log(plot_lags), tau_fit, '-', label = 'realized vol vs lag fit')
plt.xlabel('log(lag)')
plt.ylabel('log(realized vol)')
plt.legend(loc='best')
plt.show()
print(f'Hurst={poly[0]}')

In [23]:
def calc_hurst_exponent(xs, lags = None):
    """
    adapted from https://www.quantstart.com/articles/Basics-of-Statistical-Mean-Reversion-Testing/
    Returns the Hurst Exponent of the time series vector xs
    In our version xs is npaths x ndays array
    and output is an npaths array

    Parameters
    ----------
    xs : `numpy.array`
        Time series upon which the Hurst Exponent will be calculated

    Returns
    -------
    'float'
        The Hurst Exponent from the poly fit output
    """
    # Create the range of lag values
    if lags is None:
        lags = [1, 5,10,20]
    lags = np.asarray(lags)

    npaths = xs.shape[0]
    hs = np.zeros(npaths)
    for pathidx in range(npaths):
        # Calculate the array of the variances of the lagged differences
        vols = [np.std(np.subtract(xs[pathidx,lag:], xs[pathidx,:-lag])) for lag in lags]

        # Use a linear fit to estimate the Hurst Exponent
        poly = np.polyfit(np.log(lags), np.log(vols), 1)
#         poly = np.polyfit(lags, np.log(vols), 1)
        hs[pathidx] = poly[0]

    # Return the Hurst exponents from the polyfit output per path
    return hs
hursts = calc_hurst_exponent(paths_to_plot)

In [None]:
plt.figure()
plt.plot(hursts)
plt.plot(hursts.mean()*np.ones_like(hursts))
plt.show()
print(hursts.mean())

### Calculate and plot the distribution of the Hurst exponent for Browniam motion (mr speed = 0)

In [None]:
annvol = 0.01
bm_rev = 0.0 ## Brownian motion
n_bm_paths = 10000
nyears = 3
seed = 314
bm_paths = simulate_paths(npaths = n_bm_paths, ndays = nyears*250, x0 = 0.0, vol = annvol, mrspeed = bm_rev, seed=seed)
bm_hursts = calc_hurst_exponent(bm_paths)
print(f'Average of Hurst exponents over all paths: {bm_hursts.mean()}')

In [None]:

plt.figure()

# Create histogram
nbins = 40
n, bins, patches = plt.hist(bm_hursts, bins=nbins, density = True)

# Find 5th percentile
pct5 = np.percentile(bm_hursts, 5)

# Convert raw frequency to percentage
# n = n / np.sum(n) * 100

# Mark 5th percentile on plot
plt.axvline(x=pct5, color='red', linestyle='dashed', label = '5% pct')
plt.axhline(y=5, color='red', linestyle='dotted', label = '5%')
plt.text(pct5, max(n), f' 5% pct = {pct5:.2f} ', ha='right', va='top')

# Set y-axis as percentage
plt.gca().yaxis.set_major_formatter(mtick.PercentFormatter())

# add empirical CDF
plt.plot(np.sort(bm_hursts)[:-n_bm_paths//2], (np.arange(n_bm_paths)/n_bm_paths*100)[:-n_bm_paths//2], label = 'Empirical CDF')

# Add axis labels and title
plt.xlabel('Values')
plt.ylabel('Density')
plt.title('Histogram and empirical CDF of Hurst exponents per path')
plt.legend(loc = 'best')

# Show plot
plt.show()


### Investigate ADF nd the Hurst exponent methods across a range of MR speeds

In [None]:
mrspeeds = [32,16,8,4,2,1,0.5,0.25,0.125,0,-0.125,-0.25, -0.5, -1,-2,-4]
annvol = 0.01
npaths_to_use = 10000
nyears = 3
seed = 314

adf_max_lag=1
adf_regr_params='c'

adf_pvals = np.zeros_like(mrspeeds)
hs = np.zeros_like(mrspeeds)

for i,rev in enumerate(mrspeeds):
    print(rev, end=' ')
    simpaths = simulate_paths(npaths = npaths_to_use, ndays = nyears*250, x0 = 0.0, vol = annvol, mrspeed = rev, seed=seed)
    adf_pval = np.zeros(npaths_to_use)
    for n in range(npaths_to_use):
        res = adfuller(simpaths[n,:], maxlag=adf_max_lag, regression=adf_regr_params)
        adf_pval[n] = res[1]
    adf_pvals[i] = adf_pval.mean()
    hs[i] = calc_hurst_exponent(simpaths).mean()



In [None]:
plt.figure()
plt.plot(mrspeeds, adf_pvals, '.', label = 'p-value')
plt.plot(mrspeeds, 0.05*np.ones_like(mrspeeds), '-', label = '0.05')
# plt.xscale('log')
plt.xlabel('mean reversion speed')
plt.ylabel('p-value')
plt.title(f'ADF test, nyears={nyears}, annvol={annvol}')
plt.legend(loc = 'best')
# plt.yscale('log')
plt.show()

In [None]:
plt.figure()
plt.plot(mrspeeds, hs, '.', label = 'hurst exponent')
plt.plot(mrspeeds, 0.5*np.ones_like(mrspeeds), '-.', label = '0.5 for BM')
plt.plot(mrspeeds, 0.44*np.ones_like(mrspeeds), '-.', label = '0.45, empirical pval=5%')
# plt.yscale('log')
plt.xlabel('mean reversion speed')
plt.ylabel('Hurst exponent')
plt.title(f'Hurst, nyears={nyears}, annvol={annvol}')
plt.legend(loc = 'best')

plt.show()

In [None]:
plt.figure()
plt.plot(mrspeeds, hs, '.', label = 'hurst exponent')
plt.plot(mrspeeds, 0.5*np.ones_like(mrspeeds), '-', label = '0.5 for BM')
plt.xscale('log')
plt.yscale('log')
plt.xlabel('mean reversion speed')
plt.ylabel('Hurst exponent')
plt.title(f'Hurst, nyears={nyears}, annvol={annvol}')
plt.legend(loc = 'best')

plt.show()