In [1]:
from websocket import create_connection
import threading
import json
import pandas as pd
import numpy as np
import websocket
import datetime
from functools import partial
import timeit
import requests
import time
import mysql.connector
from mysql.connector import Error
from sqlalchemy import create_engine
pd.set_option('display.float_format', lambda x: '%.5f' % x)

In [41]:
class BTC_Data(): 
    def __init__(self, has_data:bool, print_on_init: bool):
        self.has_data = has_data
        self.engine   = None
        self.df_1min  = None
        self.df_5min  = None
        self.df_30min = None
        self.df_1hr   = None
        self.df_4hr   = None
        self.df_12hr  = None
        self.df_24hr  = None
        self.data_que =[]
        self.buildEngine()
        self.readData()
        self.get_data_since_last_run(print_on_init,'self')
        
        
    def buildEngine(self):
        env_vars = {} 
        with open('src/env.txt') as f:
            for line in f:
                if line.startswith('#') or not line.strip():
                    continue
                key, value = line.strip().split('=', 1)
                env_vars[key]= value
        
        self.engine = create_engine("mysql+pymysql://{user}:{pw}@{host}/{db}"
                       .format(user=env_vars['USER'],
                               pw=env_vars['PASSWORD'],
                               host=env_vars['HOST'],
                               db=env_vars['DB']))
        print('SQL Engine Built...')
        
    def readData(self):
        if (self.has_data): 
            self.df_1min  = pd.read_csv('BTC_Ticker_Data_1_Min.csv') 
            print('1 Min Pulled in...')
            self.df_5min  = pd.read_csv('BTC_Ticker_Data_5_Min.csv') 
            print('5 Min Pulled in...')
            self.df_30min = pd.read_csv('BTC_Ticker_Data_30_Min.csv') 
            print('30 Min Pulled in...')
            self.df_1hr   = pd.read_csv('BTC_Ticker_Data_1_Hour.csv') 
            print('1 hour Pulled in...')
            self.df_4hr   = pd.read_csv('BTC_Ticker_Data_4_Hour.csv') 
            print('4 hour Pulled in...')
            self.df_12hr  = pd.read_csv('BTC_Ticker_Data_12_Hour.csv') 
            print('12 hour Pulled in...')
            self.df_24hr  = pd.read_csv('BTC_Ticker_Data_24_Hour.csv')
            print('24 hour Pulled in...')
            
        else:
            # Read the data from sql and send to CSV 
            self.df_1min = pd.read_sql_table('BTC_Ticker_Data', con=self.engine)
            print('1 Min Pulled in...')
            self.df_5min = pd.read_sql_table('BTC_Ticker_Data_5_Min', con=self.engine)
            print('5 Min Pulled in...')
            self.df_30min = pd.read_sql_table('BTC_Ticker_Data_30_Min', con=self.engine)
            print('30 Min Pulled in...')
            self.df_1hr = pd.read_sql_table('BTC_Ticker_Data_1_Hour', con=self.engine)
            print('1 hour Pulled in...')
            self.df_4hr = pd.read_sql_table('BTC_Ticker_Data_4_Hour', con=self.engine)
            print('4 hour Pulled in...')
            self.df_12hr = pd.read_sql_table('BTC_Ticker_Data_12_Hour', con=self.engine)
            print('12 hour Pulled in...')
            self.df_24hr = pd.read_sql_table('BTC_Ticker_Data_24_Hour', con=self.engine)
            print('24 hour Pulled in...')
            ####################
            # create a loop to deal with extra columns for now this works fine Likely will delete this
            self.df_1min  = df_1min.iloc[: , 2:]
            self.df_5min  = df_5min.iloc[: , 2:]
            self.df_30min = df_30min.iloc[: , 2:]
            self.df_1hr   = df_1hr.iloc[: , 2:]
            self.df_4hr   = df_4hr.iloc[: , 2:]
            self.df_12hr  = df_12hr.iloc[: , 2:]
            self.df_24hr  = df_24hr.iloc[: , 2:]
            ####################
            self.print_all_to_CSVs()
            
    def print_all_to_CSVs(self): 
        self.df_1min.to_csv('BTC_Ticker_Data_1_Min.csv',index=False)
        print('1 Min printed to CSV...')
        self.df_5min.to_csv('BTC_Ticker_Data_5_Min.csv',index=False)
        print('5 Min printed to CSV...')
        self.df_30min.to_csv('BTC_Ticker_Data_30_Min.csv',index=False)
        print('30 Min printed to CSV...')
        self.df_1hr.to_csv('BTC_Ticker_Data_1_Hour.csv',index=False)
        print('1 hour printed to CSV...')
        self.df_4hr.to_csv('BTC_Ticker_Data_4_Hour.csv',index=False)
        print('4 hour printed to CSV...')
        self.df_12hr.to_csv('BTC_Ticker_Data_12_Hour.csv',index=False)
        print('12 hour printed to CSV...')
        self.df_24hr.to_csv('BTC_Ticker_Data_24_Hour.csv',index=False)
        print('24 hour printed to CSV...')

    def get_data_since_last_run(self, print_to_CSV: bool, called_by: str ): 
        if (called_by == 'web'): ###############################
            print('checkpoint 1')
        start_time = (self.df_1min.iloc[-1]['Unix Timestamp']+60) * 1000000000 # add 60 to start at first missing point
        end_time = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).timestamp() * 1000000000
        dfList = []

        while start_time < end_time :
            print(f"Retrieving data from {datetime.datetime.utcfromtimestamp(start_time // 1000000000)}...")

            query_string = 'https://api.kraken.com/0/public/Trades?pair=xbtusd&since='+ str(start_time)
            resp = requests.get(query_string).json()

            # Append new data to dataframe
            tempFrame = pd.DataFrame(resp['result']['XXBTZUSD'])
            dfList.append(tempFrame)

            # Determine last retrieved date
            if (start_time == int(resp['result']['last'])):
                break
            else:            
                start_time = int(resp['result']['last'])

            # Sleep to avoid being rate limited
            time.sleep(1.8)
        
        if (called_by == 'web'): ###############################
            print('checkpoint 2')  
        if (len(dfList)!=0):  
            # combine to Panadas DF 
            trade_data = pd.concat(dfList, ignore_index=True)
            if (called_by == 'web'): ###############################
                print('checkpoint 2.1')
            trade_data = trade_data.astype({0: float, 1: float, 2:float})

            if (called_by == 'web'): ###############################
                print('checkpoint 3')

            start_time = (self.df_1min.iloc[-1]['Unix Timestamp']+60)
            end_time   = int(end_time / 1000000000)
            end_time   = end_time-(end_time%60)
            print(end_time)
            # hereeee
            new_one_min_data =[]
            symbol_val   = 'BTCUSD'
            open_val, high_val, low_val, close_val, volume_val = 0,0,0,0,0
            while start_time < end_time:  
                relevant_rows = trade_data[
                            (trade_data[2] >= start_time) &
                            (trade_data[2] < (start_time + 60) )
                            ]
                unix_val     = start_time
                datetime_val = datetime.datetime.utcfromtimestamp(start_time)
                if len(relevant_rows) !=0: 
                    open_val     = relevant_rows.iloc[0][0]
                    close_val    = relevant_rows.iloc[-1][0]
                    high_val     = np.max(relevant_rows[0])
                    low_val      = np.min(relevant_rows[0])
                    volume_val   = round(relevant_rows[1].sum(), 8)   
                else :# If there are no trades in the given time frame
                    # TODO needs validation here in case its the first item 
                    open_val     = new_one_min_data[-1][6]
                    close_val    = new_one_min_data[-1][6]
                    high_val     = new_one_min_data[-1][6]
                    low_val      = new_one_min_data[-1][6]
                    volume_val   = 0 

                # Creates 1 min df 
                new_row = [unix_val, datetime_val, symbol_val, open_val, high_val, low_val, close_val, volume_val]
                new_one_min_data.append(new_row)
                # add 1 min to start time 
                start_time+=60

            cols = ['Unix Timestamp','Date','Symbol','Open','High','Low','Close','Volume']
            new_one_min_data_DF = pd.DataFrame(new_one_min_data, columns=cols)
            # add new Data to old data 
            new_df_1min = pd.concat([self.df_1min, new_one_min_data_DF], ignore_index=True)

            cols = ['Unix Timestamp','Date','Symbol','Open','High','Low','Close','Volume','Time Frame']
            # df_5min  #########################
            last_5min      = self.df_5min.loc[len(self.df_5min)-1,'Unix Timestamp']
            rows_to_add    = self.combine_time_frames(new_df_1min, last_5min, 5, "5 Min")
            rows_to_add_df = pd.DataFrame(rows_to_add,columns=cols)
            new_df_5min    = pd.concat([self.df_5min, rows_to_add_df], ignore_index=True)

            # df_30min #########################
            last_30min     = self.df_30min.loc[len(self.df_30min)-1,'Unix Timestamp']
            rows_to_add    = self.combine_time_frames(new_df_1min, last_30min, 30, "30 Min")
            rows_to_add_df = pd.DataFrame(rows_to_add,columns=cols)
            new_df_30min   = pd.concat([self.df_30min, rows_to_add_df], ignore_index=True)

            # df_1hr   #########################
            last_1hour     = self.df_1hr.loc[len(self.df_1hr)-1,'Unix Timestamp']
            rows_to_add    = self.combine_time_frames(new_df_1min, last_1hour, 60, "1 Hour")
            rows_to_add_df = pd.DataFrame(rows_to_add,columns=cols)
            new_df_1hr     = pd.concat([self.df_1hr, rows_to_add_df], ignore_index=True)

            # df_4hr   #########################
            last_4hour     = self.df_4hr.loc[len(self.df_4hr)-1,'Unix Timestamp']
            rows_to_add    = self.combine_time_frames(new_df_1min, last_4hour, 240, "4 Hour")
            rows_to_add_df = pd.DataFrame(rows_to_add,columns=cols)
            new_df_4hr     = pd.concat([self.df_4hr, rows_to_add_df], ignore_index=True)

            # df_12hr  #########################
            last_12hour    = self.df_12hr.loc[len(self.df_12hr)-1,'Unix Timestamp']
            rows_to_add    = self.combine_time_frames(new_df_1min, last_12hour, 720, "12 Hour")
            rows_to_add_df = pd.DataFrame(rows_to_add,columns=cols)
            new_df_12hr    = pd.concat([self.df_12hr, rows_to_add_df], ignore_index=True)

            # df_24hr  #########################
            last_24hour    = self.df_24hr.loc[len(self.df_24hr)-1,'Unix Timestamp']
            rows_to_add    = self.combine_time_frames(new_df_1min, last_24hour, 1440, "24 Hour")
            rows_to_add_df = pd.DataFrame(rows_to_add,columns=cols)
            new_df_24hr    = pd.concat([self.df_24hr, rows_to_add_df], ignore_index=True)


            self.df_1min  = self.apply_indicators_to_whole(new_df_1min)
            print('1 Min Updated')
            self.df_5min  = self.apply_indicators_to_whole(new_df_5min)
            print('5 Min Updated')
            self.df_30min = self.apply_indicators_to_whole(new_df_30min)
            print('30 Min Updated')
            self.df_1hr   = self.apply_indicators_to_whole(new_df_1hr)
            print('1 Hour Updated')
            self.df_4hr   = self.apply_indicators_to_whole(new_df_4hr)
            print('4 Hour Updated')
            self.df_12hr  = self.apply_indicators_to_whole(new_df_12hr)
            print('12 Hour Updated')
            self.df_24hr  = self.apply_indicators_to_whole(new_df_24hr)
            print('24 Hour Updated')
            if print_to_CSV: 
                self.print_all_to_CSVs()
        
    def combine_time_frames(self, data, first_date, increment, dfString):
        new_data =[]
        symbol_val   = 'BTCUSD'
        open_val, high_val, low_val, close_val, volume_val = 0,0,0,0,0
        start_time = first_date + 60
        while True: 
            relevant_rows = data[
                        (data['Unix Timestamp'] >= start_time) &
                        (data['Unix Timestamp'] < (start_time + (60*increment)) )
                        ]
            if len(relevant_rows)!=increment:
                break
            # loop break condition if relevant rows does not have the right amount of rows in it 

            unix_val     = relevant_rows.iloc[-1]['Unix Timestamp']
            datetime_val = relevant_rows.iloc[-1]['Date']
            open_val     = relevant_rows.iloc[0]['Open']
            high_val     = relevant_rows['High'].max()
            low_val      = relevant_rows['Low'].min()
            close_val    = relevant_rows.iloc[-1]['Close']
            volume_val   = relevant_rows['Volume'].sum()

            # Creates new row
            new_row = [unix_val, datetime_val, symbol_val, open_val, high_val, low_val, close_val, volume_val, dfString]
            new_data.append(new_row)

            # add 1 min * increment to start time 
            start_time+=(60 *increment)

        return new_data
        
    def apply_indicators_to_whole(self, df):
        # SMAs #####################
        df['MA_5']  = df['Close'].rolling(window=5).mean()
        df['MA_8']  = df['Close'].rolling(window=8).mean()
        df['MA_10'] = df['Close'].rolling(window=10).mean()
        df['MA_13'] = df['Close'].rolling(window=13).mean()
        df['MA_20'] = df['Close'].rolling(window=20).mean()
        df['MA_50'] = df['Close'].rolling(window=50).mean()
        # RSI #####################
        delta = df['Close'].diff()
        up = delta.clip(lower=0)
        down = -1*delta.clip(upper=0)
        ema_up = up.ewm(com=13, adjust=False).mean()
        ema_down = down.ewm(com=13, adjust=False).mean()
        rs = ema_up/ema_down
        df['RSI'] = 100 - (100/(1 + rs))
        # MacD #####################
        exp1 = df['Close'].ewm(span=12, adjust=False).mean()
        exp2 = df['Close'].ewm(span=26, adjust=False).mean()
        macd = exp1 - exp2
        df['MACD']= macd
        exp3 = macd.ewm(span=9, adjust=False).mean()
        df['M_Signal']= exp3
        return df
    
    def to_data_que(self, data_point: list):
        if len(self.data_que)==0:                      # Empty que just add the point 
            self.data_que.append(data_point)

        else: 
            if data_point[1] == self.data_que[-1][1]: # In the Same interval: add the point
                self.data_que.append(data_point)

            else: 
                # pop the last point                 
                # Add to the 1 min data frame 
                new_point = self.data_que.pop()
                self.to_one_min(new_point)
                # clear the que 
                # add the new point 
                self.data_que.clear()
                self.data_que.append(data_point)
                
    def to_one_min(self, data_point:list):
        formated_point = [
            int(float(data_point[1])),
            datetime.datetime.utcfromtimestamp(int(float(data_point[1]))),
            'BTCUSD',
            float(data_point[2]),
            float(data_point[3]),
            float(data_point[4]),
            float(data_point[5]),
            float(data_point[7])    
        ]
        print('new 1 min point = ', formated_point)
        if self.df_1min.iloc[-1]['Unix Timestamp'] == formated_point[0]:
            print('same')
            self.df_1min = self.df_1min.iloc[:-1 , :]
            #TODO fix the below 
        elif formated_point[0]-60 != self.df_1min.iloc[-1]['Unix Timestamp']:
            self.df_1min = self.df_1min.append([[
                                formated_point[0]-60,
                                datetime.datetime.utcfromtimestamp(formated_point[0]-60),
                                formated_point[2],
                                formated_point[3],
                                formated_point[4],
                                formated_point[5],
                                formated_point[6],
                                formated_point[7]
                                ]], ignore_index=True)
