In [None]:
# ! git clone https://github.com/badcoder-cloud/SatoshiVault

In [2]:
import json
from datetime import datetime
import time
import numpy as np
import pandas as pd
import math
import sys
import os
import pdb


# sys.path.append(os.path.abspath("/content/SatoshiVault/code")) # Colab

from utilis import *

# price = get_current_price("BTCUSDT")
# price = float(price['price'])
# Redo the current price for aggregator


In [57]:
class sProcessBooks():
    """
        Aggregates a streamed limit-order book into per-second, per-price-level frames.

        Important notes:
            Keep the current price and current timestamp consistent among all of the sProcessors.
            New book levels farther than price_level_ceiling (%) from the current price are ignored
            for computational efficiency; over a 60-second window very wide books are assumed to be
            unimportant.

        Parameters:
            exchange, symbol : identification only
            start_price : initial reference price used by the ceiling filter
            level_range : width of the price buckets used to aggregate book quantities
            price_level_ceiling : % distance from the current price beyond which new levels are
                                  ignored, default 5

        Attributes:
            B : maintained book {"timestamp", "current_price", "bids": {price: qty}, "asks": {price: qty}}
                (price/qty kept exactly as received from the feed)
            dfs_books : working frame for the current minute, one row per second
            snapshot : last completed minute (gaps filled), None before the first rollover
    """
    def __init__(self, exchange, symbol, start_price, level_range, price_level_ceiling=5):
        # Identification
        self.exchange = exchange
        self.symbol = symbol
        self.level_range = level_range
        self.price_level_ceiling = price_level_ceiling
        self.level_ranges = get_level_ranges(start_price, level_range, price_level_ceiling)
        self.B = {"timestamp": 1, "current_price": float(start_price), "bids" : {}, "asks" : {}}
        # Raw data processors
        self.dfs_books = create_data_frame('sec', self.level_ranges)
        self.snapshot = None
        self.previous_second = 0
        self.current_second = 1


    def update_current_price(self, price):
        self.B['current_price'] = price

    def update_books(self, total_books, bids_name, asks_name, t_name):
        """
            Applies a book message (full snapshot or diff update) to self.B.

            bids_name, asks_name, t_name : keys of bids, asks and timestamp in the message
                                           (different exchanges use different names)
            The timestamp is expected in seconds, as returned by time.time().
            The mid price is recomputed from the maintained book rather than from the message:
            a diff update carries only the changed levels and may have an empty side.
        """
        self.update_books_helper(total_books[bids_name], 'bids')
        self.update_books_helper(total_books[asks_name], 'asks')
        self.B['timestamp'] = total_books[t_name]
        bids, asks = self.B['bids'], self.B['asks']
        if bids and asks:
            self.B['current_price'] = (max(float(p) for p in bids) + min(float(p) for p in asks)) / 2

    def update_books_helper(self, books, side):
        """
            Applies [price, quantity] entries to one side of self.B.

            side : 'bids' or 'asks'
            A zero quantity (including strings such as "0.00000000") removes the level; removing a
            level that is not in the book is ignored. Non-zero levels farther than
            price_level_ceiling % from the current price are skipped.
        """
        levels = self.B[side]
        cp = float(self.B['current_price'])
        for book in books:
            price, quantity = book[0], book[1]
            if float(quantity) == 0:
                # Always honour removals, even far from the price, so stale levels cannot linger
                levels.pop(price, None)
                continue
            # abs(): the ceiling applies on both sides of the price (bids below, asks above)
            if abs(percentage_difference(float(price), cp)) > self.price_level_ceiling:
                continue
            levels[price] = quantity

    def _roll_over_minute(self):
        """Closes the finished minute: fills gaps, stores it in self.snapshot and resets dfs_books."""
        # Zeros are treated as "nothing recorded in that second" and filled from the neighbouring
        # seconds (replaces the deprecated replace(0, method='ffill'/'bfill') calls).
        filled = self.dfs_books.mask(self.dfs_books.eq(0)).ffill().bfill().fillna(0)
        self.snapshot = filled.copy()
        self.dfs_books = pd.DataFrame(float(0), index=self.dfs_books.index, columns=self.dfs_books.columns)

    def dfs_input_books(self):
        """
            Aggregates the current book (self.B) into the row of the current second of dfs_books.

            The row (timestamp % 60) receives the mid price in 'price' and the total quantity per
            level, each quantity assigned to floor(price / level_range) * level_range.
            When the second counter wraps (e.g. 59 -> 0) the finished minute is moved to
            self.snapshot BEFORE the new second is written, so the first second of the new minute
            is neither leaked into the old snapshot nor wiped by the reset.
        """
        current_second = int(self.B['timestamp'] % 60)
        if float(self.previous_second) > float(current_second):
            self._roll_over_minute()
        self.previous_second = current_second
        self.current_second = current_second

        bid_prices = [float(x) for x in self.B['bids'].keys()]
        ask_prices = [float(x) for x in self.B['asks'].keys()]
        if bid_prices and ask_prices:
            current_price = (max(bid_prices) + min(ask_prices)) / 2
        else:
            current_price = float(self.B['current_price'])
        self.dfs_books.loc[current_second, 'price'] = current_price

        raw_books_levels = np.array(bid_prices + ask_prices, dtype=float)
        if raw_books_levels.size == 0:
            return
        raw_books_quantities = np.array([float(x) for x in self.B['bids'].values()] + [float(x) for x in self.B['asks'].values()])
        # Bucket index of every level; grouped_values[i] belongs to level (first_bucket + i) * level_range
        bucket_ids = np.floor(raw_books_levels / self.level_range).astype(np.int64)
        first_bucket = bucket_ids.min()
        grouped_values = np.bincount(bucket_ids - first_bucket, weights=raw_books_quantities)
        books_levels = ((first_bucket + np.arange(grouped_values.size)) * self.level_range).astype(float)

        # Are there levels currently not in the dataframe?
        new_levels = np.setdiff1d(books_levels, self.level_ranges)
        if new_levels.size == 0:
            self.dfs_books.loc[current_second, books_levels] = grouped_values
            return
        new_columns = pd.DataFrame(float(0), index=self.dfs_books.index, columns=new_levels)
        self.dfs_books = pd.concat([self.dfs_books, new_columns], axis=1)
        self.dfs_books.loc[current_second, books_levels] = grouped_values
        # Drop levels that are empty for the whole minute to keep the frame narrow
        empty = (self.dfs_books.eq(0) | self.dfs_books.isna()).all()
        self.dfs_books = self.dfs_books.drop(columns=self.dfs_books.columns[empty.to_numpy()])
        self.level_ranges = np.array([c for c in self.dfs_books.columns if c != 'price'])


