In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import timedelta
import datetime
import time

#import f_drift_burst_v8

In [2]:
import numpy as np
from scipy import signal
import numbers
from inspect import isfunction
import datetime

# Drift Burst calculator

In [3]:
class DriftBurst:
    
    def __init__(self, ti : np.array = [], Xi : np.array = [], titest : np.array = [], *args, **kwargs):
        
        # To set any optional argument, when creating a DriftBurst object:
        # after entering variables ti, Xi, titest,
        # insert an ordered tuple (a,a',...) or an ordered list [a,a',...] prefixed by *
        # or insert a dictionary {a:a',...} prefixed by **
        # Set {a,b,...} structures are not compatible.
        
        self.ti = ti
        self.Xi = Xi
        self.titest = titest
        self.args = args
        self.kwargs = kwargs
        self.varargin : dict = {}
    
    def aux_hac_weight(self, n_aclag):
        w = np.arange(1, n_aclag+1)/n_aclag
        w = np.where(w <= 0.5, (1 - 6*np.square(w) + 6*np.power(w, 3)), (2 * (1 - w) ** 3)) 

        return w
    
    def aux_avar(self, wdXi, n_aclag):
        v0 = np.sum(np.square(wdXi))
        ac = np.zeros(n_aclag)
        for i in range(n_aclag):
            ac[i] = np.sum([(wdXi[j - 1] * wdXi[i + j]) for j in range(1, len(wdXi) - i)])
        w = self.aux_hac_weight(n_aclag)
        avar = v0 + 2 * np.sum(ac * w)
        return avar

    def aux_noise_lag(self, dXi, n_mu):
        c = 2.6614
        q = 2
        T = n_mu
        n = round(4 * (T / 100) ** (4 / 25))
        root = 1 / (2 * q + 1)
        E_deij = [0] * (n + 1)
        for i in range(n + 1):
            E_deij[i] = np.mean([(dXi[j - 1] * dXi[i + j]) for j in range(1, len(dXi) - i)])
        ac = np.zeros(n)
        v0 = -1 * np.sum([((j + 1) * E_deij[j]) for j in range(n + 1)])
        for i in range(n):
            ac[i] = -1 * np.sum([(j * E_deij[i + j]) for j in range(1, n + 1 - i)])
        s0 = v0 + 2 * np.sum(ac)
        sq = 2 * np.sum(np.power(np.arange(1,n+1), q) * ac)
        try:
            gamma = c * (((sq / s0) ** q) ** root)
            n_aclag = int(round(gamma * T ** root))
        except ValueError:
            gamma = 100
            n_aclag = int(round(gamma * T ** root))
        
        return n_aclag
        
    def aux_check_optional(self):
        
        optional = ['bndw_m','bndw_v','kernel','k_n','n_aclag']
                
        if type(self.args) != tuple or type(self.kwargs) != dict:
            raise TypeError('invalid input structure')
        
        if type(self.kwargs) == dict and len(self.kwargs) > 0:
            self.varargin = self.kwargs
        elif type(self.args) == tuple and len(self.args) > 0:
            varargin = [0]
            varargin[0] = self.args
            nargin = len(varargin[0])
            if nargin % 2 != 0:
                raise TypeError('wrong number of optional arguments')
            else:
                for i in range(0, len(varargin[0]) - 1, 2):
                    self.varargin[varargin[0][i]] = varargin[0][i+1]
        
        varname = list(self.varargin.keys())
        
        for idx in varname:
            idx = idx.lower()
            if idx not in optional:
                raise TypeError('optional argument "{0}" invalid'.format(idx))
        
        if 'bndw_m' not in varname:
            self.varargin['bndw_m']  = 300000
            self.bndw_m = 300000
        elif (np.isscalar(self.varargin['bndw_m']) == 0
              or isinstance(self.varargin['bndw_m'], numbers.Number) == 0 or self.varargin['bndw_m'] <= 0):
            raise ValueError('optional argument "bndw_m" should be a positive number')

        if 'bndw_v' not in varname:
            self.varargin['bndw_v']  = 1500000
            self.bndw_v = 1500000
        elif (np.isscalar(self.varargin['bndw_v']) == 0
              or isinstance(self.varargin['bndw_v'], numbers.Number) == 0 or self.varargin['bndw_v'] <= 0):
            raise ValueError('optional argument "bndw_v" should be a positive number')

        if 'k_n' not in varname:
            self.varargin['k_n']  = 1
            self.k_n = 1
        elif (np.isscalar(self.varargin['k_n']) == 0 or isinstance(self.varargin['k_n'], numbers.Number) == 0
              or self.varargin['k_n'] <= 0 or self.varargin['k_n'] % 1 != 0):
            raise ValueError('optional argument "k_n" should be a positive integer')
        
        if 'kernel' not in varname:
            self.varargin['kernel']  = lambda x : np.exp(-1 * abs(x)) * (x <= 0)
            self.kernel = lambda x : np.exp(-1 * abs(x)) * (x <= 0)
        elif isfunction(self.varargin['kernel']) == 0:
            raise ValueError('optional argument "kernel" is invalid')
            
        if 'n_aclag' not in varname:
            self.varargin['n_aclag']  = -1
            self.n_aclag = -1
        elif (np.isscalar(self.varargin['n_aclag']) == 0 or isinstance(self.varargin['n_aclag'], numbers.Number) == 0
              or self.varargin['n_aclag'] < 0 or self.varargin['n_aclag'] % 1 != 0) and self.varargin['n_aclag'] != -1:
            raise ValueError('optional argument "n_aclag" should be a positive integer')
        
        return self.varargin
    
    def f_drift_burst(self): # computes t-statistics for testing the presence of drift bursts in the process
                             # of a Brownian Motion at time/s titest
        
        self.varargin = self.aux_check_optional()
        self.bndw_m = self.varargin['bndw_m']
        self.bndw_v = self.varargin['bndw_v']
        self.kernel = self.varargin['kernel']
        self.k_n = self.varargin['k_n']
        self.n_aclag = self.varargin['n_aclag']
        
        dXi = np.diff(self.Xi)
        w = [1] * self.k_n
        b = w + list([-1 * w[i] for i in range(self.k_n)])
        pdXi = signal.lfilter(b, 1.0, self.Xi)
        o = np.zeros(2 * self.k_n - 1)
        pdXi = np.append(o[1:], pdXi[(2*self.k_n - 1) :]) if o.size else np.append([0], pdXi[(2*self.k_n - 1) :])
        
        n_mu = np.zeros(len(self.titest))
        mu_t = np.zeros(len(self.titest))
        
        for i in range(len(self.titest)):
            w = self.kernel(((self.ti[:-1] - self.titest[i])/np.timedelta64(1, 'm') / self.bndw_m))
            n_mu[i] = (np.sum(w)).astype(float)
            idx = np.zeros(len(self.ti) - 1)
            idx = (w > 0).astype(int)
            mu_t[i] = (np.sum(w * idx * pdXi) / self.bndw_m).astype(float)

        n_sigma = np.zeros(len(self.titest))
        n_lag = np.zeros(len(self.titest))
        var_t = np.zeros(len(self.titest))
        
        for i in range(len(self.titest)):
            w = self.kernel(((self.ti[:-1] - self.titest[i])/np.timedelta64(1, 'm') / self.bndw_v))
            n_sigma[i] = (np.sum(w)).astype(float)
            idx = np.zeros(len(self.ti) - 1)
            idx = (w > 0).astype(int)
            if self.n_aclag == -1:
                max_lag = 20
                idXi = dXi[idx != 0]
                q = self.aux_noise_lag(idXi, n_mu[i])
                n_lag[i] = min(q, max_lag) + 2 * (self.k_n - 1)
                self.n_aclag = -1
            else:
                n_lag[i] = self.n_aclag
                
            var_t[i] = (self.aux_avar((w * idx * pdXi), int(n_lag[i])) / self.bndw_v )

        db_t = (np.sqrt(self.bndw_m) * mu_t / np.sqrt(var_t))
        
        return db_t      

