In [16]:
import dask
from dask.distributed import Client, progress
from dask import delayed
import dask.dataframe as dd
import dask.array as da
import pandas as pd 
import numpy as np
from datetime import datetime, timedelta, date, timezone
from scipy import signal
from dask.dataframe.utils import make_meta

In [17]:
# Start a local Dask cluster for this notebook: 8 workers, 2 threads per worker,
# and a 1GB memory limit per worker.
# NOTE(review): re-running this cell without closing the previous client appears to
# leave the old cluster alive (see the "already have a cluster running" warning in
# the saved output) — confirm and consider client.close() before re-creating it.
client = Client(n_workers=8, threads_per_worker=2, memory_limit='1GB')
# Bare expression so the notebook renders the client/cluster summary.
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 62174 instead


0,1
Client  Scheduler: tcp://127.0.0.1:62177  Dashboard: http://127.0.0.1:62174/status,Cluster  Workers: 8  Cores: 16  Memory: 7.45 GiB


In [18]:
# Lazily load the pre-processed DOGE price data as a Dask DataFrame.
df = dd.read_csv("ProcessedData/DOGE_2021-04-16.csv")
# Both timestamp columns arrive from the CSV unparsed; convert them to datetimes.
df["datetime"] = dd.to_datetime(df["datetime"])
df["datetimeNotTheIndex"] = dd.to_datetime(
    df["datetimeNotTheIndex"])

# Index by timestamp. sorted=True declares the column as already ordered
# (assumes the CSV rows are in chronological order — TODO confirm against the producer).
df = df.set_index("datetime", sorted=True)

In [19]:
# List the columns available in the loaded data.
print(df.columns)
# On a Dask DataFrame the row count is lazy, so shape shows a Delayed placeholder
# for the number of rows next to the (eager) number of columns.
df.shape