In [58]:
# Colab path; when running locally use 'data_binance_books_Trades' instead.
DATA_DIR = '/content/SatoshiVault/code/data_binance_books_Trades'


def load_json(path):
    """Reads a JSON file and closes the handle afterwards."""
    with open(path, 'r') as fh:
        return json.load(fh)


initial_books = load_json(os.path.join(DATA_DIR, 'books.json'))
trades = load_json(os.path.join(DATA_DIR, 'trades.json'))
books = load_json(os.path.join(DATA_DIR, 'bupdates.json'))
# Mid of the first bid and first ask entries (the best levels, assuming the snapshot is sorted)
btc_price = (float(initial_books['bids'][0][0]) + float(initial_books['asks'][0][0])) / 2

a = sProcessBooks('binance', 'btc_usdt', btc_price, 20)

start = time.perf_counter()
a.update_books(initial_books, 'bids', 'asks', 'timestamp')
a.dfs_input_books()

# Replay the stored diff updates; their bids/asks live under 'b'/'a'
for e in books:
    a.update_books(e, 'b', 'a', 'timestamp')
    a.dfs_input_books()

print(f"elapsed_time for {len(books)+1} iterations: ", time.perf_counter() - start)

KeyError: 12

In [54]:
class sProcessTrades():
    """
        Aggregates a stream of trades into per-second traded volume per price level.

        Important notes:
            Keep the current price and current timestamp consistent among all of the sProcessors.

        Parameters:
            exchange, symbol : identification only
            start_price : initial reference price (stored as self.price)
            level_range : width of the price buckets used to aggregate volume
            price_level_ceiling : % ceiling kept for consistency with the other processors,
                                  default 5 (not used by this class)
    """

    def __init__(self, exchange, symbol, start_price, level_range, price_level_ceiling=5):
        # Identification
        self.exchange = exchange
        self.symbol = symbol
        # levels
        self.level_range = level_range
        self.price_level_ceiling = price_level_ceiling
        self.level_ranges = np.array([])
        # Raw data processors
        self.price = start_price
        self.dfs_trades = pd.DataFrame(index=list(range(0, 60, 1)) , columns=np.array(['price']))
        self.snapshot  = None
        self.previous_second = 0
        self.current_second = 1

    def dfs_input_trades(self, current_price, trade, t_name, p_name, q_name):
        """
            Adds a single trade (not a list of trades) to dfs_trades.

            current_price : kept for interface compatibility; the trade's own price is used
            trade : mapping with the timestamp (seconds), price and quantity of the trade
            t_name, p_name, q_name : keys of the timestamp, price and quantity in `trade`

            The quantity is summed into the row of the trade's second (timestamp % 60) under the
            level floor(price / level_range) * level_range; 'price' holds the last trade price.
            When the second counter wraps (e.g. 59 -> 0) the finished minute is stored in
            self.snapshot (NaN -> 0) and the frame is reset BEFORE the new trade is written, so the
            trade lands in the new minute instead of the old snapshot.
        """
        current_second = int(trade[t_name] % 60)
        if float(self.previous_second) > float(current_second):
            self.snapshot = self.dfs_trades.copy()
            self.snapshot.fillna(0, inplace = True)
            # float zero keeps the columns float, so fractional quantities can be added later
            self.dfs_trades[self.dfs_trades.columns] = float(0)
        self.previous_second = current_second
        self.current_second = current_second

        trade_price = float(trade[p_name])
        amount = float(trade[q_name])
        self.dfs_trades.loc[current_second, 'price'] = trade_price
        level = np.floor_divide(trade_price, self.level_range) * self.level_range
        if level in self.level_ranges:
            self.dfs_trades.loc[current_second, level] += amount
            return
        new_column = pd.DataFrame({level: [float(0)] * len(self.dfs_trades)}, index=self.dfs_trades.index)
        self.dfs_trades = pd.concat([self.dfs_trades, new_column], axis=1)
        self.dfs_trades.loc[current_second, level] += amount
        # Drop levels without volume in the whole minute to keep the frame narrow
        empty = (self.dfs_trades.eq(0) | self.dfs_trades.isna()).all()
        self.dfs_trades = self.dfs_trades.drop(columns=self.dfs_trades.columns[empty.to_numpy()])
        self.level_ranges = np.array([c for c in self.dfs_trades.columns if c != 'price'])



