In [None]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px

# Fractional differentiation

In [None]:
def generate_test_data(obs, seed=1):
    """Simulate a synthetic price path for testing.

    Draws ``obs`` returns from a normal distribution (mean 0.002, std 0.1),
    compounds them into a price series starting on 2015-01-02 (daily
    frequency), then prepends a row dated 2015-01-01 with value 1.

    Parameters
    ----------
    obs : int
        Number of random returns to draw; the result has ``obs + 1`` rows.
    seed : int, default 1
        Seed passed to ``np.random.seed``. Note that this reseeds NumPy's
        global random state as a side effect.

    Returns
    -------
    pandas.DataFrame
        Single column ``'Close'`` indexed by date, sorted ascending.
    """
    np.random.seed(seed)
    simulated_returns = np.random.normal(0.002, 0.1, obs)
    dates = pd.date_range('2015-01-02', periods=obs)

    # Compound (1 + r_t) to get the price path.
    prices = (pd.DataFrame(simulated_returns, index=dates) + 1).cumprod()

    # Anchor the path at 1 on the day before the first simulated return.
    prices.loc[pd.to_datetime('2015-01-01')] = 1

    return prices.sort_index().rename(columns={0: 'Close'})

## Standard Fracdiff (Expanding Window)

In [None]:
def getWeights(d, size):
    """Return the first ``size`` fractional-differencing weights for order ``d``.

    The weights follow the recursion ``w_0 = 1`` and
    ``w_k = -w_{k-1} * (d - k + 1) / k``.

    Parameters
    ----------
    d : float
        Differencing order.
    size : int
        Number of weights to generate. At least one weight (``w_0``) is
        always returned, even when ``size`` is less than 1.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(n, 1)`` ordered from the highest lag down
        to lag 0, i.e. the last row is ``w_0 = 1``.
    """
    weights = [1.]
    for lag in range(1, size):
        previous = weights[-1]
        weights.append(-previous / lag * (d - lag + 1))
    # Reverse so the lag-0 weight lines up with the most recent observation.
    return np.array(weights[::-1]).reshape(-1, 1)

import matplotlib.pyplot as mpl

def plotWeights(dRange, nPlots, size):
    """Plot fractional-differencing weights against lag for a range of ``d``.

    Parameters
    ----------
    dRange : sequence of two floats
        Lower and upper bound of the ``d`` values to plot (both included).
    nPlots : int
        Number of evenly spaced ``d`` values between the bounds; one line
        is drawn per value.
    size : int
        Number of weights computed for each ``d``.

    Returns
    -------
    None
        The figure is shown via matplotlib.
    """
    weight_table = pd.DataFrame()
    for d in np.linspace(dRange[0], dRange[1], nPlots):
        column = getWeights(d, size=size)
        # getWeights returns the highest lag first, so label rows with
        # descending lag numbers (the last row is lag 0).
        lags = range(column.shape[0])[::-1]
        weight_table = weight_table.join(
            pd.DataFrame(column, index=lags, columns=[d]),
            how='outer',
        )
    axes = weight_table.plot()
    axes.legend(loc='upper left')
    mpl.show()

if __name__=='__main__':
    # Draw the weight curves for d in [0, 1] and then for d in [1, 2].
    for d_bounds in ([0,1],[1,2]):
        plotWeights(dRange=d_bounds,nPlots=11,size=6)

In [None]:
def fracDiff(series,d,thres=.01):
    """Fractionally differentiate each column using an expanding window.

    Every output value is the dot product of the weights from ``getWeights``
    with all observations up to and including that row. NaNs are
    forward-filled before the weights are applied, but rows whose original
    value is not finite are left out of the output.

    Parameters
    ----------
    series : pandas.DataFrame
        One or more columns to differentiate.
    d : float
        Differencing order (this code does not restrict it to [0, 1]).
    thres : float, default 0.01
        Threshold on the normalised cumulative absolute weights that
        determines how many initial rows are skipped. With ``thres=1``
        nothing is skipped.

    Returns
    -------
    pandas.DataFrame
        One column per input column holding the differentiated values.
    """
    # 1) Weights sized for the full length of the input.
    weights = getWeights(d, series.shape[0])

    # 2) Number of leading rows to skip: count the entries of the normalised
    #    cumulative |weight| curve that exceed the threshold.
    cum_abs = np.cumsum(abs(weights))
    cum_abs /= cum_abs[-1]
    skip = cum_abs[cum_abs > thres].shape[0]

    # 3) Apply the weights column by column.
    columns = {}
    for name in series.columns:
        filled = series[[name]].ffill().dropna()
        diffed = pd.Series(dtype=float)
        for pos in range(skip, filled.shape[0]):
            label = filled.index[pos]
            # Only emit a value where the original observation is finite.
            if np.isfinite(series.loc[label, name]):
                history = filled.loc[:label]
                # The trailing pos+1 weights align lag 0 with the current row.
                diffed[label] = np.dot(weights[-(pos + 1):, :].T, history)[0, 0]
        columns[name] = diffed.copy(deep=True)
    return pd.concat(columns, axis=1)