#         self.df_1min = temp
        
        self.df_1min = self.df_1min.append([formated_point], ignore_index=True)
#         temp2 = self.df_1min.append([formated_point])
        
#         self.df_1min = temp2
#         self.df_1min = self.apply_indicators_to_whole(self.df_1min)
        print('added new point')
#         print(self.df_1min.iloc[-1])
        
        

In [42]:
class KrakenWebSocketClient:
    """
    Kraken Client Interface

    """

    def __init__(self, data_class ):
        websocket.enableTrace(False)
        self.data_class=data_class
        ws = websocket.WebSocketApp("wss://ws.kraken.com/",
                                         on_message=self.on_message,
                                         on_error=self.on_error,
                                         on_close=self.on_close)
        self.ws = ws
        self.ws.on_open = self.on_open
        self.ws.run_forever()

    def on_message(self,ws, message):
        if "event" not in message: 
            formated_point = json.loads(message)
            self.data_class.to_data_que(formated_point[1])
            
        return message

    def on_error(self,ws, error):
        print(error)
        return error

    def on_close(self, a,b,c):
        self.ws.close()
        print("### closed ###")   
        # this is where the final data will be pushed to the dbs or something 

    def run(self, *args):
        global driver
        driver = True
        while driver:
            try:
                time.sleep(1)