In [61]:
with open('/content/SatoshiVault/code/data_binance_books_Trades/trades.json', 'r') as fh:
    trades = json.load(fh)

a = sProcessTrades('binance', 'btc_usdt', btc_price, 20)

start = time.perf_counter()

for e in trades:
    a.dfs_input_trades(btc_price, e, 'timestamp', 'p', 'q')

print("elapsed : ", time.perf_counter() - start)

In [62]:
class sProcessOI():
    """
        Records open interest per second and per price level.

        Important notes:
            Keep the current price and current timestamp consistent among all of the sProcessors.

        Parameters:
            exchange, symbol : identification only
            start_price : initial reference price (stored as self.price)
            level_range : width of the price buckets
            price_level_ceiling : % ceiling kept for consistency with the other processors,
                                  default 5 (not used by this class)
    """

    def __init__(self, exchange, symbol, start_price, level_range, price_level_ceiling=5):
        # Identification
        self.exchange = exchange
        self.symbol = symbol
        # levels
        self.level_range = level_range
        self.price_level_ceiling = price_level_ceiling
        self.level_ranges = np.array([])
        # Raw data processors
        self.price = start_price
        self.dfs_OI = pd.DataFrame(index=list(range(0, 60, 1)) , columns=np.array(['price']))
        self.snapshot  = None
        self.previous_second = 0
        self.current_second = 1

    def dfs_input_OI(self, current_price, trade, t_name, p_name, oi_name):
        """
            Records a single open-interest observation in dfs_OI.

            current_price : kept for interface compatibility; the record's own price is used
            trade : mapping with the timestamp (seconds), price and open interest
            t_name, p_name, oi_name : keys of the timestamp, price and open interest in `trade`

            The value overwrites (does not sum into) the row of its second (timestamp % 60) under
            the level floor(price / level_range) * level_range.
            When the second counter wraps (e.g. 59 -> 0) the finished minute is stored in
            self.snapshot (NaN -> 0) and the frame is reset BEFORE the new value is written.
        """
        current_second = int(trade[t_name] % 60)
        if float(self.previous_second) > float(current_second):
            self.snapshot = self.dfs_OI.copy()
            self.snapshot.fillna(0, inplace = True)
            # float zero keeps the columns float, so fractional values can be written later
            self.dfs_OI[self.dfs_OI.columns] = float(0)
        self.previous_second = current_second
        self.current_second = current_second

        record_price = float(trade[p_name])
        amount = float(trade[oi_name])
        self.dfs_OI.loc[current_second, 'price'] = record_price
        level = np.floor_divide(record_price, self.level_range) * self.level_range
        if level in self.level_ranges:
            self.dfs_OI.loc[current_second, level] = amount
            return
        new_column = pd.DataFrame({level: [float(0)] * len(self.dfs_OI)}, index=self.dfs_OI.index)
        self.dfs_OI = pd.concat([self.dfs_OI, new_column], axis=1)
        self.dfs_OI.loc[current_second, level] = amount
        # Drop levels without data in the whole minute to keep the frame narrow
        empty = (self.dfs_OI.eq(0) | self.dfs_OI.isna()).all()
        self.dfs_OI = self.dfs_OI.drop(columns=self.dfs_OI.columns[empty.to_numpy()])
        self.level_ranges = np.array([c for c in self.dfs_OI.columns if c != 'price'])



In [46]:
with open('/content/SatoshiVault/code/data_binance_books_Trades/trades.json', 'r') as fh:
    trades = json.load(fh)

a = sProcessOI('binance', 'btc_usdt', btc_price, 20)

start = time.perf_counter()

# Demo only: the trade quantity 'q' stands in for an open-interest value
for e in trades:
    a.dfs_input_OI(btc_price, e, 'timestamp', 'p', 'q')

print("elapsed : ", time.perf_counter() - start)


NameError: name 'sProcessTrades' is not defined

