# Cryptolytic Data Processing

This notebook contains the code to generate the data that is used to create the arbitrage models in this [notebook]() (add notebook link).

#### Background on arbitrage models
Arbitrage models were created with the goal of predicting arbitrage 10 min before it happens in an active crypto market. The models are generated by getting all of the combinations of 2 exchanges that support the same trading pair, engineering technical analysis features, merging that data on 'closing_time', engineering more features, and creating a target that signals an arbitrage opportunity. Each arbitrage signal predicted by the model has a direction, indicating which exchange to buy on and which to sell on. An arbitrage signal is only valid when the arbitrage lasts at least 30 mins, because it takes time to move coins from one exchange to the other in order to complete the arbitrage trades.

The models predict whether there will be an arbitrage opportunity that starts 10 mins after the prediction time and lasts for at least 30 mins, giving a user enough time to execute trades.

#### Where does the data come from?
Data for this project was obtained through the APIs of each exchange.

<img src="assets/exchange_logos.png"
     alt=" "
     align= "center"
     style="width: 1000px;" />
     
- [Bitfinex API OHLCV Data Documentation](https://docs.bitfinex.com/reference#rest-public-candles)
- [Coinbase Pro API OHLCV Data Documentation](https://docs.pro.coinbase.com/?r=1#get-historic-rates)
- [HitBTC OHLCV Data Documentation](https://api.hitbtc.com/#candles)
- [Kraken OHLCV Data Documentation](https://www.kraken.com/features/api)
- [Gemini OHLCV Data Documentation](https://docs.gemini.com/rest-api/)

The functions to collect this data can be found [here](https://github.com/Cryptolytic-app/cryptolytic/tree/master/data_collection_and_databasing).


#### What does the data look like?
<img src="assets/sample_df.png"
     alt=" "
     align= "center"
     style="width: 400px;" />
     
#### Data Dictionary
- **closing_time:** the closing time of the candlestick
- **open:** the price of the cryptocurrency at the opening of the candlestick
- **high:** the highest price of the cryptocurrency during that candlestick
- **low:** the lowest price of the cryptocurrency during that candlestick
- **close:** the price of the cryptocurrency at the end of the candlestick
- **volume:** the volume traded during that candlestick


#### Features Engineered
Technical analysis features were engineered with the [Technical Analysis Library](https://github.com/bukosabino/ta). They fall into five categories:
- Momentum indicators
- Volume indicators
- Volatility indicators
- Trend indicators
- Other indicators

#### Merging Datasets
Arbitrage can occur between two exchanges that list the same trading pair. We generated all of the possible arbitrage combinations between the 80 datasets that were available. This resulted in 95 possible combinations of arbitrage datasets that could be used in modeling.

For each possible arbitrage combination, we merged the two datasets on 'closing_time' and created new features that identified arbitrage opportunities. These features included:
- **higher_closing_price:** identifies the exchange that has the higher closing price (1 or 2)
- **pct_higher:** the percentage by which higher_closing_price is greater
- **arbitrage_opportunity:** identifies whether there is a gain of at least 0.55% (to account for fees)
    - 1: arbitrage from exchange 1 to exchange 2
    - 0: no arbitrage
    - -1: arbitrage from exchange 2 to exchange 1
- **close_exchange_1_shift:** shifts the exchange 1 close price to account for a 30 min trading interval + 10 min advance prediction
- **close_exchange_2_shift:** shifts the exchange 2 close price to account for a 30 min trading interval + 10 min advance prediction
- **window_length:** gets the window length of the arbitrage opportunity
- **window_length_shift:** shifts the window length by the 30 min trading interval + 5 mins (7 candles), so each row holds the window length 35 mins ahead
- **arbitrage_opportunity_shift:** shifts the arbitrage opportunity by the same 7 candles
- **target:** identifies whether there was an arbitrage opportunity
    - 1: arbitrage from exchange 1 to exchange 2
    - 0: no arbitrage
    - -1: arbitrage from exchange 2 to exchange 1
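
The first three features in the list above can also be computed column-wise rather than row by row. The following is a minimal sketch of that idea using `np.select`. The toy prices and the `FEE_PCT` name are illustrative assumptions; the 0.55% threshold is the one used in this notebook.

```python
import numpy as np
import pandas as pd

FEE_PCT = 0.55  # total fee threshold in percent (name is illustrative)

# toy merged data; the prices are made up for illustration
toy = pd.DataFrame({
    'close_exchange_1': [100.0, 101.0, 100.0, 100.0],
    'close_exchange_2': [100.0, 100.0, 101.0, 100.2],
})

c1, c2 = toy['close_exchange_1'], toy['close_exchange_2']

# 1 or 2 for the exchange with the higher close, 0 when they are equal
toy['higher_closing_price'] = np.select([c1 > c2, c2 > c1], [1, 2], default=0)

# percentage by which the higher close exceeds the lower close
toy['pct_higher'] = (np.maximum(c1, c2) / np.minimum(c1, c2) - 1) * 100

# -1: buy on exchange 2, sell on exchange 1; 1: the reverse; 0: no arbitrage
toy['arbitrage_opportunity'] = np.select(
    [(toy['pct_higher'] >= FEE_PCT) & (c1 > c2),
     (toy['pct_higher'] >= FEE_PCT) & (c2 > c1)],
    [-1, 1], default=0)

print(toy)
```

The last row shows a 0.2% price gap, which stays below the fee threshold and is therefore labelled as no arbitrage.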

Notes:
- It is HIGHLY recommended to use AWS SageMaker for feature engineering and to split the work across four notebooks. This cuts the processing time drastically. You can do this by list slicing the filepaths of the datasets you pass into the functions (for example: `create_ta_csvs(csv_filepaths[:20])`).
- Since feature engineering takes a long time, we export the data as csvs after each step so that features don't have to be re-engineered if the runtime restarts: once after the technical analysis features are added, and once after the datasets are merged and the arbitrage features are added.
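
As a rough illustration of the list slicing mentioned in the notes, the sketch below splits a list of filepaths into four contiguous chunks, one per notebook. The `split_into_chunks` helper and the example filepaths are hypothetical and not part of the project code.

```python
def split_into_chunks(items, n_chunks=4):
    """Split a list into n_chunks contiguous slices of near-equal size."""
    size, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # the first `extra` chunks take one additional item
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks

# hypothetical filepaths standing in for the 80 raw csv files
example_paths = [f'data/csv_data/file_{i}.csv' for i in range(80)]
chunks = split_into_chunks(example_paths)
print([len(chunk) for chunk in chunks])  # [20, 20, 20, 20]
```

Each notebook would then pass its own chunk to the processing function, for example `create_ta_csvs(chunks[0])` in the first notebook.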

##### Folder organization:

```
├── cryptolytic/                        <-- The top-level directory for all arbitrage work
│   ├── modeling_nb/                    <-- Directory for modeling work
│   │      ├──data/                     <-- Directory with subdirectories containing 5 min candle data
│   │      │   ├─ arb_data/             <-- Directory for csv files of arbitrage model training data
│   │      │   │   └── *.csv
│   │      │   │
│   │      │   ├─ csv_data/             <-- Directory for raw csv files extracted from the zip files
│   │      │   │   └── *.csv
│   │      │   │
│   │      │   ├─ ta_data/              <-- Directory for csv files after FE pt.1 
│   │      │   │   └── *.csv
│   │      │   │
│   │      │   ├─ *.zip                 <-- ZIP files of all of the data
│   │      │   
│   │      ├──final_models/             <-- Directory for final models after model selection
│   │      │      └── *.pkl
│   │      │
│   │      ├──model_perf/               <-- Directory for performance csvs after training models
│   │      │      └── *.json
│   │      │
│   │      ├──models/                   <-- Directory for all pickle models
│   │      │      └── *.pkl
│   │      │
│   │      ├─arbitrage_data_processing.ipynb      <-- Notebook for data processing and creating csvs
│   │      │
│   │      ├─arbitrage_modeling.ipynb             <-- Notebook for baseline models and hyperparam tuning
│   │      │
│   │      ├─arbitrage_model_selection.ipynb      <-- Notebook for model selection
│   │      │
│   │      ├─arbitrage_model_evaluation.ipynb     <-- Notebook for final model evaluation
│   │      │
│   │      ├─environment.yml                      <-- yml file to create conda environment
│   │      │
│   │      ├─trade_recommender_models.ipynb       <-- Notebook for trade recommender models

```

# Imports

This project uses conda to manage environments.

In [None]:
# to update your conda env from a yml file from terminal
# conda env update --file cryptolytic/modeling_nb/environment.yml

# to export yml from terminal
# conda env export > cryptolytic/finalized_notebooks/environment.yml

In [88]:
import glob
import os
import pickle
import json
import itertools
from zipfile import ZipFile
import warnings

warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np
import datetime as dt
pd.set_option('display.max_rows', 100000)

from ta import add_all_ta_features

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

# Data

In this section we'll unzip the files that contain the 5 minute candle data, save them to a new directory `csv_data/`, and rename the filepaths that contain coinbase_pro to cbpro (the extra underscore breaks the filename parsing, which splits on underscores).

#### Unzipping

In [92]:
zip_filepaths = glob.glob('data/*300.zip')
len(zip_filepaths) #5

5

In [None]:
# UNCOMMENT THIS TO UNZIP

# for zip_filepath in zip_filepaths:
#     with ZipFile(zip_filepath, 'r') as zip_ref:
#         zip_ref.extractall('data/csv_data')

#### Renaming

In [93]:
csv_filepaths = glob.glob('data/csv_data/*.csv')
len(csv_filepaths) #80

80

In [None]:
# UNCOMMENT THIS TO RENAME

# for filepath in csv_filepaths:
#     new_filepath = filepath.replace('coinbase_pro', 'cbpro')
#     os.rename(filepath, new_filepath)

#### Raw datasets look like this

In [83]:
pd.read_csv(csv_filepaths[1], index_col=0).head(10)

Unnamed: 0,closing_time,open,high,low,close,base_volume
0,1496358900,1.0,1.0,1.0,1.0,0.1
1,1496370000,1.03,1.03,1.03,1.03,0.002
2,1496377500,1.02,1.02,1.02,1.02,0.53
3,1496393700,249.99,249.99,249.99,249.99,0.001
4,1496407500,1.05,1.05,1.05,1.05,0.01
5,1496408700,1.99,1.99,1.99,1.99,0.003
6,1496422200,239.26,239.26,233.89,233.89,0.036
7,1496422500,232.9,232.95,232.55,232.55,0.056
8,1496423100,234.04,234.04,234.04,234.04,0.321
9,1496423400,230.34,233.26,230.34,232.74,0.749


#### And we want to transform it into something that looks like this for modeling...

In [None]:
# pd.read_csv('bitfinex_cbpro_btc_usd.csv')[:1000]

# Functions

#### Get all combinations of exchanges with the same trading pair

In [96]:
# five supported exchanges
exchanges = ['bitfinex', 'coinbase_pro', 'gemini', 'hitbtc', 'kraken']

# function to create pairs for arbitrage datasets
def get_file_pairs(filenames, exchanges):
    """
    Takes a list of csv filepaths and returns every combination of
    two files that share the same trading pair, as a list of tuples.
    The trading pair is read from the second and third
    underscore-separated parts of the filename. The exchanges
    argument is currently unused.
    """
    # get combinations
    combos = list(itertools.combinations(filenames, 2))
    
    # remove unmatched trading pairs
    filtered_combos = []
    for combo in combos:
        tp1 = '_'.join(combo[0].split('/')[2].split('_')[1:3])
        tp2 = '_'.join(combo[1].split('/')[2].split('_')[1:3])
        if tp1 == tp2:
            filtered_combos.append(combo)
                    
    return filtered_combos

In [97]:
pairs = get_file_pairs(csv_filepaths, exchanges)
print(len(pairs)) # 95

95
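
To make the pairing logic easier to follow, here is a small self-contained sketch that runs the same idea on five made-up filepaths. It assumes filenames shaped like `{exchange}_{base}_{quote}_300.csv`, and the `trading_pair` helper is hypothetical; it uses `os.path.basename` so that it does not depend on the directory depth.

```python
import itertools
import os

def trading_pair(filepath):
    """Extract e.g. 'btc_usd' from '.../bitfinex_btc_usd_300.csv'."""
    parts = os.path.basename(filepath).split('_')
    return '_'.join(parts[1:3])

# hypothetical filepaths, named after the pattern assumed above
example_files = [
    'data/csv_data/bitfinex_btc_usd_300.csv',
    'data/csv_data/cbpro_btc_usd_300.csv',
    'data/csv_data/kraken_btc_usd_300.csv',
    'data/csv_data/bitfinex_eth_btc_300.csv',
    'data/csv_data/hitbtc_eth_btc_300.csv',
]

# keep only the combinations whose two files share a trading pair
example_pairs = [
    combo for combo in itertools.combinations(example_files, 2)
    if trading_pair(combo[0]) == trading_pair(combo[1])
]
print(len(example_pairs))  # 4
```

Three exchanges listing btc_usd give three pairs and two exchanges listing eth_btc give one, for four in total.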


#### OHLCV Data Resampling

In [98]:
def resample_ohlcv(df, period):
    """ 
    Changes the time period on cryptocurrency ohlcv data.
    Period is a string denoted by '{time_in_minutes}T'
    (ex: '1T', '5T', '60T').
    """

    # set date as index
    # needed for the function to run
    df = df.set_index(['date'])

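    # aggregation function
    ohlc_dict = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'base_volume': 'sum'
    }

    # resample; the how= argument has been removed from pandas, so
    # the aggregation is applied with .agg() instead
    df = df.resample(period, closed='left', label='left').agg(ohlc_dict)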
    
    return df

#### Filling NaNs

The `resample_ohlcv` function creates NaNs in the df wherever there were gaps in the data. The gaps could be caused by exchanges being down, errors from Cryptowatch, or errors from the exchanges themselves.

In [None]:
def fill_nan(df):
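    """
    Fills the NaNs left by resampling: close is forward filled, and
    missing open, high and low values take the close value of the
    same row.
    """
    # Forward fill close column
    df['close'] = df['close'].ffill()

    # Backward fill along each row so that missing open, high and low
    # values take the close value (this relies on the close column
    # sitting to the right of those columns)
    df = df.bfill(axis=1)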

    return df
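
The sketch below walks through resampling and gap filling on three made-up candles with one candle missing. It is an illustration under stated assumptions rather than the notebook's exact code: it fills open, high and low explicitly from close instead of using a row-wise backfill, and it writes the period as `'5min'`, which is the same five minute period as `'5T'`.

```python
import pandas as pd

# toy 5 minute candles with the 00:10 candle missing; values are made up
candles = pd.DataFrame({
    'date': pd.to_datetime(['2019-01-01 00:00', '2019-01-01 00:05',
                            '2019-01-01 00:15']),
    'open': [1.0, 2.0, 4.0],
    'high': [1.5, 2.5, 4.5],
    'low': [0.5, 1.5, 3.5],
    'close': [1.2, 2.2, 4.2],
    'base_volume': [10.0, 20.0, 40.0],
}).set_index('date')

agg = {'open': 'first', 'high': 'max', 'low': 'min',
       'close': 'last', 'base_volume': 'sum'}
resampled = candles.resample('5min', closed='left', label='left').agg(agg)

# flag the rows that resampling created, before they get filled
resampled['nan_ohlcv'] = resampled['close'].isnull().astype(int)

# carry the last close forward, then reuse it for open, high and low
resampled['close'] = resampled['close'].ffill()
for col in ['open', 'high', 'low']:
    resampled[col] = resampled[col].fillna(resampled['close'])

# inspect the candle that was missing from the input
print(resampled.loc[pd.Timestamp('2019-01-01 00:10')])
```

The filled candle is flat: open, high, low and close all equal the previous close, and `nan_ohlcv` marks it as a gap.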

#### Feature engineering - before merge

In [None]:
def engineer_features(df, period='5T'):
    """
    Takes a df of raw ohlcv data, resamples it to the given period
    (default '5T'), fills gaps, engineers ta features, and returns
    the resulting df
    """
    # convert unix closing_time to datetime
    df['date'] = pd.to_datetime(df['closing_time'], unit='s')
    
    # time resampling to fill gaps in data
    df = resample_ohlcv(df, period)
    
    # move date off the index
    df = df.reset_index()
    
    # create closing_time
    closing_time = df.date.values
    df.drop(columns='date', inplace=True)
    
    # create feature to indicate which rows were gaps in the data
    df['nan_ohlcv'] = df['close'].isnull().astype(int)
    
    # fill gaps in data
    df = fill_nan(df)

    # adding all the technical analysis features...
    df = add_all_ta_features(df, 'open', 'high', 'low', 'close','base_volume', fillna=True)
    
    # add closing time column
    df['closing_time'] = closing_time
    
    return df

#### Feature Engineering - after merge

In [None]:
def get_higher_closing_price(df):
    """
    Returns the exchange with the higher closing price
    """
    
    # exchange 1 has higher closing price
    if (df['close_exchange_1'] - df['close_exchange_2']) > 0:
        return 1
    
    # exchange 2 has higher closing price
    elif (df['close_exchange_1'] - df['close_exchange_2']) < 0:
        return 2
    
    # closing prices are equivalent
    else:
        return 0

def get_pct_higher(df):
    """
    Returns the percentage of the difference between ex1/ex2 
    closing prices
    """
    
    # if exchange 1 has a higher closing price than exchange 2
    if df['higher_closing_price'] == 1:
        
        # % difference
        return ((df['close_exchange_1'] / 
                 df['close_exchange_2'])-1)*100
    
    # if exchange 2 has a higher closing price than exchange 1
    elif df['higher_closing_price'] == 2:
        
        # % difference
        return ((df['close_exchange_2'] / 
                 df['close_exchange_1'])-1)*100
    
    # if closing prices are equivalent
    else:
        return 0

def get_arbitrage_opportunity(df):
    """
    Return available arbitrage opportunities
    
    1: arbitrage from exchange 1 to exchange 2
    0: no arbitrage
    -1: arbitrage from exchange 2 to exchange 1
    """
    
    # assuming the total fees are 0.55%, if the higher closing price 
    # is less than 0.55% higher than the lower closing price
    if df['pct_higher'] < .55:
        return 0 # no arbitrage
    
    # if exchange 1 closing price is more than 0.55% higher
    # than the exchange 2 closing price
    elif df['higher_closing_price'] == 1:
        return -1 # arbitrage from exchange 2 to exchange 1
    
    # if exchange 2 closing price is more than 0.55% higher
    # than the exchange 1 closing price
    elif df['higher_closing_price'] == 2:
        return 1 # arbitrage from exchange 1 to exchange 2

def get_window_length(df):
    """
    Creates a column 'window_length' to show how long an arbitrage 
    opportunity has lasted
    """
    
    # convert arbitrage_opportunity column to a list
    target_list = df['arbitrage_opportunity'].to_list()
    
    # set initial window length 
    window_length = 5 # time in minutes
    
    # list for window_lengths
    window_lengths = []
    
    # iterate through arbitrage_opportunity column
    for i in range(len(target_list)):
        
        # if a value in the arbitrage_opportunity column is equal to
        # the previous value, increase the window length; the i > 0
        # check stops the first row from being compared with the last
        # row through target_list[-1]
        if i > 0 and target_list[i] == target_list[i-1]:
            window_length += 5
            
        # otherwise reset the window length to five minutes
        else:
            window_length = 5
            
        window_lengths.append(window_length)
            
    # create window length column showing how long an arbitrage 
    # opportunity has lasted
    df['window_length'] = window_lengths

    return df
        

def merge_dfs(df1, df2):
    """
    Merges two dataframes and adds final features for arbitrage data
    
    Returns a dataframe with new features:
    - year
    - month
    - day
    - higher_closing_price
    - pct_higher
    - arbitrage_opportunity
    - window_length
    """
    # merge
    df = pd.merge(df1, df2, on='closing_time',
                  suffixes=('_exchange_1', '_exchange_2'))

    # convert closing_time to datetime
    df['closing_time'] = pd.to_datetime(df['closing_time']) 

    # Create additional date features.
    df['year'] = df['closing_time'].dt.year
    df['month'] = df['closing_time'].dt.month
    df['day'] = df['closing_time'].dt.day
    print('before get higher closing price', df.shape)
    
    # get higher_closing_price feature to create pct_higher feature
    df['higher_closing_price'] = df.apply(get_higher_closing_price, axis=1)
    
    # get pct_higher feature to create arbitrage_opportunity feature
    df['pct_higher'] = df.apply(get_pct_higher, axis=1)
    
    # create arbitrage_opportunity feature
    df['arbitrage_opportunity'] = df.apply(get_arbitrage_opportunity, axis=1)
    
    # create window_length feature
    df = get_window_length(df)
    
    return df
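
A run length such as `window_length` can also be computed without an explicit loop. The sketch below is one possible vectorized alternative and is not the implementation used in this notebook; the toy signal values are made up.

```python
import pandas as pd

# toy arbitrage_opportunity values, one per 5 minute candle
signal = pd.Series([0, 1, 1, 1, 0, -1, -1])

# a new run starts wherever the value differs from the previous row
run_id = (signal != signal.shift()).cumsum()

# position within the run (1, 2, 3, ...) times the 5 minute candle length
window_length = signal.groupby(run_id).cumcount().add(1).mul(5)

print(window_length.tolist())  # [5, 5, 10, 15, 5, 5, 10]
```

Every run restarts at five minutes, including runs of 0 (no arbitrage), and grows by five minutes for each consecutive candle with the same value.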

#### Creating the target

In [None]:
# specifying arbitrage window length to target, in minutes
interval = 30

def get_target_value(df, interval=30):
    """
    Checks for arbitrage opportunities and returns a target. 
    
    1: arbitrage from exchange 1 to exchange 2
    0: no arbitrage
    -1: arbitrage from exchange 2 to exchange 1
    """
    
    # if the arbitrage window is as long as the targeted interval
    if df['window_length_shift'] >= interval:
        # if that window is for exchange 1 to 2
        if df['arbitrage_opportunity_shift'] == 1:
            return 1 # arbitrage from exchange 1 to 2
        
        # if that window is for exchange 2 to 1
        elif df['arbitrage_opportunity_shift'] == -1:
            return -1 # arbitrage from exchange 2 to 1
        
        # if no arbitrage opportunity
        elif df['arbitrage_opportunity_shift'] == 0:
            return 0 # no arbitrage opportunity
        
    # if the arbitrage window is less than our targeted interval
    else:
        return 0 # no arbitrage opportunity
    

def get_target(df, interval=interval):
    """
    Create new features and target.
    
    Returns a dataframe with new features:
    - arbitrage_opportunity_shift
    - window_length_shift
    - target
    """
    
    # number of rows covered by the targeted interval, negative so that
    # shift() looks ahead; assumes candle length is five minutes
    rows_to_shift = int(-1*(interval/5)) # -6 for a 30 min interval
    
    # arbitrage_opportunity feature, shifted by the targeted interval plus
    # one extra row (-7 rows in total) to predict ten minutes in advance
    # rather than five
    df['arbitrage_opportunity_shift'] = df['arbitrage_opportunity'].shift(
        rows_to_shift - 1)
    
    # window_length feature, shifted by the same number of rows
    df['window_length_shift'] = df['window_length'].shift(rows_to_shift - 1)
    
    # creating target column; this will indicate if an arbitrage opportunity
    # that lasts as long as the targeted interval is forthcoming
    df['target'] = df.apply(get_target_value, axis=1, interval=interval)
    
    # dropping rows where target could not be calculated due to shift
    df = df[:rows_to_shift - 1] # drops the last 7 rows
    
    return df

def get_close_shift(df, interval=interval):
    """
    Shifts the closing prices by the selected interval +
    10 mins.
    
    Returns a df with new features:
    - close_exchange_1_shift
    - close_exchange_2_shift
    """
    
    rows_to_shift = int(-1*(interval/5))
    
    df['close_exchange_1_shift'] = df['close_exchange_1'].shift(
        rows_to_shift - 2)
    
    df['close_exchange_2_shift'] = df['close_exchange_2'].shift(
        rows_to_shift - 2)
    
    return df

def get_profit(df):
    """
    Calculates the profit of an arbitrage trade.
    
    Returns df with new profit feature.
    """
    
    # if exchange 1 has the higher closing price
    if df['higher_closing_price'] == 1:
        
        # return how much money you would make if you bought 
        # on exchange 2, sold on exchange 1, and took account 
        # of 0.55% fees
        return (((df['close_exchange_1_shift'] / 
                 df['close_exchange_2'])-1)*100)-.55
    
    # if exchange 2 has the higher closing price
    elif df['higher_closing_price'] == 2:
        
        # return how much money you would make if you bought 
        # on exchange 1, sold on exchange 2, and took account 
        # of 0.55% fees
        return (((df['close_exchange_2_shift'] / 
                 df['close_exchange_1'])-1)*100)-.55
    
    # if the closing prices are the same
    else:
        return 0 # no arbitrage
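
The shifting in `get_target` is easier to see on a small example. The sketch below rebuilds the target on twelve made-up candles, assuming five minute candles and a 30 minute interval as in this notebook. The variable names are illustrative, and the vectorized `where` call stands in for the row-wise `get_target_value`.

```python
import pandas as pd

interval = 30                  # targeted arbitrage window in minutes
candle = 5                     # candle length in minutes
rows = interval // candle + 1  # 7 rows, i.e. 35 minutes ahead

# toy data: an arbitrage run (value 1) covering rows 2 through 8
toy = pd.DataFrame({'arbitrage_opportunity': [0, 0] + [1] * 7 + [0] * 3})
run_id = (toy['arbitrage_opportunity']
          != toy['arbitrage_opportunity'].shift()).cumsum()
toy['window_length'] = toy.groupby(run_id).cumcount().add(1).mul(candle)

# look 7 rows ahead
ahead_signal = toy['arbitrage_opportunity'].shift(-rows)
ahead_length = toy['window_length'].shift(-rows)

# keep the direction only when the run has lasted long enough
toy['target'] = ahead_signal.where(ahead_length >= interval, 0)

# drop the last 7 rows, which have nothing to look ahead to
toy = toy.iloc[:-rows].astype({'target': int})
print(toy['target'].tolist())  # [1, 1, 0, 0, 0]
```

Row 0 gets a target of 1 because the run that starts at row 2 (10 minutes later) has reached a window length of 30 minutes by row 7 (35 minutes later).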

#### Split names when in the format exchange_trading_pair

In [None]:
def get_exchange_trading_pair(ex_tp):
    """
    Splits exchange_trading_pair into separate variables
    """

    ex_tp = ex_tp.split('/')[2]
    exchange = ex_tp.split('_')[0]
    trading_pair = '_'.join(ex_tp.split('_')[1:3])
        
    return exchange, trading_pair

### Generate all individual csv's with ta data (~1-2 hours)

Notes:
- create a `/ta_data` directory before running this function
- this function takes a really long time to run, so it's recommended to run it in SageMaker and divide the files across 4 notebooks so that each notebook processes about 20 files. Should take ~1 hour if split up across 4 notebooks.

In [None]:
def create_ta_csvs(csv_filepaths):
    """
    Takes a list of csv filepaths and, for each one, creates a
    dataframe, engineers features, and saves it as a new csv
    in /ta_data.
    """
    
    # counter
    n = 1
    
    for file in csv_filepaths:
        
        # create df
        df = pd.read_csv(file, index_col=0)
        
        # define period
        period = '5T'
        
        # engineer features
        df = engineer_features(df, period)
        print('features engineered')
        
        # generate new filename
        filename = 'data/ta_data/' + file.split('/')[2][:-4] + '_ta.csv'
        
        # export csv
        df.to_csv(filename)
        
        # print progress
        print(f'csv #{n} saved :)')
        
        # update counter
        n += 1

In [None]:
create_ta_csvs(csv_filepaths)

### Generate all arbitrage training data csv's (~9 hrs)

Notes:
- create a `/arb_data` directory before running this function
- this function takes a really long time to run, so it's recommended to run it in SageMaker and divide the pairs across 4 notebooks so that each notebook processes about 20 pairs. Should take ~2-3 hours if split up across 4 notebooks.

In [None]:
def create_arb_csvs(pairs):
    """Takes a list of possible arbitrage combinations, finds the 
        appropriate datasets in /ta_data, loads datasets, merges them,
        engineers more features, creates a target and exports the new
        dataset as a csv"""
    
    # counter, starting at 1 so the progress messages match create_ta_csvs
    n = 1
    
    # iterate through arbitrage combinations
    for pair in pairs:
        
        # define paths for the csv
        csv_1 = 'data/ta_data/' + pair[0].split('/')[2][:-4] + '_ta.csv'
        csv_2 = 'data/ta_data/' + pair[1].split('/')[2][:-4] + '_ta.csv'
        print('csv1, csv2:', csv_1, csv_2)
        
        # define exchanges and trading_pairs
        ex_tp_1, ex_tp_2 = pair[0][:-8], pair[1][:-8]
        print(ex_tp_1)
        ex1, tp1 = get_exchange_trading_pair(ex_tp_1)
        ex2, tp2 = get_exchange_trading_pair(ex_tp_2)
        print(ex1, tp1,  ex2, tp2)
        
        # define model_name for the filename
        model_name = ex1 + '_' + ex_tp_2.split('/')[2]
        print(model_name)
          
        # create dfs from csv's that already include ta features
        df1, df2 = pd.read_csv(csv_1, index_col=0), pd.read_csv(csv_2, index_col=0)       
        print('df 1 shape: ', df1.shape, 'df 2 shape: ', df2.shape)

        # merge dfs
        df = merge_dfs(df1, df2)
        print('dfs merged')
        print('merged df shape:' , df.shape)
        

        # create target 
        df = get_target(df)
        print(model_name, ' ', df.shape)

        # export csv
        path = 'data/arb_data/'
        csv_filename = path + model_name + '.csv'
        df.to_csv(csv_filename)

        # print progress
        print(f'csv #{n} saved :)')

        # update counter
        n += 1


In [None]:
create_arb_csvs(pairs)

#### Check to make sure all csv's saved

In [99]:
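# create_arb_csvs saves its output to data/arb_data/, so look there
arb_data_paths = glob.glob('data/arb_data/*.csv')
print(len(arb_data_paths)) # 95 once create_arb_csvs has finished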

0
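
When the count is lower than expected, it helps to know which files are missing. The sketch below compares a list of expected csv names with the paths that were found. The `missing_outputs` helper and the example names are hypothetical; in the notebook the expected names would be built from `pairs`.

```python
import os

def missing_outputs(expected_names, found_paths):
    """Return the expected csv names that are absent from found_paths."""
    found = {os.path.basename(path) for path in found_paths}
    return sorted(name for name in expected_names if name not in found)

# hypothetical names; in the notebook they would be built from `pairs`
expected = ['bitfinex_cbpro_btc_usd.csv', 'bitfinex_kraken_btc_usd.csv']
found = ['data/arb_data/bitfinex_cbpro_btc_usd.csv']
print(missing_outputs(expected, found))  # ['bitfinex_kraken_btc_usd.csv']
```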