#                 I think we can use the below part to stop the code 
#                 p = input()
#                 self.ws.send(p)
            except KeyboardInterrupt:
                driver = False
        time.sleep(1)
        self.ws.close()
        print("thread terminating...")

    def on_open(self,ws):
        print('Opened')
        self.data_class.get_data_since_last_run(False,'web')
        print(self.data_class.df_1min.iloc[-1])
        self.ws.send('{"event":"subscribe", "subscription":{"name":"ohlc"}, "pair":["XBT/USD"]}')
        t = threading.Thread(target=self.run)
        t.start()

In [43]:
HAS_DATA = True # Change this flag to flase to pull from sql
# need to Test the false

PRINT_ON_INIT= False
BTC_D=BTC_Data(HAS_DATA,PRINT_ON_INIT)
kraken_client = KrakenWebSocketClient(BTC_D)




SQL Engine Built...
1 Min Pulled in...
5 Min Pulled in...
30 Min Pulled in...
1 hour Pulled in...
4 hour Pulled in...
12 hour Pulled in...
24 hour Pulled in...
Retrieving data from 2021-12-11 19:36:00...
Retrieving data from 2021-12-11 20:33:34...
Retrieving data from 2021-12-11 21:56:26...
Retrieving data from 2021-12-11 23:07:44...
Retrieving data from 2021-12-11 23:41:35...
Retrieving data from 2021-12-12 00:21:13...
Retrieving data from 2021-12-12 00:49:26...
Retrieving data from 2021-12-12 01:23:32...
Retrieving data from 2021-12-12 02:08:44...
Retrieving data from 2021-12-12 02:43:03...
Retrieving data from 2021-12-12 04:16:47...
Retrieving data from 2021-12-12 05:53:31...
1639290540
1 Min Updated
5 Min Updated
30 Min Updated
1 Hour Updated
4 Hour Updated
12 Hour Updated
24 Hour Updated
Opened
checkpoint 1
Retrieving data from 2021-12-12 06:29:00...
Retrieving data from 2021-12-12 06:29:31...
checkpoint 2
checkpoint 2.1
checkpoint 3
1639290540
1 Min Updated
5 Min Updated
30 Min U