Apply `fracDiff` to a random price series

In [None]:
# Simulate a price path of 1000 returns.
close = generate_test_data(1000)

# Expanding-window fracdiff for ten orders d evenly spaced in [0.1, 1];
# each result is stored under its d value and aligned on the close index.
fdiff = pd.DataFrame(index=close.index)
for d in np.linspace(0.1, 1, 10):
    fdiff[d] = fracDiff(close, d, thres=0.01)

# One trace for the raw series, then one per differencing order.
fig = go.Figure()
fig.add_trace(
    go.Scatter(x=close.index, y=close.iloc[:, 0], name='Close')
)
for col in fdiff.columns:
    fig.add_trace(
        go.Scatter(x=fdiff.index, y=fdiff[col], name=round(col, 2))
    )

fig.update_layout(
    title='Random Price Series',
    xaxis_title='Date',
    yaxis_title='Price',
)
fig.show()


## Fixed-width window Fracdiff

In [None]:
def getWeights_FFD(d,thres):
    """Return fractional-differencing weights down to a magnitude cut-off.

    Weights follow ``w_0 = 1`` and ``w_k = -w_{k-1} * (d - k + 1) / k``.
    Generation stops at the first weight whose absolute value is below
    ``thres``; that weight is not included.

    Parameters
    ----------
    d : float
        Differencing order. Must be finite.
    thres : float
        Strictly positive cut-off on the absolute weight.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(n, 1)`` ordered from the highest retained
        lag down to lag 0, i.e. the last row is ``w_0 = 1``.

    Raises
    ------
    ValueError
        If the inputs would make the weight loop run forever: ``thres`` not
        strictly positive (or NaN), ``d`` not finite, or ``d <= -1`` with
        ``abs(d) >= thres``.
    """
    # Without a positive cut-off, abs(w_) < thres can never become true.
    if not thres > 0:
        raise ValueError('thres must be strictly positive, got %r' % (thres,))
    # NaN/inf orders produce NaN/inf weights that never fall below thres.
    if not np.isfinite(d):
        raise ValueError('d must be finite, got %r' % (d,))
    # For d <= -1 each weight is at least as large in magnitude as the one
    # before it, |w_k| / |w_{k-1}| = (k - 1 - d) / k >= 1, and |w_1| = |d|.
    if d <= -1 and abs(d) >= thres:
        raise ValueError(
            'weights never fall below thres=%r for d=%r' % (thres, d)
        )

    w,k=[1.],1
    while True:
        w_=-w[-1]/k*(d-k+1)
        if abs(w_)<thres:
            break
        w.append(w_)
        k+=1
    # Reverse so the lag-0 weight lines up with the most recent observation.
    return np.array(w[::-1]).reshape(-1,1)

In [None]:
def fracDiff_FFD(series,d,thres=1e-5):
    """Fractionally differentiate each column using a fixed-width window.

    The weights come from ``getWeights_FFD(d, thres)``. Every output value
    is the dot product of those weights with the most recent ``len(weights)``
    observations. NaNs are forward-filled before the weights are applied,
    but rows whose original value is not finite are left out of the output.

    Parameters
    ----------
    series : pandas.DataFrame
        One or more columns to differentiate.
    d : float
        Differencing order.
    thres : float, default 1e-5
        Weight cut-off forwarded to ``getWeights_FFD``; it determines the
        window width.

    Returns
    -------
    pandas.DataFrame
        One column per input column holding the differentiated values. The
        first ``len(weights) - 1`` rows of each filled column produce no
        output.
    """
    weights = getWeights_FFD(d, thres)
    span = len(weights) - 1  # the window covers span + 1 observations

    columns = {}
    for name in series.columns:
        filled = series[[name]].ffill().dropna()
        diffed = pd.Series(dtype=float)
        for end_pos in range(span, filled.shape[0]):
            start_label = filled.index[end_pos - span]
            end_label = filled.index[end_pos]
            # Only emit a value where the original observation is finite.
            if np.isfinite(series.loc[end_label, name]):
                window = filled.loc[start_label:end_label]
                diffed[end_label] = np.dot(weights.T, window)[0, 0]
        columns[name] = diffed.copy(deep=True)
    return pd.concat(columns, axis=1)