In [None]:
class sProcessLiquidations():
    """
        Aggregates a stream of liquidations into per-second liquidated amount per price level.

        Important notes:
            Keep the current price and current timestamp consistent among all of the sProcessors.

        Parameters:
            exchange, symbol : identification only
            start_price : initial reference price (stored as self.price)
            level_range : width of the price buckets
            price_level_ceiling : % ceiling kept for consistency with the other processors,
                                  default 5 (not used by this class)
    """

    def __init__(self, exchange, symbol, start_price, level_range, price_level_ceiling=5):
        # Identification
        self.exchange = exchange
        self.symbol = symbol
        # levels
        self.level_range = level_range
        self.price_level_ceiling = price_level_ceiling
        self.level_ranges = np.array([])
        # Raw data processors
        self.price = start_price
        self.dfs_Liquidations = pd.DataFrame(index=list(range(0, 60, 1)) , columns=np.array(['price']))
        self.snapshot  = None
        self.previous_second = 0
        self.current_second = 1

    def dfs_input_Liquidations(self, current_price, trade, t_name, p_name, l_name):
        """
            Adds a single liquidation event to dfs_Liquidations.

            current_price : kept for interface compatibility; the event's own price is used
            trade : mapping with the timestamp (seconds), price and liquidated amount
            t_name, p_name, l_name : keys of the timestamp, price and amount in `trade`

            Amounts are summed (like trades) into the row of the event's second (timestamp % 60)
            under the level floor(price / level_range) * level_range, so several liquidations in
            the same second and level are all counted.
            When the second counter wraps (e.g. 59 -> 0) the finished minute is stored in
            self.snapshot (NaN -> 0) and the frame is reset BEFORE the new event is written.
        """
        current_second = int(trade[t_name] % 60)
        if float(self.previous_second) > float(current_second):
            self.snapshot = self.dfs_Liquidations.copy()
            self.snapshot.fillna(0, inplace = True)
            # float zero keeps the columns float, so fractional amounts can be added later
            self.dfs_Liquidations[self.dfs_Liquidations.columns] = float(0)
        self.previous_second = current_second
        self.current_second = current_second

        event_price = float(trade[p_name])
        amount = float(trade[l_name])
        self.dfs_Liquidations.loc[current_second, 'price'] = event_price
        level = np.floor_divide(event_price, self.level_range) * self.level_range
        if level in self.level_ranges:
            self.dfs_Liquidations.loc[current_second, level] += amount
            return
        new_column = pd.DataFrame({level: [float(0)] * len(self.dfs_Liquidations)}, index=self.dfs_Liquidations.index)
        self.dfs_Liquidations = pd.concat([self.dfs_Liquidations, new_column], axis=1)
        self.dfs_Liquidations.loc[current_second, level] += amount
        # Drop levels without data in the whole minute to keep the frame narrow
        empty = (self.dfs_Liquidations.eq(0) | self.dfs_Liquidations.isna()).all()
        self.dfs_Liquidations = self.dfs_Liquidations.drop(columns=self.dfs_Liquidations.columns[empty.to_numpy()])
        self.level_ranges = np.array([c for c in self.dfs_Liquidations.columns if c != 'price'])

In [None]:
with open('/content/SatoshiVault/code/data_binance_books_Trades/trades.json', 'r') as fh:
    trades = json.load(fh)

a = sProcessLiquidations('binance', 'btc_usdt', btc_price, 20)

start = time.perf_counter()

# Demo only: the trade quantity 'q' stands in for a liquidated amount
for e in trades:
    a.dfs_input_Liquidations(btc_price, e, 'timestamp', 'p', 'q')

print("elapsed : ", time.perf_counter() - start)

In [None]:
def get_exp_day(date, today=None):
    """
        Number of calendar days from today until an option expiry date.

        date : expiry in the "%d%b%y" format, e.g. "29MAR24" (English month abbreviations;
               strptime matches them case-insensitively)
        today : reference datetime, defaults to datetime.now(); exposed for reproducibility
        Returns the day count as a float (negative when the date has already passed).
    """
    reference = (datetime.now() if today is None else today).date()
    expiration = datetime.strptime(date, "%d%b%y").date()
    # Plain date arithmetic handles year boundaries and leap years, which the
    # day-of-year arithmetic with a fixed 365 did not (and next-year expiries crashed).
    return float((expiration - reference).days)

def get_by_expiration(interval, days_to_expire, strikes, OI, center_price):
    """
        Sums open interest per strike for the options expiring in exactly `interval` days.

        interval : days to expiration to select (compared with ==)
        days_to_expire, strikes, OI : aligned 1-D arrays, one entry per instrument
        center_price : currently unused; kept for interface compatibility
        Returns (grouped_sum, unique_strikes): float OI totals and the sorted strikes they belong to.
    """
    mask = np.asarray(days_to_expire) == interval
    unique_strikes, strike_index = np.unique(np.asarray(strikes)[mask], return_inverse=True)
    # bincount groups every strike in one pass and always returns floats, so an integer
    # strike array no longer truncates fractional open interest.
    grouped_sum = np.bincount(strike_index.ravel(),
                              weights=np.asarray(OI, dtype=float)[mask],
                              minlength=unique_strikes.size)
    return grouped_sum, unique_strikes

def aggregate_OIs(days: list, days_to_expire, strikes, open_interest, current_price):
    """
        Sums open interest per strike over all options whose days-to-expiration is in `days`.

        days : days-to-expiration values to pool together (e.g. [0, 1])
        days_to_expire, strikes, open_interest : aligned 1-D arrays, one entry per instrument
        current_price : currently unused; kept for interface compatibility
        Returns (grouped_sum, unique_strikes): float OI totals and the sorted strikes they belong to.
    """
    mask = np.isin(np.asarray(days_to_expire), days)
    unique_strikes, strike_index = np.unique(np.asarray(strikes)[mask], return_inverse=True)
    grouped_sum = np.bincount(strike_index.ravel(),
                              weights=np.asarray(open_interest, dtype=float)[mask],
                              minlength=unique_strikes.size)
    return grouped_sum, unique_strikes

In [None]:
# Placeholders for OI grouped by days to expiration (not filled yet)
df_exp_0_1 = pd.DataFrame()  # day 0, 1
df_exp_1_7 = pd.DataFrame()  # 2-7
df_exp_7_35 = pd.DataFrame()  # 8-35
df_exp_35_ = pd.DataFrame()  # 35 +