Index(['ask_price', 'bid_price', 'mark_price', 'high_price', 'low_price',
       'open_price', 'volume', 'datetimeNotTheIndex',
       'mark_price_10T_velocity', 'mark_price_30T_velocity',
       'mark_price_60T_velocity', 'mark_price_180T_velocity',
       'mark_price_720T_velocity', 'mark_price_1440T_velocity',
       'mark_price_4320T_velocity', 'mark_price_10080T_velocity',
       'mark_price_21600T_velocity', 'mark_price_10T_mean',
       'mark_price_30T_mean', 'mark_price_60T_mean', 'mark_price_180T_mean',
       'mark_price_720T_mean', 'mark_price_1440T_mean',
       'mark_price_4320T_mean', 'mark_price_10080T_mean',
       'mark_price_21600T_mean', 'mark_price_10T_std', 'mark_price_30T_std',
       'mark_price_60T_std', 'mark_price_180T_std', 'mark_price_720T_std',
       'mark_price_1440T_std', 'mark_price_4320T_std', 'mark_price_10080T_std',
       'mark_price_21600T_std', 'spread',
       'mark_price_10T_acceleration_for_10T_velocity',
       'mark_price_30T_acceleration_for

(Delayed('int-f14ed726-9190-43cf-b1aa-31ad14b46d6b'), 48)

In [20]:
# Subset of columns carried through resampling and min/max labelling.
# "datetimeNotTheIndex" is kept because splitDataByGaps diffs it to find gaps in the data.
training_columns = ['mark_price', 'ask_price', 'bid_price', 'spread',
                    'mark_price_10T_velocity', 'mark_price_60T_velocity',
                    'mark_price_1440T_velocity', 'mark_price_10T_mean',
                    'mark_price_60T_mean', 'mark_price_1440T_mean', 'mark_price_10T_std',
                    'mark_price_60T_std', 'mark_price_1440T_std',
                    'mark_price_10T_acceleration_for_10T_velocity',
                    'mark_price_60T_acceleration_for_60T_velocity', "datetimeNotTheIndex"]

# Still a lazy Dask DataFrame; nothing is computed here.
df_train = df[training_columns]

In [21]:
# df_train_resampled = df_train.compute().resample(rule='30s', label='right', closed='right')

In [22]:
##################################
# MinMax GLOBALS
##################################
# Values stored in the 'minmax' column to mark a row as a local minimum,
# a local maximum, or neither.
MIN_FLAG = 1
MAX_FLAG = 2
NEITHER_FLAG = 0
# Threshold, expressed in percent, for deciding whether to keep or delete a min/max.
# The check multiplies the relative price change by 100 before comparing,
# so .25 here means 0.25% (and .5 would mean 0.5%).
THRESHOLD_PERCENTAGE = .25
# `order` argument for scipy.signal.argrelmax / argrelmin: how many neighbouring
# samples on each side a point is compared against to count as an extremum.
SIGNAL_MAX_ORDER = 25
SIGNAL_MIN_ORDER = 25

# Gap size (in minutes) for splitting data into contiguous segments
GAP_SIZE_MINUTES = 7
# Lowercase 's' is the seconds alias accepted by both old and new pandas;
# the uppercase 'S' spelling is deprecated since pandas 2.2.
RESAMPLE_PERIOD = '30s'
# Not read by any live code visible in this notebook.
INTERPOLATION_METHOD = 'linear'


def splitDataByGaps(data, gap_size_minutes=None):
    """Split a DataFrame into contiguous segments wherever the timestamps jump.

    A new segment starts at every row whose "datetimeNotTheIndex" value is more
    than ``gap_size_minutes`` later than the previous row's value.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain a datetime-typed "datetimeNotTheIndex" column.
    gap_size_minutes : number, optional
        Gap length, in minutes, that triggers a split. Defaults to the
        module-level GAP_SIZE_MINUTES when omitted.

    Returns
    -------
    list of pandas.DataFrame
        Positional slices of ``data`` in original order. Always contains at
        least one element (the whole frame when no gap is found).
    """
    if gap_size_minutes is None:
        gap_size_minutes = GAP_SIZE_MINUTES

    # .iloc needs integer offsets, so diff only the timestamp column and renumber
    # it 0..n-1 instead of re-indexing (and copying) the whole frame.
    deltas = data["datetimeNotTheIndex"].diff().reset_index(drop=True)
    # The first delta is NaT and never compares greater, so position 0 is never a gap.
    gap_positions = deltas.index[deltas > timedelta(minutes=gap_size_minutes)]

    data_split = []
    start_position = 0
    for gap_position in gap_positions:
        data_split.append(data.iloc[start_position:gap_position, :])
        start_position = gap_position
    # Append the final split
    data_split.append(data.iloc[start_position:, :])

    return data_split

def resampleAndInterpolate(data, resample_period=None):
    """Resample an irregular time series onto an exact, evenly spaced grid.

    The grid starts at the first timestamp of ``data`` and steps by
    ``resample_period``. Values on the grid are obtained by time-weighted
    interpolation between the surrounding original samples; original samples
    that do not fall on the grid are discarded.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame indexed by a DatetimeIndex (required by ``interpolate('time')``).
    resample_period : str, optional
        Pandas frequency string for the grid. Defaults to the module-level
        RESAMPLE_PERIOD when omitted.

    Returns
    -------
    pandas.DataFrame
        The resampled frame. An empty input is returned as an empty copy.
    """
    if resample_period is None:
        resample_period = RESAMPLE_PERIOD

    # Nothing to anchor the grid on; data.index[0] below would raise IndexError.
    if len(data) == 0:
        return data.copy()

    first_timestamp = data.index[0]
    resample_index = pd.date_range(
        start=first_timestamp, end=data.index[-1], freq=resample_period)
    # All-NaN placeholder rows on the target grid (np.nan: the np.NAN alias was
    # removed in NumPy 2.0).
    dummy_data = pd.DataFrame(
        np.nan, index=resample_index, columns=data.columns)
    # Union of the original and grid timestamps; original values win, and the
    # placeholder rows are then filled by time-weighted interpolation.
    intermediateResample = data.combine_first(
        dummy_data).interpolate('time')
    # Keep only the rows that sit exactly on the grid anchored at the first sample.
    finalResample = intermediateResample.resample(
        rule=resample_period, origin=first_timestamp).asfreq()
    return finalResample

def generateMinmMaxColumn(pd_data):
    """Label local minima/maxima and prune the ones that fail the threshold check.

    Despite the parameter name, ``pd_data`` is used as a Dask DataFrame here: it is
    passed to generateMinmaxColumn, then materialized with ``.compute()`` so that
    minmaxThresholdCheck can run on a pandas frame.

    Returns a new Dask DataFrame with 8 partitions built from the filtered frame.
    """
    # Step 1: add the 'min', 'max' and 'minmax' columns.
    labelled = generateMinmaxColumn(pd_data)
    # Step 2: the threshold pass runs on the in-memory pandas frame.
    filtered = minmaxThresholdCheck(labelled.compute())
    # Step 3: hand the result back as a Dask DataFrame.
    return dd.from_pandas(filtered, npartitions=8)

def generateMinmaxColumn(pd_data):
    """Add 'min', 'max' and 'minmax' label columns for local extrema of mark_price.

    Parameters
    ----------
    pd_data : dask DataFrame
        Must contain a 'mark_price' column. Despite the name this is used as a
        Dask DataFrame (``.compute()`` is called on its columns).

    Returns
    -------
    The same ``pd_data`` object, with three columns assigned on it:
      * 'min'    - mark_price where it is strictly below both immediate neighbours, else 0
      * 'max'    - mark_price where it is strictly above both immediate neighbours, else 0
      * 'minmax' - MIN_FLAG / MAX_FLAG at the extrema found by scipy.signal.argrelmin /
                   argrelmax, NEITHER_FLAG elsewhere
    """
    #######################################
    # Label data, for finding local maximum and minima.
    # New columns: Min, Max and Minmax
    #######################################
    pd_data['min'] = pd_data.mark_price[(pd_data.mark_price.shift(
        1) > pd_data.mark_price) & (pd_data.mark_price.shift(-1) > pd_data.mark_price)]
    pd_data['max'] = pd_data.mark_price[(pd_data.mark_price.shift(
        1) < pd_data.mark_price) & (pd_data.mark_price.shift(-1) < pd_data.mark_price)]

    # Min and Max column should not contain NANs
    pd_data['min'] = pd_data['min'].fillna(0)
    pd_data['max'] = pd_data['max'].fillna(0)

    # Materialize mark_price once; both extrema searches and the label index reuse
    # it instead of re-running the Dask graph for every lookup.
    mark_price = pd_data["mark_price"].compute()
    price_values = mark_price.to_numpy()

    # Minmax column should be NEITHER_FLAG for neither min nor max
    flags = np.full(len(price_values), NEITHER_FLAG, dtype=np.int64)

    # Each search uses its own order constant (minima -> SIGNAL_MIN_ORDER,
    # maxima -> SIGNAL_MAX_ORDER).
    minData = signal.argrelmin(
        data=price_values, order=SIGNAL_MIN_ORDER, mode='clip')
    maxData = signal.argrelmax(
        data=price_values, order=SIGNAL_MAX_ORDER, mode='clip')

    flags[minData] = MIN_FLAG
    flags[maxData] = MAX_FLAG

    # Re-attach the original index so the labels align with the rows of pd_data.
    pd_data["minmax"] = pd.Series(flags, index=mark_price.index)

    return pd_data

def minmaxThresholdCheck(pd_data):
    """Prune flagged extrema so that kept minima and maxima alternate and differ enough.

    Walks the rows whose 'minmax' is not NEITHER_FLAG in order and compares each
    one with the flagged row that follows it:

      * min followed by max (or max by min): if the price change, as a percentage of
        the current row's mark_price, is below THRESHOLD_PERCENTAGE, the following
        row loses its flag; otherwise the pair is accepted and the walk moves on.
      * two minima in a row: only the lower one keeps its flag (the earlier on a tie).
      * two maxima in a row: only the higher one keeps its flag (the earlier on a tie).

    Parameters
    ----------
    pd_data : pandas.DataFrame
        Must contain 'mark_price' and 'minmax' columns. Modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``pd_data`` object, with 'minmax' reset to NEITHER_FLAG on every
        pruned row.

    Raises
    ------
    ValueError
        If two consecutive flagged rows share a flag that is neither MIN_FLAG nor
        MAX_FLAG (previously this case printed an error and never terminated).
    ZeroDivisionError
        If a flagged row used as the comparison base has a mark_price of 0.
    """
    # grab just the rows with non-zero min and max values for easy comparison
    flagged = pd_data.loc[pd_data['minmax'] != NEITHER_FLAG]

    # Plain Python lists: removing a candidate is a cheap `del` instead of a
    # DataFrame.drop copy, and each row's label comes straight from the index
    # instead of a full-frame scan on 'datetimeNotTheIndex'.
    labels = list(flagged.index)
    prices = [float(price) for price in flagged['mark_price']]
    flags = list(flagged['minmax'])
    removed_labels = []

    position = 0
    while position < len(labels):
        threshold_passed = False
        # Candidates are removed inside this loop, so the length is re-read on every
        # pass. position + 1 is needed because the current candidate is always
        # compared against the one that follows it.
        while (not threshold_passed) and (position + 1 < len(labels)):
            current_price = prices[position]
            next_price = prices[position + 1]
            current_flag = flags[position]
            next_flag = flags[position + 1]

            if current_flag != next_flag:
                # One is a MIN and the other a MAX: run the threshold check.
                percentage_change = 100 * \
                    abs(next_price - current_price) / current_price
                if percentage_change < THRESHOLD_PERCENTAGE:
                    # Move too small: the next candidate is not a real extremum.
                    to_remove = position + 1
                else:
                    # Threshold passed; advance to the next candidate.
                    threshold_passed = True
                    continue
            elif current_flag == MIN_FLAG:
                # Two MINs in a row: keep the lowest one.
                to_remove = position + 1 if current_price <= next_price else position
            elif current_flag == MAX_FLAG:
                # Two MAXs in a row: keep the highest one.
                to_remove = position + 1 if current_price >= next_price else position
            else:
                raise ValueError(
                    "Logic failed in minmaxThresholdCheck: unexpected minmax flag %r"
                    % (current_flag,))

            # When the current candidate is the one removed, the next candidate
            # slides into `position` and is compared on the following pass.
            removed_labels.append(labels[to_remove])
            del labels[to_remove], prices[to_remove], flags[to_remove]

        position += 1

    # Clear the flag of every pruned row in a single write.
    if removed_labels:
        pd_data.loc[removed_labels, "minmax"] = NEITHER_FLAG

    return pd_data

In [23]:
# Materialize the training columns as an in-memory pandas DataFrame, then split it
# into contiguous segments wherever consecutive samples are more than
# GAP_SIZE_MINUTES apart. Each segment is later resampled onto an exact
# RESAMPLE_PERIOD grid, so interpolation never bridges a gap in the source data.
#
# GAP_SIZE_MINUTES and RESAMPLE_PERIOD come from the "MinMax GLOBALS" cell; they are
# deliberately not re-declared here so the two cells cannot drift apart.
data = df_train.compute()

data_split = splitDataByGaps(data)

In [24]:
# Resample every gap-free segment independently. "datetimeNotTheIndex" is dropped
# first because it is a timestamp column and cannot be interpolated numerically.
resampled_segments = [
    resampleAndInterpolate(segment.drop("datetimeNotTheIndex", axis=1))
    for segment in data_split
]

# Concatenate once instead of growing a DataFrame inside the loop (which re-copies
# all previous rows on every iteration and relies on concatenating with an empty
# frame). reindex restores the training_columns layout, including a placeholder
# "datetimeNotTheIndex" column that is filled in from the index afterwards.
data_resampled = pd.concat(resampled_segments).reindex(columns=training_columns)

In [25]:
# Regenerate the "datetimeNotTheIndex" column for use in the minmax calculations.
# It was dropped before resampling, so it is rebuilt here from the resampled index.
data_resampled["datetimeNotTheIndex"] = data_resampled.index
# Sanity check: the index should still be a DatetimeIndex after concatenation.
print(type(data_resampled.index))
# Wrap the pandas frame as a Dask DataFrame, since generateMinmMaxColumn calls
# .compute() on its input.
data_resampled_dask = dd.from_pandas(data_resampled, npartitions=8)
# Adds the 'min', 'max' and 'minmax' columns and returns a new Dask DataFrame.
data_resampled_dask = generateMinmMaxColumn(data_resampled_dask)

<class 'pandas.core.indexes.datetimes.DatetimeIndex'>


In [26]:
# Confirm the labelled result is a Dask DataFrame ...
print(type(data_resampled_dask))
# ... and materialize it to pandas for display (this computes the whole frame).
data_resampled_dask.compute()

<class 'dask.dataframe.core.DataFrame'>


Unnamed: 0,mark_price,ask_price,bid_price,spread,mark_price_10T_velocity,mark_price_60T_velocity,mark_price_1440T_velocity,mark_price_10T_mean,mark_price_60T_mean,mark_price_1440T_mean,mark_price_10T_std,mark_price_60T_std,mark_price_1440T_std,mark_price_10T_acceleration_for_10T_velocity,mark_price_60T_acceleration_for_60T_velocity,datetimeNotTheIndex,min,max,minmax
2020-09-23 19:42:02.537740-05:00,0.002570,0.002572,0.002569,0.000003,,,,,,,,,,,,2020-09-23 19:42:02.537740-05:00,0.000000,0.000000,0
2020-09-23 19:42:32.537740-05:00,0.002570,0.002573,0.002568,0.000005,,,,,,,,,,,,2020-09-23 19:42:32.537740-05:00,0.000000,0.000000,0
2020-09-23 19:43:02.537740-05:00,0.002572,0.002575,0.002570,0.000005,,,,,,,,,,,,2020-09-23 19:43:02.537740-05:00,0.000000,0.002572,2
2020-09-23 19:43:32.537740-05:00,0.002570,0.002573,0.002568,0.000005,,,,,,,,,,,,2020-09-23 19:43:32.537740-05:00,0.002570,0.000000,0
2020-09-23 19:44:02.537740-05:00,0.002571,0.002574,0.002569,0.000005,,,,,,,,,,,,2020-09-23 19:44:02.537740-05:00,0.000000,0.002571,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-04-15 23:57:17.635795-05:00,0.230443,0.230433,0.230454,-0.000021,-0.001458,-0.000627,0.000063,0.237017,0.251975,0.164702,0.004595,0.009176,0.046634,-0.000116,-6.183672e-06,2021-04-15 23:57:17.635795-05:00,0.230443,0.000000,0
2021-04-15 23:57:47.635795-05:00,0.231044,0.231320,0.230768,0.000552,-0.001297,-0.000618,0.000065,0.236312,0.251662,0.164735,0.004441,0.009259,0.046648,-0.000095,-8.345784e-06,2021-04-15 23:57:47.635795-05:00,0.000000,0.231044,0
2021-04-15 23:58:17.635795-05:00,0.229676,0.229751,0.229601,0.000150,-0.001436,-0.000620,0.000065,0.235619,0.251344,0.164769,0.004266,0.009340,0.046662,-0.000094,-4.658073e-06,2021-04-15 23:58:17.635795-05:00,0.000000,0.000000,0
2021-04-15 23:58:47.635795-05:00,0.228762,0.228980,0.228545,0.000435,-0.001410,-0.000616,0.000063,0.234864,0.251032,0.164802,0.004080,0.009465,0.046674,-0.000070,-1.646112e-06,2021-04-15 23:58:47.635795-05:00,0.228762,0.000000,1


In [27]:
# Materialize the labelled data (the result is a pandas DataFrame, despite the
# variable name) and display only the rows flagged as local maxima.
tempDask = data_resampled_dask.compute()
print(type(tempDask))
# Use the named flag rather than the literal 2 so this stays in sync with the globals.
tempDask[tempDask['minmax'] == MAX_FLAG]

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,mark_price,ask_price,bid_price,spread,mark_price_10T_velocity,mark_price_60T_velocity,mark_price_1440T_velocity,mark_price_10T_mean,mark_price_60T_mean,mark_price_1440T_mean,mark_price_10T_std,mark_price_60T_std,mark_price_1440T_std,mark_price_10T_acceleration_for_10T_velocity,mark_price_60T_acceleration_for_60T_velocity,datetimeNotTheIndex,min,max,minmax
2020-09-23 19:43:02.537740-05:00,0.002572,0.002575,0.002570,0.000005,,,,,,,,,,,,2020-09-23 19:43:02.537740-05:00,0.0,0.002572,2
2020-09-23 21:31:32.537740-05:00,0.002576,0.002579,0.002573,0.000006,7.977162e-07,9.974633e-08,4.150797e-09,0.002571,0.002565,0.002564,0.000002,0.000004,0.000004,5.984780e-08,1.664559e-09,2020-09-23 21:31:32.537740-05:00,0.0,0.002576,2
2020-09-23 23:00:32.537740-05:00,0.002596,0.002606,0.002586,0.000020,1.099419e-06,2.999247e-07,1.805246e-08,0.002587,0.002584,0.002571,0.000003,0.000005,0.000010,1.699129e-07,-1.292308e-12,2020-09-23 23:00:32.537740-05:00,0.0,0.002596,2
2020-09-23 23:29:02.537740-05:00,0.002596,0.002598,0.002594,0.000004,3.908165e-07,2.158989e-07,1.802363e-08,0.002594,0.002590,0.002574,0.000001,0.000003,0.000011,3.816299e-08,2.649561e-10,2020-09-23 23:29:02.537740-05:00,0.0,0.002596,2
2020-09-24 00:32:32.537740-05:00,0.002582,0.002584,0.002580,0.000004,6.970615e-07,-1.333305e-07,8.323175e-09,0.002578,0.002577,0.002575,0.000002,0.000004,0.000011,3.941154e-08,-2.777734e-09,2020-09-24 00:32:32.537740-05:00,0.0,0.002582,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-04-15 17:47:33.508343-05:00,0.180449,0.180631,0.180267,0.000364,8.831803e-05,-8.536035e-05,3.923753e-05,0.175705,0.181059,0.140299,0.003242,0.003132,0.018796,1.636793e-05,-1.781893e-06,2021-04-15 17:47:33.508343-05:00,0.0,0.180449,2
2021-04-15 20:15:03.508343-05:00,0.271388,0.272316,0.270460,0.001856,8.466864e-04,1.387851e-03,9.960702e-05,0.266764,0.225620,0.147587,0.003749,0.026889,0.026517,-2.017956e-04,1.891752e-05,2021-04-15 20:15:03.508343-05:00,0.0,0.271388,2
2021-04-15 21:34:17.635795-05:00,0.332657,0.333529,0.331785,0.001745,,6.201288e-04,1.424123e-04,,0.276945,0.150535,,0.015935,0.030245,,4.055502e-06,2021-04-15 21:34:17.635795-05:00,0.0,0.332657,2
2021-04-15 22:31:47.635795-05:00,0.285089,0.285328,0.284850,0.000479,1.879581e-03,-8.132364e-04,1.095079e-04,0.272391,0.293281,0.156889,0.005718,0.021619,0.040474,3.217482e-04,-1.683123e-05,2021-04-15 22:31:47.635795-05:00,0.0,0.285089,2


In [28]:
# Materialize the final labelled data to pandas and persist it as a single CSV.
pd_to_save = data_resampled_dask.compute()
# Name the index so it is written under a "datetime" header in the CSV.
pd_to_save.index.name = "datetime"
# NOTE(review): the target directory is assumed to exist already — confirm, or create it first.
pd_to_save.to_csv("DataFromDeepLearningProcessing/DOGE_Deep_2021-04-16.csv")
# Alternative kept for reference: write directly from the Dask DataFrame.
# data_resampled_dask.to_csv("DOGE_Deep_2021-04-16.csv")