In [None]:
# Simulate a price path of 1000 returns.
close = generate_test_data(1000)

# Fixed-width-window fracdiff for eleven orders d evenly spaced in [0, 1];
# each result is stored under its d value and aligned on the close index.
fdiff = pd.DataFrame(index=close.index)
for d in np.linspace(0, 1, 11):
    fdiff[d] = fracDiff_FFD(close, d, thres=0.01)

# One trace per differencing order (the raw close trace is disabled).
fig = go.Figure()
# fig.add_trace(go.Scatter(x=close.index, y=close.iloc[:,0], name='Close'))
for col in fdiff.columns:
    fig.add_trace(
        go.Scatter(x=fdiff.index, y=fdiff[col], name=round(col, 2))
    )

fig.update_layout(
    title='Random Price Series and its Differentiation',
    xaxis_title='Date',
    yaxis_title='Price',
    legend_title='fractional difference',
)
fig.show()

## Stationarity with maximum memory preservation


In [None]:
def plotMinFFD(series):
    """Tabulate ADF results and correlation for a grid of differencing orders.

    For each ``d`` in ``np.linspace(0, 1, 11)`` the input is differentiated
    with ``fracDiff_FFD(..., thres=.01)``. The function then records the
    output of ``adfuller`` (``maxlag=1``, ``regression='c'``,
    ``autolag=None``) on the differentiated series, together with the
    correlation between the differentiated and the original values on the
    rows they share.

    Despite its name this function draws nothing; it only returns the table.

    Parameters
    ----------
    series : pandas.DataFrame
        Single-column frame. It is not modified: the function works on a
        copy whose column is relabelled ``'Close'``.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``d`` with columns ``adfStat``, ``pVal``, ``lags``,
        ``nObs`` (the first four elements of the ``adfuller`` result),
        ``95% conf`` (the ``'5%'`` entry of its critical-value dict) and
        ``corr``.
    """
    from statsmodels.tsa.stattools import adfuller

    # Relabel a copy so the caller's column names are left untouched.
    df1 = series.copy()
    df1.columns = ['Close']

    columns = ['adfStat','pVal','lags','nObs','95% conf','corr']
    d_values = np.linspace(0,1,11)

    # Collect one row per d and build the frame once at the end.
    rows = []
    for d in d_values:
        ffd = fracDiff_FFD(df1,d,thres=.01)

        # Correlation with the original values on the surviving rows.
        corr = np.corrcoef(df1.loc[ffd.index,'Close'],ffd['Close'])[0,1]

        adf = adfuller(ffd['Close'],maxlag=1,regression='c',autolag=None)

        rows.append(list(adf[:4])+[adf[4]['5%']]+[corr]) # with critical value

    return pd.DataFrame(rows,index=d_values,columns=columns)

In [None]:
# Build the ADF / correlation table for d = 0, 0.1, ..., 1 on the simulated close series.
adfstats = plotMinFFD(close)

In [None]:
# Display the table (one row per differencing order d).
adfstats

In [None]:
from plotly.subplots import make_subplots

# Dual-axis figure: ADF statistic on the primary y-axis, correlation on the secondary.
fig = make_subplots(specs=[[{"secondary_y": True}]])

fig.add_trace(
    go.Scatter(x=adfstats.index, y=adfstats['corr'], name='Correlation'),
    secondary_y=True,
)
fig.add_trace(
    go.Scatter(x=adfstats.index, y=adfstats['adfStat'], name='ADF Stat'),
    secondary_y=False,
)

# Dotted horizontal reference line at the mean of the '95% conf' column,
# spanning x from 0 to 1.
critical_value = adfstats['95% conf'].mean()
fig.add_shape(
    type="line",
    x0=0, y0=critical_value,
    x1=1, y1=critical_value,
    line=dict(color="Black",width=1,dash="dot"),
    secondary_y=False,
)

fig.update_layout(title='ADF Stat and Correlation', xaxis_title='d', yaxis_title='Value', width=800, height=600)
fig.show()