with open("/content/SatoshiVault/code/data/option_snap.json") as fh:
    options_data_raw = json.load(fh)
instruments = options_data_raw['result']
# Assumes names shaped like <underlying>-<DDMONYY>-<strike>-<type>, per the split indices below
options_data = {
    "strikes" : np.array([float(x['instrument_name'].split('-')[-2]) for x in instruments]),
    "countdown" : np.array([get_exp_day(x['instrument_name'].split('-')[1]) for x in instruments]),
    "oi" : np.array([float(x['open_interest']) for x in instruments]),
}
unique_expirations = np.unique(options_data['countdown'])
index_price = float(instruments[0]['underlying_price'])

raw_pd = pd.DataFrame(options_data)
raw_pd = raw_pd.groupby(['countdown', 'strikes']).sum()



In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# OI per strike for options expiring in exactly 7 days, taken from the loaded option snapshot
OIs, unique_strikes = get_by_expiration(7, options_data['countdown'], options_data['strikes'], options_data['oi'], index_price)

fig, ax = plt.subplots(figsize=(12, 8))
ax.plot(unique_strikes, OIs, label='Open interest (7 days to expiry)')
ax.axvline(x=index_price, color='red', linestyle='--', linewidth=2, label='Current price')
plt.setp(ax.get_xticklabels(), rotation=90, ha='right')
ax.set_xlabel('Strikes')
ax.set_ylabel('Open Interest')
ax.set_title('Open interest for every strike with current price')
ax.legend()
plt.show()


In [None]:
def percentage_difference(center, value):
    """
        Signed symmetric percentage difference of `value` relative to `center`:
        (value - center) / ((center + value) / 2) * 100.

        Special cases:
            center == 0 and value > 0  -> 100.0
            value == 0 and center > 0  -> 9999999999.0 (large sentinel)
            center == value == 0       -> 0.0 (previously a ZeroDivisionError)

        NOTE(review): this shadows the `percentage_difference` imported from utilis; classes defined
        earlier look the name up at call time and will use this version after this cell runs.
    """
    if center == 0 and value > center:
        return float(100)
    if value == 0 and value < center:
        return float(9999999999)
    if center == 0 and value == 0:
        return float(0)
    diff = value - center
    average = (center + value) / 2
    return (diff / average) * 100

ranges = np.array([1, 3, 5, 10, 25])
# Signed % distance of every plotted strike from the underlying's index price
d = np.array([percentage_difference(index_price, strike) for strike in unique_strikes])

# Regular
* Price
* Price variance
* Technical indicators (choose those that are most meaningful and need the least data)
* Funding Rate
* Long/short ratios, total
* Total OI
* Total put/call
* Volume
* .....

# Do heatmaps of:

## Spot
 * Books
 * Trades
 * Canceled Books
 * Reinforced books

## Perp/Futures
 * Books  --- minute, hours
 * Trades --- minutes, hours
 * Canceled Books 
 * Reinforced books
 * Open Interest
 * Funding Rate
 * Top Traders Accounts  -- hourly (Binance)
 * Top Traders Positions -- hourly (Binance)
 * Global Account Ratio --- hourly (Binance, Bybit, okx)

## Options
* Call open interest by strike --- relative to price
* Put open interest by strike --- relative to price
* And those generated from them

# 1 min Features

# 1 h features



# BOOKS AND VOLUME MUST BE AGGREGATED