In [44]:
BTC_D.df_1min
# .iloc[3248954-100:3248954-50]

Unnamed: 0,0,1,2,3,4,5,6,7,Close,Date,...,MA_20,MA_5,MA_50,MA_8,M_Signal,Open,RSI,Symbol,Unix Timestamp,Volume
0,,NaT,,,,,,,243.95000,2015-10-09 00:00:00,...,,,,,0.00000,243.95000,,BTCUSD,1444348800,0.00000
1,,NaT,,,,,,,243.95000,2015-10-09 00:01:00,...,,,,,0.00000,243.95000,,BTCUSD,1444348860,0.00000
2,,NaT,,,,,,,243.95000,2015-10-09 00:02:00,...,,,,,0.00000,243.95000,,BTCUSD,1444348920,0.00000
3,,NaT,,,,,,,243.95000,2015-10-09 00:03:00,...,,,,,0.00000,243.95000,,BTCUSD,1444348980,0.00000
4,,NaT,,,,,,,243.95000,2015-10-09 00:04:00,...,,243.95000,,,0.00000,243.95000,,BTCUSD,1444349040,0.00000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3249026,,NaT,,,,,,,49071.20000,2021-12-12 06:26:00,...,49086.04000,49091.70000,49040.20000,49085.71250,18.14914,49071.20000,50.19412,BTCUSD,1639290360,0.00000
3249027,,NaT,,,,,,,49050.00000,2021-12-12 06:27:00,...,49085.18000,49080.32000,49040.88600,49083.52500,16.45265,49071.70000,46.58646,BTCUSD,1639290420,4.41221
3249028,,NaT,,,,,,,49054.80000,2021-12-12 06:28:00,...,49085.90500,49065.72000,49041.65000,49082.08750,14.56254,49051.40000,47.50642,BTCUSD,1639290480,0.00790
3249029,1639290600.00000,2021-12-12 06:30:00,BTCUSD,49050.10000,49070.40000,49050.10000,49066.10000,0.07583,,,...,,,,,,,,,,


In [10]:
def main():
    HAS_DATA = True # Change this flag to flase to pull from sql
    BTC_D=BTC_Data(HAS_DATA)
    

In [11]:
main()

SQL Engine Built...
