------
# High-Frequency Data Maintenance
    - Fetch new HFT data occasionally
    - Include new currencies occasionally
    
---

---
#### General Notes on HFT Data Fetching:

    - Remember to create a new dir for each dataset fetching period!
    - Live millisecond data cannot be retrieved later at the desired frequency
    - So remember to fetch in advance!
    - All currencies are fetched in parallel via multithreading
    - Time of data is UTC (1 h behind Central European Time, 2 h behind during summer time)
    
#### Notes on Data
    - The 5 most relevant bid and ask prices and volumes are fetched with a response delay (21 base features + time index) 
    - The time index marks when the response to a fetch request was received, i.e. after the delay
    - This is all the data recommended for use inside HFT. 
    - External features will generally be useless inside HFT.
    - Feature engineering can extract a lot of information from these base features
    - Binance provides close prices only through a separate request. Thus they are not fetched.
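As an illustration of the feature engineering mentioned above, the following minimal sketch derives three common order book features from the base columns. The column names `Bid_1`, `BidV_1`, `Ask_1` and `AskV_1` follow the ones used when saving the datasets further below; the function name `add_basic_book_features` and the choice of features are assumptions made for this example only.

```python
import pandas as pd

def add_basic_book_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a float copy of df with mid price, spread and top-level imbalance added."""
    out = df.astype(float)  # Binance sends prices and volumes as strings
    out["Mid"] = (out["Bid_1"] + out["Ask_1"]) / 2
    out["Spread"] = out["Ask_1"] - out["Bid_1"]
    # +1: only bid volume at the best level, -1: only ask volume at the best level
    out["Imbalance_1"] = (out["BidV_1"] - out["AskV_1"]) / (out["BidV_1"] + out["AskV_1"])
    return out

# one synthetic order book row (values are made up)
example = pd.DataFrame({"Bid_1": ["100.0"], "BidV_1": ["3.0"],
                        "Ask_1": ["101.0"], "AskV_1": ["1.0"]})
features = add_basic_book_features(example)
```

The same pattern extends to the deeper levels (`Bid_2` ... `Ask_5`), for example by summing volumes over all five levels before computing the imbalance.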
    
#### Notes on Request Delay

    - A fetch request is typically answered after ~250 ms, and after ~900 ms during occasional checkups
    - The 'Time' index is the time when the data is received
    - The 'Response Delay' is the time between issuing a request and receiving its data
        • Includes the time of sending a request to Binance (about 200 ms of 'Response Delay')
        • Includes the time of Binance responding with data (about  50 ms of 'Response Delay')
        • So the real delay of the data a model faces is ~50 ms and the fetching time offset is ~200 ms
        • No way was found to capture these separately with Binance or ccxt

          
    - Coins are chosen for HFT if (MarketCap > 500M and 24hVolume > 50M) on https://www.binance.com/en/markets/overview
    - These are mostly the coins also viable for scalping (all coins from scalping are currently included)
    - Check all Binance live orderbook data in: https://www.binance.com/en/markets/overview   
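The total 'Response Delay' described above can be measured by timing the whole request, as in the sketch below. It cannot split the delay into its sending and responding parts, in line with the note that no way was found to capture them separately. `timed_call` and `fake_fetch` are hypothetical names; `fake_fetch` only simulates a request with a fixed 50 ms sleep and stands in for a real order book call.

```python
import time

def timed_call(fetch_fn):
    """Call fetch_fn() and return (result, receive_time_s, response_delay_ms)."""
    t_start = time.perf_counter()          # monotonic clock for the duration
    result = fetch_fn()
    response_delay_ms = (time.perf_counter() - t_start) * 1000.0
    receive_time_s = time.time()           # wall clock (UTC epoch) for the 'Time' index
    return result, receive_time_s, response_delay_ms

def fake_fetch():
    # stand-in for a real request; sleeps 50 ms instead of contacting an exchange
    time.sleep(0.05)
    return {"bids": [], "asks": []}

book, received_at, delay_ms = timed_call(fake_fetch)
```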
    
---

---
### Further Practical Notes on HFT Data Fetching:

    Delays:
    - Request delays (sending + validating) range from 200-230 ms in Europe and 20-30 ms on Tokyo AWS servers.
    - Response delays (when Binance sends) last less than 15 ms in Europe and even less in Tokyo.
    - Longer response delays of 0.9 s in Europe and ?.? s in Tokyo can occur when Binance does an initial checkup of the IP
    
    Used Method:
    - The first version of fetching involved clocked HTTP requests via ccxt.binance
        + clocked fetches in 100 ms steps possible (19:10.100 --> 19:10.200 --> 19:10.300, ...) (from Tokyo)
        
        - random request delays on Binance's IP checkups (~900 ms) both in Europe and Tokyo
        - The request limit is a mere 20 requests per second
        - This makes fetching multiple currencies concurrently impossible
        - delays are long from Europe (250 ms) and only good on AWS Tokyo
     
     - The most recent version uses 'websocket' from Binance:
        + No problem with request limits
        + data clocked at 100 ms at minimum (shortest update interval from Binance) (will be enough)
        + only 1 ms response offset, rarely up to 15 ms
        + No occasional IP checkups with 900 ms delays
        + fewer NaN values due to delays, better data for RL and ML
        + simplicity
        
        - An incremental response delay (~15 ms) hinders the clocking in steps like (19:10.100 --> 19:10.200, ...)

        --> new version gets better data more easily
    
    Disk Space and RAM
    - Fetching locally from a Jupyter notebook forces one to keep the local machine running
    - There will be 864,000 data points (~190 MB) per day for each currency at a 100 ms frequency
    - One day of 100 ms data will eat up 300 MB of RAM for the program with 20 base features
    
        • Total disk space = 50 pairs * 30 d * 190 MB 
                           = ~ 285 GB per month for 50 currencies
                           = ~   6 GB per month for one currency
                           • Storing 250 GB on AWS Glacier: 1 € per month
                           
        • Total RAM for fetching ~ less than 2 GB
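The storage figures above can be reproduced with a few lines of arithmetic. The 190 MB per pair and day is taken from the notes as given; everything else follows from it.

```python
SECONDS_PER_DAY = 24 * 60 * 60
STREAM_FREQ_MS = 100
MB_PER_PAIR_PER_DAY = 190   # figure taken from the notes above
PAIRS = 50
DAYS = 30

# one sample every 100 ms
samples_per_day = SECONDS_PER_DAY * 1000 // STREAM_FREQ_MS

# monthly disk space in GB (1 GB = 1000 MB)
gb_per_pair_per_month = MB_PER_PAIR_PER_DAY * DAYS / 1000
gb_total_per_month = MB_PER_PAIR_PER_DAY * DAYS * PAIRS / 1000

print(samples_per_day, gb_per_pair_per_month, gb_total_per_month)
```

This gives 864,000 samples per day, about 5.7 GB per pair and month, and 285 GB per month for 50 pairs.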
    
    Goal of Data
    - It is advised to fetch an extensive train dataset in 100 ms freq
        • It will be viable for downsampling to 250 and 500 ms 
        • A lot of data will be useful for a robust RL pre-training 
          which includes different starting and ending times of episodes!
        • Multiple extensive train datasets with diverse time series
          characteristics will improve an RL model significantly
    - Moreover, it is advised to keep fetching over multiple months without breaks
        • It always ensures the newest data
        • It won't miss out on any interesting events
        • Flexibly having data for customers or experiments will prove very handy
    - It is also advised to fetch the datasets of all currencies synchronously
      for comparability
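Deriving a coarser dataset from the 100 ms data could look like the sketch below. It uses synthetic data and keeps the last known order book state of each 500 ms bin, which is one possible aggregation and an assumption of this example, not something the notes prescribe.

```python
import numpy as np
import pandas as pd

# 2 seconds of synthetic 100 ms data with a single column
index = pd.date_range("2023-06-01 12:00:00", periods=20, freq="100ms")
df_100ms = pd.DataFrame({"Bid_1": np.arange(20, dtype=float)}, index=index)

# keep the last observed state inside every 500 ms bin
df_500ms = df_100ms.resample("500ms").last()
```

Note that 250 ms is not a multiple of 100 ms, so 250 ms bins alternately contain three and two of the original samples.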
        
        
---------------------

### Imports

In [1]:
import pandas as pd
import numpy as np
import os
from urllib.error import  HTTPError
import time
import datetime
from datetime import timedelta
import warnings
import ccxt
from ccxt.base.errors import RateLimitExceeded, BadSymbol
import tqdm
from multiprocessing.pool import ThreadPool
import threading
import json
import websocket

#warnings.filterwarnings('ignore')

### Datastream Class

In [8]:
#https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md
class HFT_Datastream:
    """ A class that fetches the 5 most recent orderbook entries of a single Binance 
        pair in clocked 100 millisecond intervals. It uses an independent thread for that.
        
        One fetcher per pair is mandatory, as response messages would not be distinguishable otherwise.
        
        Using websockets from Binance, request delays only occur when sending requests.
        It provides a total of 21 base features including the top 5 bid prices, bid volumes, 
        ask prices, ask volumes and 1 request delay
    
        Functionality:
            - Those mentioned above
            - Keeps exactly one np.array() as the most recent fetch that can be accessed by a recorder class
            - Stores whether a new fetch is available since the last access via 'fetcher.fetch_is_updated'
                - The fetcher may skip entire fetch intervals while keeping 'fetcher.fetch_is_updated' == 'False'
            - Will conveniently start at times of [12:15:01.000, 12:15:01.150, 12:15:01.300, ...]
    """
    
    
    def __init__(self, params):
        
        self.coin           = params['coin'          ] # 'BTC'
        self.stable_coin    = params['stable_coin'   ] # 'USDT'
        self.stream_freq    = params['stream_freq'   ] # in ms; multiple of 100 ms
        self.notified_insts = params['notified_insts'] # list of python objects
        self.num_of_entries = params['num_of_entries'] # only 5, 10 and 20 possible
        
        assert (self.stream_freq == 100)
        socket='wss://stream.binance.com:9443/ws'
        self.ws = websocket.WebSocketApp(socket, on_open    = self._start_subscription, 
                                                 on_message = self._on_message)
        
        self.message_num       = 1
        self.thread_is_running = False
        self.fetch_is_updated  = {str(inst) : False for inst in self.notified_insts}
        self.last_live_fetch   = None
        self.last_receive_time = None
        self.thread            = threading.Thread(target = self.ws.run_forever, args = ())   
        self.ticker            = self.coin + self.stable_coin
        
    def __del__(self):
        """ deleting HFT_Datastream properly.
            
            'del recorder' won't work. 
            Jupyter does call __del__() on overwriting class variables.
        """
        self.thread_is_running = True 
        self.stop_datastream()

    def start_datastream(self):
        assert self.thread_is_running == False
        self.thread_is_running = True
        self.thread.start()
            # self.thread.start() calls self.ws.run_forever()
            # self.ws.run_forever() calls _start_subscription()
            # and then, _on_message() will be invoked for every subscription message()

    def stop_datastream(self):
        assert self.thread_is_running == True
        self.ws.close()
        self.thread.join()
        self.thread_is_running = False # mark the stream as stopped
        
    def get_fetch_status(self, inst):
        return self.fetch_is_updated[str(inst)]
        
    def get_fetch(self, inst): # inst is the object that accesses the data with get_data
        self.fetch_is_updated[str(inst)] = False
        return self.last_live_fetch

    def _start_subscription(self, ws):
        for inst in self.notified_insts: 
            inst.received_data = False
        sub_message = {
            "method": "SUBSCRIBE",
            "params": # like ['btcusdt@depth5@100ms', 'adausdt@depth5@100ms', ...]
                [f"{self.ticker.lower()}@depth{self.num_of_entries}@{self.stream_freq}ms"],
             "id": 1
             }
        
        # UTC time; start at the next full second
        planned_req_time = pd.to_datetime(np.ceil(time.time())*10**9) 
        
        # Wait until the next timestep start
        first_waiting_time = planned_req_time.value/(10**9) - time.time()
        if first_waiting_time < 0:
                first_waiting_time = 0.0
        time.sleep(first_waiting_time)
        
        self.ws.send(json.dumps(sub_message))
        
        print(f"Opened websocket subscription for {self.ticker}") 

    def _on_message(self, ws, message): # invoked for every message received after subscribing
        msg = json.loads(message)
        if 'lastUpdateId' in msg:  # only depth snapshots carry this key; skips subscription responses
            # keep every n-th snapshot, e.g. every 2nd one for self.stream_freq == 200 ms
            if (self.message_num*100 % self.stream_freq == 0):
                self.message_num = 1
                response_time          = pd.to_datetime(time.time()*10**9)
                data_arr               = np.array([response_time]) 
                self.last_live_fetch   = np.hstack((data_arr, np.concatenate(
                                                           (msg['bids'][0:5], msg['asks'][0:5]), 
                                                            axis=None) ))
                self.fetch_is_updated  = {str(inst) : True for inst in self.notified_insts}
            else:
                self.message_num += 1 # count the skipped snapshots
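The flattening done in `_on_message()` can be tried offline with a hand-written message, as sketched below. The sample payload follows the shape of Binance's partial book depth stream (`lastUpdateId`, `bids`, `asks` with `[price, quantity]` string pairs) with made-up values. `depth_message_to_row` is a hypothetical helper, and unlike the class above it converts the values to floats right away, which is an assumption of this sketch.

```python
import json
import numpy as np

def depth_message_to_row(message: str, receive_time: float, levels: int = 5):
    """Flatten a depth message into [time, bid_1, bidv_1, ..., ask_1, askv_1, ...]."""
    msg = json.loads(message)
    if "lastUpdateId" not in msg:      # e.g. the subscription confirmation
        return None
    bids = np.asarray(msg["bids"][:levels], dtype=float).ravel()
    asks = np.asarray(msg["asks"][:levels], dtype=float).ravel()
    return np.concatenate(([receive_time], bids, asks))

# made-up snapshot with 5 levels per side
sample = json.dumps({
    "lastUpdateId": 160,
    "bids": [["100.0", "1.0"], ["99.9", "2.0"], ["99.8", "3.0"], ["99.7", "4.0"], ["99.6", "5.0"]],
    "asks": [["100.1", "1.5"], ["100.2", "2.5"], ["100.3", "3.5"], ["100.4", "4.5"], ["100.5", "5.5"]],
})
row = depth_message_to_row(sample, receive_time=1687100640.0)
confirmation = depth_message_to_row(json.dumps({"result": None, "id": 1}), 1687100640.0)
```

With 5 levels the row has 1 + 4 * 5 = 21 entries, matching the 21 columns used when saving.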

### Recorder Class

In [3]:
class HFT_Sample_Recorder():
    """ A class for storing and updating a list of fetches for other classes.
        It can operate multiple HFT_Datastream instances, one for each currency
    
        Functionality:
            - reads the data of HFT_Datastream instances and records their data
            - Poses as a flexible storage of all pairs for multiple classes
            - Data can be cleared or accessed by multiple other classes
            - It will provide new data only once it is available; the rarely skipped time steps will not get stored
            
        Note:
            - Change this class when using it for live fetching inside a RL environment
              --> reduces delay by 15 ms when there is no separate class that accesses a recorder
              --> a recorder can then be built on top of the rl live fetcher to avoid further delays
        """
    
    def __init__(self,
                 params):
        
        self.coins              = params['coins'         ] # like ['BTC', 'ETH', ...]
        self.stable_coin        = params['stable_coin'   ] # 'USDT'
        self.stream_freq        = params['stream_freq'   ] # like 100 (in ms)   
        self.num_of_entries     = params['num_of_entries'] # 5, 10 or 20
        
        self.pair_list     = [c + self.stable_coin for c in self.coins]
        self.fetch_record  = {c + self.stable_coin : np.array([]) for c in self.coins}   
        self.stream_insts = {c + self.stable_coin :     HFT_Datastream({'coin'           : c,
                                                                        'stable_coin'    : self.stable_coin,
                                                                        'stream_freq'    : self.stream_freq,
                                                                        'num_of_entries' : self.num_of_entries,
                                                                        'notified_insts' : [self]})
                                 for c in self.coins}
    
        self.thread = threading.Thread(target = self.__recording_loop, args = ())
        self.thread_is_running = False

    def __del__(self):
        """ Deletes HFT_Sample_Recorder properly.
            Clears the recorder and fetcher threads.
            
            'del recorder' won't work. 
            Jupyter does call __del__() on overwriting class variables.
        """
        self.thread_is_running = False # lets __recording_loop exit, so join() cannot block forever
        if self.thread.is_alive():
            self.thread.join()
        stream_insts   = [self.stream_insts[key] for key in self.stream_insts]
        for stream_inst in stream_insts:
            stream_inst.__del__()
        
    def clear_data(self):
        self.fetch_record = {k : np.array([]) for k in self.fetch_record}
        
    def get_data(self):
        return self.fetch_record
    
    def get_currency_data(self, pair):
        return self.fetch_record[pair]
         
    def start_recording(self):
        """ Opens a new thread for recording with __recording_loop() """
        assert self.thread_is_running == False
        
        self.thread_is_running = True
        self.thread.start()
        
    def stop_recording(self):
        """ Stops a current thread and the fetching of data """
        assert self.thread_is_running == True
        self.thread_is_running = False # stops function __recording_loop; finishes the thread
        self.thread.join() 
        stream_insts   = [self.stream_insts[key] for key in self.stream_insts]
        for stream_inst in stream_insts:
            stream_inst.stop_datastream()

    def __start_synchronized_fetchers(self):
        """ Function run by the thread of the class """  
        
        def _starting_func(stream_inst):
            stream_inst.start_datastream()
            
        stream_insts   = [self.stream_insts[key] for key in self.stream_insts]  
        pool = ThreadPool(processes=len(stream_insts))
        pool.map_async(_starting_func, stream_insts)    
        pool.close()
        pool.join()
        
    def __recording_loop(self): 
        self.__start_synchronized_fetchers()
        
        while self.thread_is_running:

            for pair in self.pair_list:
                pair_fetch_is_updated = self.stream_insts[pair].get_fetch_status(self)
                if pair_fetch_is_updated == True:
                    _fetch  = self.stream_insts[pair].get_fetch(self)
                    if len(self.fetch_record[pair]) != 0:
                        self.fetch_record[pair] = np.vstack([self.fetch_record[pair], _fetch])
                    else:                   
                        self.fetch_record[pair] = _fetch
            time.sleep(0.005) # NOTE: 15 ms of constant delay is caused by time.sleep(0.005) 
                              # local machine has minimum sleep time of 15 ms
                              # minimum sleep time can be 1 ms on other machines on AWS
            

        print(f'Thread finished for HFT_Sample_Recorder')
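`__recording_loop()` grows each record with `np.vstack`, which copies the whole array on every new fetch. If that ever becomes a bottleneck between backups, one alternative is to collect the rows in a Python list and stack them once when the data is requested. The sketch below is not part of the classes in this notebook, and `RowBuffer` is a hypothetical name.

```python
import numpy as np

class RowBuffer:
    """Hypothetical helper: collects 1-D fetches in a list and stacks them once on demand."""

    def __init__(self, n_columns):
        self.n_columns = n_columns
        self.rows = []

    def append(self, row):
        self.rows.append(np.asarray(row, dtype=object))  # appending to a list copies nothing

    def to_array(self):
        if not self.rows:                                 # always return a 2-D array
            return np.empty((0, self.n_columns), dtype=object)
        return np.vstack(self.rows)

    def clear(self):
        self.rows = []

buffer = RowBuffer(n_columns=21)
empty_shape = buffer.to_array().shape
for i in range(3):
    buffer.append([f"t{i}"] + ["1.0"] * 20)   # made-up fetch: time + 20 values
filled_shape = buffer.to_array().shape
```

A side effect of always returning a 2-D array is that empty and single-row records keep the expected number of columns.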

### Dataset Fetcher Class

In [4]:
class HFT_Dataset_Fetcher():
    """ A class for fetching multiple hft datasets using the class HFT_Sample_Recorder alongside multithreading. 
        
        Extra functionality:
            - Manages HFT_Sample_Recorder with abstraction
            --> Will not interfere with the data fetching while saving 
            --> Makes routine backup saves and then clears HFT_Sample_Recorder to avoid OOM errors
            - HFT_Dataset_Fetcher fetches until manually stopped with inst.__del__()
            - These backup saves or dataset fragments are then combined with a Dataset_Extractor()
            - The backup saves are stored for later use and queried on demand with Dataset_Extractor()
            --> Store all numerous backup saves on Glacier (50*5=250 GB per month)
            --> Extract subsets to S3 Buckets (20*5*3d*150MB = ~100 GB on S3 bucket)   
        File structure: (all created if not present)
             hft data
                - raw fetched data
                    - since_2023-06-01 08:31
                        - ADA_USDT_100ms_(2023-06-01-08꞉31)_(2023-06-01-09꞉00)_(1)
                        - ADA_USDT_100ms_(2023-06-01-09꞉00)_(2023-06-01-09꞉30)_(2)
                        - ...
                    -  ...
                - extracted datasets
                    - from_(2023-06-01 12:00)_to_(2023-06-03 00:00)
                        - ADA_USDT_100ms_(2023-06-01 12:00)_(2023-06-03 00:00)
                        - BTC_USDT_100ms_(2023-06-01 12:00)_(2023-06-03 00:00)
                        - ...
                    - ...
                
        # '꞉' is unicode replacement (U+A789) of normal colon ':'
        # Try to always save as .pkl files for halving disk space and feasible reading/writing times!
        #   Use .csv saving for testing or demanded csv format only!
        #   All .csv can be extracted from .pkl files for special demands
                
    """
    
    def __init__(self,
                 hft_dataset_fetcher_params):
        
        self.file_extension     = hft_dataset_fetcher_params['file_extension'  ] # 'pkl' or 'csv'
        self.data_root_dir      = hft_dataset_fetcher_params['data_root_dir'   ] # 'hft data'
        self.backup_interval    = hft_dataset_fetcher_params['backup_interval' ] # in min
        self.coins              = hft_dataset_fetcher_params['coins'           ] # like ['BTC', 'ETH', ...]
        self.stable_coin        = hft_dataset_fetcher_params['stable_coin'     ] # 'USDT'
        self.stream_freq        = hft_dataset_fetcher_params['stream_freq'     ] # like 100 (in ms)

        self.hft_recorder       = HFT_Sample_Recorder(hft_dataset_fetcher_params)
        self.pair_list          = [c + self.stable_coin for c in self.coins]
        
        self.thread             = threading.Thread(target = self.__observing_loop, args = ())
        self.thread_is_running  = False
        
        self._init_dirs()
        
        assert self.file_extension in ['pkl', 'csv']
        assert self.backup_interval >= 1                   # At least one minute
        assert isinstance(self.backup_interval, int)       # Only whole minutes

        
    def _init_dirs(self):
        if os.path.exists(self.data_root_dir) == False:
            os.mkdir(self.data_root_dir)
            
        self.backup_dir_path = self.data_root_dir + '/' + 'raw fetched data'
        
        
        if os.path.exists(self.backup_dir_path ) == False:
            os.mkdir(self.backup_dir_path)
            
        start_time           = str(pd.to_datetime(time.time()*10**9))[:16] # --> '2023-06-15 10:49'
        bu_dir_name          = f'since_{start_time}'.replace(':', '꞉')
        
        self.backup_dir_path = self.backup_dir_path + '/' + bu_dir_name
        
        if os.path.exists(self.backup_dir_path ) == True:
            for root, dirs, files in os.walk(self.backup_dir_path):
                if len(files) != 0:
                    raise Exception(f'Fetching directory {self.backup_dir_path} already in use. Wait a minute')
        else:
            os.mkdir(self.backup_dir_path)
       
    
    def __del__(self):
        """ deleting HFT_Dataset_Fetcher properly.
            Clears also recorder and fetcher threads.
            
            'del dataset_saver' won't work. 
            Jupyter does call __del__() on overwriting class variables.
        """
        self.thread_is_running = True
        self.stop_saving()
        for root, dirs, files in os.walk(self.backup_dir_path):
            if len(files) == 0:
                os.rmdir(self.backup_dir_path)
        self.hft_recorder.__del__()    
     
    
    def start_saving(self):
        """ Opens a new thread for recording with __observing_loop() """
        assert self.thread_is_running == False
        self.thread_is_running = True
        self.thread.start()
      
    
    def stop_saving(self):
        """ Stops a current thread and the fetching of data """
        assert self.thread_is_running == True
        self.thread_is_running = False # stops function __observing_loop; finishes the thread
        self.thread.join() 
        self.hft_recorder.stop_recording()
        self.hft_recorder.clear_data()
            
            
    def __observing_loop(self):  # backup saves, end_date, instances_to_fetch, backup_time!
        """ Function run by the thread of the class.
        
            Backs up the recorder data at regular intervals and clears it to save RAM
        
              (exactly slotted intervals: '10:49' --> '11:00' --> '11:30' --> '11:43')
                                          (start)   (consecutive fetching)     (stop)  
        """  
        self.hft_recorder.start_recording()
        one_minute_time = 60*10**9
        start_time      = time.time()
        
        self.backup_count  = 0
        self.bu_start_date = str(pd.to_datetime(start_time*10**9))[:16] # --> '2023-06-01 10:49'
        # end of the first slot: add one interval, then subtract the minutes already elapsed in the current slot
        self.bu_end_date   = str(pd.to_datetime(start_time*10**9 + \
                                                self.backup_interval*one_minute_time - \
                                                int(int(self.bu_start_date[14:16]) % self.backup_interval)*one_minute_time)
                                )[:16]                                  # --> '2023-06-01 11:00'
        
        print(f'Initialized dataset fetching for {len(self.coins)} pairs at {self.bu_start_date} UTC')
        print(f'{self.backup_interval}-minutely backups expected starting at {self.bu_end_date} UTC')
        
        while self.thread_is_running:
            
            stop_time = pd.to_datetime(pd.to_datetime(self.bu_end_date).value) 
            backup_interval_end_reached = (time.time()*10**9 > stop_time.value)
            if backup_interval_end_reached == True:
                self.backup_count += 1
                self.backup_save_all()
                self.bu_start_date = self.bu_end_date
                self.bu_end_date   = str(pd.to_datetime(pd.to_datetime(self.bu_start_date).value + \
                                                        one_minute_time*self.backup_interval))[:16]                    
            time.sleep(0.010)

        self.backup_count += 1
        self.backup_save_all()

        print(f'Thread finished for HFT_Dataset_Fetcher')
        
    
    def backup_save_all(self): 
        """ Save a dataset for each ticker observed at every end of the backup interval. """
            
        data = self.hft_recorder.get_data()
        self.hft_recorder.clear_data() # free Memory
        col_names  = ['Request Time',
                      'Bid_1', 'BidV_1', 'Bid_2', 'BidV_2', 'Bid_3', 'BidV_3', 'Bid_4', 'BidV_4', 'Bid_5', 'BidV_5',
                      'Ask_1', 'AskV_1', 'Ask_2', 'AskV_2', 'Ask_3', 'AskV_3', 'Ask_4', 'AskV_4', 'Ask_5', 'AskV_5']
        for c in self.coins:
            backup_fn      = f'{c}_{self.stable_coin}_{int(self.stream_freq)}ms_' + \
                             f'({self.bu_start_date})_({self.backup_count}).{self.file_extension}'\
                            .replace(':', '꞉')
            backup_fn_path = self.backup_dir_path + '/' + backup_fn 
            data_key = c + self.stable_coin # 'BTCUSDT'
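            # reshape so that empty and single-row records also fit the 21 columns
            record = np.asarray(data[data_key], dtype=object).reshape(-1, len(col_names))
            df = pd.DataFrame(record, columns = col_names) 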
            df.set_index('Request Time', inplace=True)
            if self.file_extension   == 'csv':
                df.to_csv(   backup_fn_path) 
            elif self.file_extension == 'pkl':
                df.to_pickle(backup_fn_path)
            else:
                raise ValueError(f'Unsupported file extension: {self.file_extension}')
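The slotted backup times described in `__observing_loop()` ('10:49' --> '11:00' --> '11:30') can be checked in isolation with pandas' timestamp rounding. This is a minimal sketch of the intended schedule, not the code used by the class; `next_backup_boundary` is a hypothetical name.

```python
import pandas as pd

def next_backup_boundary(now: pd.Timestamp, interval_min: int) -> pd.Timestamp:
    """Return the first interval boundary strictly after `now`."""
    return now.floor(f"{interval_min}min") + pd.Timedelta(minutes=interval_min)

first = next_backup_boundary(pd.Timestamp("2023-06-01 10:49"), 30)
second = next_backup_boundary(first, 30)
minutely = next_backup_boundary(pd.Timestamp("2023-06-18 15:04:30"), 1)
```

Here `first` is 11:00 and `second` is 11:30 on 2023-06-01. Boundaries are counted from midnight UTC, so they align with the full hour only for intervals that divide 60 minutes.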

### Initiate Continuous live fetching:

In [10]:
coins = ['ADA','AGIX','ALGO','APT','ARB','ARPA','ATOM','AXS','BNB','BCH','BTC','CFX','DASH','DOGE','DOT','EOS','EDU',
         'ETH','ETC','FET','FIL','FTM','LDO','NEO','ONT', 'OP', 'PEPE','KAVA','LINK','LTC','MASK','MATIC','SAND','SHIB',
         'SOL','STX','SUI','TRX','UNI','VET','WAVES','XMR','XRP','ZEC'] 

params = {'file_extension' : 'csv',
          'data_root_dir'  : 'hft data',
          'backup_interval': 1, # in min
          'coins'          : coins,
          'stable_coin'    : 'USDT',
          'stream_freq'    : 100, # in ms
          'num_of_entries' : 5
         }

dataset_fetcher = HFT_Dataset_Fetcher(params)
dataset_fetcher.start_saving()

Initialized dataset fetching for 44 pairs at 2023-06-18 15:04 UTC
1-minutely backups expected starting at 2023-06-18 15:05 UTC
time after coin ADA: 0.008999824523925781
time after coin AGIX: 0.01992177963256836
time after coin ALGO: 0.025922060012817383
time after coin APT: 0.03592181205749512
time after coin ARB: 0.04538130760192871
time after coin ARPA: 0.05137920379638672
time after coin ATOM: 0.06041121482849121
time after coin AXS: 0.06541180610656738
time after coin BNB: 0.07741332054138184
time after coin BCH: 0.0844111442565918
time after coin BTC: 0.09641122817993164
time after coin CFX: 0.10241127014160156
time after coin DASH: 0.10841178894042969
time after coin DOGE: 0.16941165924072266
time after coin DOT: 0.17841148376464844
time after coin EOS: 0.18641233444213867
time after coin EDU: 0.1964108943939209
time after coin ETH: 0.2094106674194336
time after coin ETC: 0.21641159057617188
time after coin FET: 0.22341156005859375
time after coin FIL: 0.23141098022460938
time af

In [48]:
dataset_fetcher.__del__()
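Once fetching has stopped, the backup fragments of one pair can be joined into a single dataset. The `Dataset_Extractor()` mentioned in the class docstring is not shown in this notebook, so the sketch below is only a guess at its simplest form. `combine_fragments` is a hypothetical helper, and the demo writes two small synthetic `.pkl` fragments with simplified file names into a temporary directory.

```python
import os
import tempfile
import pandas as pd

def combine_fragments(fragment_dir: str, prefix: str) -> pd.DataFrame:
    """Concatenate all .pkl fragments whose file name starts with `prefix`, sorted by time."""
    paths = sorted(os.path.join(fragment_dir, fn) for fn in os.listdir(fragment_dir)
                   if fn.startswith(prefix) and fn.endswith(".pkl"))
    frames = [pd.read_pickle(path) for path in paths]
    return pd.concat(frames).sort_index()

# self-contained demo with two synthetic fragments of three rows each
with tempfile.TemporaryDirectory() as tmp:
    for i, start in enumerate(["2023-06-01 08:31", "2023-06-01 09:00"], start=1):
        idx = pd.date_range(start, periods=3, freq="100ms", name="Request Time")
        fragment = pd.DataFrame({"Bid_1": [1.0, 2.0, 3.0]}, index=idx)
        fragment.to_pickle(os.path.join(tmp, f"ADA_USDT_100ms_({i}).pkl"))
    combined = combine_fragments(tmp, "ADA_USDT_100ms_")
```

`pd.read_pickle` should only be used on files from a trusted source, such as the ones written by this notebook.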

##### TODO:                         

        - When HFT data is needed:
        - Move over to AWS
            - EC2 2 GiB instance
            - Bucket
        - Start fetching on AWS
        
        
        - AWS sources:
                - https://www.bing.com/videos/search?q=aws+sagemaker+studio&&view=detail&mid=938501A6D4EE09EEFACB938501A6D4EE09EEFACB&&FORM=VRDGAR&ru=%2Fvideos%2Fsearch%3Fq%3Daws%2Bsagemaker%2Bstudio%26FORM%3DHDRSC4
                
                - Setup EC2 instances for Jupyter inside AWS
                - Each instance has its own notebooks
                - https://www.bing.com/videos/search?&q=aws+sagemaker+studio&view=detail&mid=C6A3CC0856C44720117EC6A3CC0856C44720117E&FORM=VDRVRV&ru=%2Fvideos%2Fsearch%3Fq%3Daws%2Bsagemaker%2Bstudio%26FORM%3DHDRSC4&ajaxhist=0
            - test minimal freq: 50 ms
            
            
            