In [17]:
class secondProcessor:
    """
        insType : spot, perpetual, options
            Spot - processes books, trades, canceled books, and reinforced books
            Perpetual - processes books, trades, canceled books, and reinforced books, OI, liquidations and other Statistical data
            Option - processes only OI by strike, volume, liquidations
        start_price : the last price of the ticker . If you want to aggragate books from different exchanges, you should keep the current price consistent amount all of them
        bucket_range : price range of buckets to aggregate books
        n_buckets : number of buckets to create into single direction
        price_level_ceiling : % ceiling of price levels to ommit 5%

        BIG NOTE:  If the book is above 5% from the current price, it will be deleted for computational efficiency. 
                   It would be wise to assume the spoof 
    """

    def __init__(self, exchange, insType, symbol, start_price, level_range, price_level_ceiling=5):
        # Identification
        self.exchange = exchange
        self.insType = insType
        self.symbol = symbol
        self.level_range = level_range
        self.price_level_ceiling = price_level_ceiling
        self.level_ranges = get_level_ranges(start_price, level_range, price_level_ceiling)
        self.B = {"timestamp": 1, "current_price": float(start_price), "bids" : {}, "asks" : {}}
        self.current_price = start_price
        self.current_second = int(time.time() % 60)  
        # Bucket ranges for dataframes
        self.current_buckets_dfs = get_bucket_list(start_price, level_range, level_range)
        # self.current_buckets_dfm = get_bucket_list(start_price, bucket_range, n_buckets)
        if insType == 'spot':
            # Raw data frames 1 sec
            self.dfs_books = create_data_frame('sec', self.level_ranges)
            self.dfs_trades = create_data_frame('sec', self.level_ranges)
            self.dfs_canceled_books = create_data_frame('sec', self.level_ranges)
            self.dfs_reinforced_books = create_data_frame('sec', self.level_ranges)
            # Raw data frames 1 sec
            # self.dfm_books = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_trades = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_canceled_books = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_reinforced_books = create_data_frame('min', self.current_buckets_dfm)
        if insType == 'perpetual':
            # Raw data frames 1 sec
            self.dfs_books = create_data_frame('sec', self.level_ranges)
            self.dfs_trades = create_data_frame('sec', self.level_ranges)
            self.dfs_canceled_books = create_data_frame('sec', self.level_ranges)
            self.dfs_reinforced_books = create_data_frame('sec', self.level_ranges)
            self.dfs_OI = create_data_frame('sec', self.level_ranges)
            # Raw data frames 1 sec
            # self.dfm_books = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_trades = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_canceled_books = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_reinforced_books = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_binance_GLSA = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_OI = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_okx_GLSA = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_bybit_GLSA = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_binance_TLSA = create_data_frame('min', self.current_buckets_dfm)
            # self.dfm_binance_TLSP = create_data_frame('min', self.current_buckets_dfm)
        if insType == 'options':
            # Add here second frame
            self.dfm_deribit_call_OI_strike = create_data_frame('min', self.level_ranges)
            self.dfm_deribit_Put_OI_strike = create_data_frame('min', self.level_ranges)
        # Features
        #self.features_min = create_data_frame('min', self.current_buckets_dfh)  # Redo, different columns
        #self.features_hour = create_data_frame('h', self.current_buckets_dfh)   # Redo, different columns other module
        # self.current_buckets_dfh = get_bucket_list(start_price, bucket_range, n_buckets)

    def update_current_price(self, price):
        self.current_price = price

    
    def update_books(self, total_books, bids_name, asks_name, t_name):
        """
            bids_name, asks_name, t_name : Different jsons have different name for bids and asks and timestamps
            t__name as if time.time()
        """
        self.update_books_helper(total_books[bids_name], 'bids')
        self.update_books_helper(total_books[asks_name], 'asks')
        self.B['timestamp'] = total_books[t_name]
        self.B['current_price'] = (max([float(x[0]) for x in total_books[bids_name]]) + min([float(x[0]) for x in total_books[asks_name]])) / 2

    def update_books_helper(self, books, side):
        """
          side: bids, asks
        """
        # Omit books above 5% from the current price
        for book in books:
            p = float(book[0])
            cp = float(self.B['current_price'])
            if percentage_difference(p, cp) > self.price_level_ceiling:
                continue
            if book[1] == "0" or book[1] == 0:
                del self.B[side][book[0]]
            else:
                self.B[side][book[0]] = book[1]

    def dfs_input_books(self):
        """
            Inputs bids and asks into dfs
        """
        # Raw data
        current_second = int(self.B['timestamp'] % 60)  
        print(current_second)
        current_price = (np.max([float(x) for x in self.B['bids'].keys()]) + np.min([float(x) for x in self.B['asks'].keys()])) / 2
        raw_books_quatities = np.array([float(x) for x in self.B['bids'].values()] + [float(x) for x in self.B['asks'].values()])
        raw_books_levels = np.array([float(x) for x in self.B['bids'].keys()] + [float(x) for x in self.B['asks'].keys()])
        # 
        self.dfs_books['price'] = current_price
        # New books levels
        start = np.floor(np.min(raw_books_levels) / self.level_range ) * self.level_range 
        end = np.ceil(np.max(raw_books_levels) / self.level_range ) * self.level_range 
        books_levels = np.arange(start, end+1, self.level_range)
        # Are there new levels currently not in dataframe?
        new_levels = np.setdiff1d(books_levels, self.level_ranges)
        # Indices and grouped values
        grouped_values = np.bincount(np.digitize(raw_books_levels, bins=books_levels), weights=raw_books_quatities)
        if new_levels.size == 0:
            self.dfs_books.loc[current_second, books_levels] = grouped_values
            self.dfs_books.fillna(method='ffill',  inplace=True)
        else:
            # Create newcolumns pandas dataframe
            self.dfs_books[new_levels] = float(0)
            self.dfs_trades[new_levels] = float(0)
            self.dfs_canceled_books[new_levels] = float(0)
            self.dfs_reinforced_books[new_levels] = float(0)
            # Input new values
            self.dfs_books.loc[current_second, books_levels] = grouped_values
            self.dfs_books.fillna(method='ffill',  inplace=True)
            # Remove columns if those are not within 5% range
            columns_to_drop = filter_ranges(self.level_ranges, current_price, self.price_level_ceiling)
            self.dfs_books.drop(columns=columns_to_drop, inplace=True)
            self.dfs_trades.drop(columns=columns_to_drop, inplace=True)
            self.dfs_canceled_books.drop(columns=columns_to_drop, inplace=True)
            self.dfs_reinforced_books.drop(columns=columns_to_drop, inplace=True)
            self.level_ranges = np.setdiff1d(np.concatenate((self.level_ranges, new_levels)), columns_to_drop)



    def dfs_input_trades(self, trade, t_name, p_name, q_name):                                            # Works fine  # FIX when inputing new data to dfm The last raw is being deleted
        """ 
            t_name: timestamp name in the dictionary
            p_name: price name in the dictionary
            q_name: quantity name in the dictionary

            Inputs price, volume(amount) and trades into dfs_trades frame
            Inputs price into dfs_books
        """
        current_second = int(trade[t_name]% 60)  
        current_price = float(trade[p_name])
        amount = float(trade['q'])
        level = find_level(self.level_ranges, current_price)
        # Not needed, just incase
        if level == None:
            new_levels = np.setdiff1d(level, self.level_ranges)
            # Create newcolumns pandas dataframe
            self.dfs_books[new_levels] = float(0)
            self.dfs_trades[new_levels] = float(0)
            self.dfs_canceled_books[new_levels] = float(0)
            self.dfs_reinforced_books[new_levels] = float(0)
            # Input new values
            self.dfs_trades.loc[current_second, level] += amount
            # The next levels should all be 0
            if current_second == 59:
                # Push to 1 min frame
                self.dfs_trades.loc[0:58] = float(0)
            if current_second == 0:
                self.dfs_trades.loc[59] = float(0)

        else:
            self.dfs_trades.loc[current_second, level] += amount
            if current_second == 59:
                self.dfs_trades.loc[0] = float(0)
            else:
                self.dfs_trades.loc[current_second] = float(0)





    # def dfs_input_canceled_reinforced(self):
    #     """
    #         Periodic task, must be done every minute
    #     """
    #     # Calculate canceled books
    #     current_second = get_current_time('s')
    #     previous_column = current_second -1
    #     if current_second == 0:
    #         previous_column = 59
    #     # Calculates canceled or reinforced books over certain timestamps
    #     D = (self.dfs_books.iloc[current_second, :] + self.dfs_trades.iloc[current_second, :]) - (self.dfs_books.iloc[previous_column, :] + self.dfs_trades.iloc[previous_column, :])
    #     self.dfs_canc_books.iloc[current_second, :] = D.clip(lower=0)
    #     self.dfs_rein_books.iloc[current_second, :] = D.clip(upper=0).abs()


    # def dfs_input_trades(self, trade):                                            # Works fine  # FIX when inputing new data to dfm The last raw is being deleted
    #     """ 
    #         Inputs price, volume(amount) and trades into dfs_trades frame
    #         Inputs price into dfs_books
    #     """
    #     current_second = get_current_time('s')
    #     current_price = float(trade['p'])
    #     print(current_price)
    #     amount = float(trade['q'])
    #     located_bucket = self.locate_bucket(current_price, self.current_buckets_dfs)

    #     self.dfs_books.loc[current_second, 'price'] = current_price
    #     self.dfs_trades.loc[current_second, 'price'] = current_price
    #     self.dfs_canceled_books.loc[current_second, 'price'] = current_price
    #     self.dfs_reinforced_books.loc[current_second, 'price'] = current_price

    #     # If the bucket exists
    #     if located_bucket != None:
    #         col_name = "_".join([str(located_bucket[0]), str(located_bucket[1])])
    #         if np.isnan(self.dfs_trades.at[current_second, col_name]) == True:
    #             self.dfs_trades.loc[current_second, col_name] =  amount
    #         else: 
    #             self.dfs_trades.loc[current_second, col_name] = self.dfs_trades.at[current_second, col_name] + amount
    #     # if the bucket doesnt exist
    #     if located_bucket == None:
    #       # Makes new buckets
    #       new_buckets = self.get_new_buckets(current_price, self.current_buckets_dfs)
    #       self.current_buckets_dfs = sorted(self.current_buckets_dfs + new_buckets)
    #       new_names = ["_".join([str(x[0]), str(x[1])]) for x in new_buckets]
    #       for name in new_names:
    #           self.dfs_trades[name] = np.nan
    #           self.dfs_books[name] = np.nan
    #           self.dfs_canc[name] = np.nan
    #           self.dfs_rein[name] = np.nan
    #       # Finaly input values
    #       l = self.locate_bucket(current_price, self.current_buckets_dfs)
    #       col_name = "_".join([str(l[0]), str(l[1])])
    #       if np.isnan(self.dfs_trades.at[current_second, col_name]) == True:
    #           self.dfs_trades.loc[current_second, col_name] =  amount
    #       else: 
    #           self.dfs_trades.loc[current_second, col_name] = self.dfs_trades.at[current_second, col_name] + amount

    #     # Make sure the next row is empty
    #     s = get_current_time('s')
    #     if s == 59:
    #       s = 0
    #     dfs_trades.iloc[s+1, :] = np.nan




    # def dfs_input_books(self, books : dict):
    #     """
    #         Updates dfs bids and asks.
    #         The first time it is called, it populates the entire dfs, which will later be replaced
    #         Note: For consistency, start streaming at the first second
    #     """
    #     current_second = int(books['timestamp'] % 60)  # current_second = get_current_secod() # For production
    #     next_secod = 0 if current_second == 59 else current_second + 1
    #     if self.dfs_input_books_call_caount == 0:
    #         self.dfs_input_books_helper(books['asks'], current_second)
    #         self.dfs_input_books_helper(books['bids'], current_second)
    #         self.dfs_books = self.dfs_books.applymap(lambda x: np.nan if x == 0 else x).ffill().bfill()
    #         self.dfs_input_books_call_caount += 1
    #     else:
    #         self.dfs_input_books_helper(books['asks'], current_second)
    #         self.dfs_input_books_helper(books['bids'], current_second)
    #     #self.dfs_books.iloc[next_secod, :] = 0

    # def dfs_input_books_helper(self, books : list, current_second):                         # Works fine  # FIX: when inputting new data into dfm, the last row gets deleted, which is not supposed to happen
    #     """
    #         Inputs books into the dfs dataframe
    #         Creates new_buckets if necessary
    #         Updates current buckets dfs
    #     """
    #     for book in books:
    #         col = float(book[0])
    #         amount = float(book[1])
    #         located_bucket = self.locate_bucket(col, self.current_buckets_dfs)
    #         # If the bucket exists
    #         if located_bucket != None:
    #             col_name = "_".join([str(located_bucket[0]), str(located_bucket[1])])
    #             self.dfs_books.loc[current_second, col_name] +=  amount    # These are buckets: amounts must be added, not replaced
    #         # If the bucket doesn't exist
    #         if located_bucket == None:
    #             # Makes new buckets
    #             new_buckets = self.get_new_buckets(col, self.current_buckets_dfs)
    #             self.current_buckets_dfs = sorted(self.current_buckets_dfs + new_buckets)
    #             new_names = ["_".join([str(x[0]), str(x[1])]) for x in new_buckets]
    #             for name in new_names:
    #                 self.dfs_trades[name] = float(0)
    #                 self.dfs_books[name] = float(0)
    #                 self.dfs_canceled_books[name] = float(0)
    #                 self.dfs_reinforced_books[name] = float(0)
    #             # Finaly input values
    #             l = self.locate_bucket(col, self.current_buckets_dfs)
    #             col_name = "_".join([str(l[0]), str(l[1])])
    #             self.dfs_books.loc[current_second, col_name] +=  amount


    # def locate_bucket(self, value: float, ranges : list):               # Works well
    #     for index, (start, end) in enumerate(ranges):
    #         if start <= value < end:
    #             return ranges[index]
    #     return None
      
            
    # def get_new_buckets(self, value : float, current_buckets : list):    # Works fine
    #     step = self.bucket_range
    #     mmax = max(current_buckets)[1]
    #     mmin = min(current_buckets)[0]
    #     l = []
    #     if value > mmax or value == mmax:
    #         rounded_value = int(math.ceil(value / step) * step)
    #         for v in range(mmax, rounded_value, step):
    #             l.append([v, v+step ])
    #         if rounded_value == value and value == mmax:
    #             l.append([int(value), int(value+step)])
    #     if value < mmin:
    #         rounded_value = max(0, int(value / step) * step)
    #         for v in range(rounded_value, mmin, step):
    #             l.append([v, v+step ])
    #     return l    

    # def dfs_remove_empty_columns(self):
    #     pass
        # # REDO
        # # If every value in a bucket column is NULL, remove that column
        # # dfs
        # empty_columns = [[int(x.split('_')[0]), int(x.split('_')[1])]for x in self.dfs_books.columns[self.dfs_books.isnull().all()].tolist()]
        # self.current_buckets_dfs = [x for x in self.current_buckets_dfs if x not in empty_columns]
        # self.dfs_books.dropna(axis=1, how='all')
        # self.dfs_canc_books(axis=1, how='all')