# Data import

In [4]:
data = pd.read_csv('USA500_market_open.csv')
data.drop(columns=['Unnamed: 0'], inplace=True)

In [5]:
data.head()

Unnamed: 0,Timestamp,Bid price,Ask price,Bid volume,Ask volume
0,2013-01-02 15:50:03.590,1426.19,1426.19,1.0,1.0
1,2013-01-02 15:50:16.559,1439.34,1439.34,1.0,1.0
2,2013-01-02 15:50:26.637,1440.72,1440.72,1.0,1.0
3,2013-01-02 15:50:38.210,1441.2,1441.2,1.0,1.0
4,2013-01-02 15:50:42.624,1442.46,1442.46,1.0,1.0


In [6]:
data.tail()

Unnamed: 0,Timestamp,Bid price,Ask price,Bid volume,Ask volume
54424795,2021-12-31 21:59:59.767,4766.933,4767.449,0.00225,0.00225
54424796,2021-12-31 21:59:59.831,4766.943,4767.466,0.00037,0.00037
54424797,2021-12-31 21:59:59.881,4767.954,4768.464,0.00225,0.00225
54424798,2021-12-31 21:59:59.932,4767.451,4767.949,0.00225,0.00225
54424799,2021-12-31 21:59:59.982,4766.633,4767.161,0.00225,0.00225


In [7]:
data.Timestamp = pd.to_datetime(data.Timestamp, format='%Y-%m-%d %H:%M:%S.%f')

# Test calculation

In [None]:
%%time

print(datetime.datetime.today())

chunks = [0, 0.2, 0.4, 0.6, 0.8, 1]
chunk1 = int(chunks[0]*len(data))
chunk2 = int(chunks[1]*len(data))

Signal = pd.DataFrame()
Signal['Price'] = data['Ask price'].iloc[chunk1:chunk2]
Signal['t'] = data.Timestamp.iloc[chunk1:chunk2]

# set parameters
tau = -1
sample_frequency = 100 # sample every 10th observation
start = 100 # number of minimum observations 
bndw_m = [1, 2, 5, 10, 30, 60, 300][0] # time frame (in minutes) for identifying the drift explosion
bndw_m = bndw_m*60 # *60 because data is in seconds ('t' is in seconds)

zz = {'bndw_m' : bndw_m, 'bndw_v' : 5*bndw_m} 
Rwindow = [int(0.5*5*bndw_m), int(1*5*bndw_m)] # amount of data points for the function

db = []
# rolling window 
for rwindow in Rwindow:
    
    titel = 'db_t' + '_bndw_m_' + str(bndw_m) + '_rwindow_' + str(rwindow) 
    
    # calculate the signal for each point in time/sample_frequency - runs a few minutes
    for tt in Signal.index[start:][::sample_frequency]:
           
        t = Signal.loc[:tt,'t'][-rwindow:].values
        x = Signal.loc[:tt,'Price'][-rwindow:].values
        # evaluate function
        bm = DriftBurst(t, x, [t[tau]], **zz)
        # save t-stat
        Signal.loc[tt,titel] = bm.f_drift_burst()[0]

    Signal.to_csv('USA500_{}_market_open.csv'.format(titel), mode='a', header=False)

2022-01-09 21:46:06.327194