In [6]:
# Load the recorded order-book snapshot, trade stream and book-update stream.
# `with` closes each file handle as soon as it is parsed; the previous
# json.load(open(...)) form left the handles open until garbage collection.
with open('data_binance_books_Trades/books.json', 'r') as f:
    initial_books = json.load(f)
with open('data_binance_books_Trades/trades.json', 'r') as f:
    trades = json.load(f)
with open('data_binance_books_Trades/bupdates.json', 'r') as f:
    books = json.load(f)

# NOTE(review): the `dfs_input_books` implementation next to this cell is commented out;
# confirm `secondProcessor` still defines it, otherwise this cell only runs on stale kernel state.
engine = secondProcessor('binance', 'perpetual', 'btc_usdt', 43000, 10, 1)

# perf_counter is monotonic and high-resolution, so it suits elapsed-time measurement
# better than the wall-clock time.time().
start_time = time.perf_counter()

engine.dfs_input_books(initial_books)

print("Elapsed_time", time.perf_counter() - start_time)

Elapsed_time 0.5400278568267822


In [11]:
# Treat zero amounts as missing, then fill each gap from the next non-empty row
# (at most 60 consecutive rows; presumably one minute of per-second rows — confirm).
books_display = engine.dfs_books.replace(0, np.nan)
books_display.bfill(limit=60)


Unnamed: 0,price,volume,42990_43000,43000_43010,42620_42630,42630_42640,42640_42650,42650_42660,42660_42670,42670_42680,...,42240_42250,42230_42240,42220_42230,42210_42220,42200_42210,42190_42200,42180_42190,42170_42180,42160_42170,42150_42160
0,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
1,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
2,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
3,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
4,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
5,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
6,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
7,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
8,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105
9,,,0.25955,0.00015,0.00021,0.00025,0.00074,0.34994,0.00025,0.00013,...,0.05056,0.04535,0.61559,0.09615,23.43409,0.10735,5.58725,0.03428,0.12386,0.00105


In [34]:
# Remainder of the snapshot timestamp modulo 60, truncated to an int.
# NOTE(review): this is the second-of-minute only if the timestamp is in seconds; if it is
# in milliseconds, the remainder is not a second index — confirm the unit.
second_of_minute = divmod(initial_books['timestamp'], 60)[1]
int(second_of_minute)

12

In [70]:
# Scratch check: book quantities arrive as decimal strings and are parsed with float().
quantity_text = '0.33076'
float(quantity_text)

0.33076