In [None]:
import random
import multiprocessing
import time

import pandas as pd
from tqdm import tqdm
import ccxt

from utils import get_ohlcv_filename, symbols

In [None]:
# Shared ccxt Binance client, created with no arguments (no credentials or options).
# get_symbol_data() calls its fetch_ohlcv() method to download candles.
exchange = ccxt.binance()

In [None]:
# One shared lock serialises console output, so that messages printed from
# multiprocessing code do not overlap within a single line.
lock = multiprocessing.Lock()

def print_with_lock(text):
    """
    Print `text` while holding the module-level `lock`.

    The lock is released again even if `print` raises.
    """
    lock.acquire()
    try:
        print(text)
    finally:
        lock.release()

In [None]:
def log_error(e):
    """
    Print a formatted error message to the console
    """
    module = e.__module__ if hasattr(e, "__module__") else ""
    print(f'''
    {e.__class__=}
    {module=}
    {e.args=}
    {e.__context__=}
    ''')

# TODO check if this works without this position variable
# Row index handed to the next tqdm progress bar; incremented once per bar.
# NOTE(review): this is a module-level global, so each worker process counts
# its own copy -- bars created in different pool workers may share a row.
position = 0

def get_symbol_data(symbol, timeframe, since = 0):
    """
    Download all OHLCV candles for `symbol` from `since` onwards.

    Pages through `exchange.fetch_ohlcv`, starting each request one past the
    timestamp of the last candle received, until an empty page comes back.
    Any exception raised by a request is logged and the request is retried
    after a random pause of 0-59 seconds.

    Parameters
    ----------
    symbol : str
        Market symbol, passed through to `exchange.fetch_ohlcv`.
    timeframe : str
        Candle interval, passed through to `exchange.fetch_ohlcv`.
    since : int, optional
        Timestamp of the first candle to request, in the same unit as the
        candle timestamps returned by the exchange. Defaults to 0.

    Returns
    -------
    pandas.DataFrame
        One row per candle with the columns timestamp, open, high, low,
        close and volume; zero rows if the exchange returned nothing.
    """
    global position

    # brief pause before the first request of every call
    time.sleep(1)

    columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']

    pbar = tqdm(desc = f"Downloading {symbol:>8}", position=position)
    position += 1

    candles = []

    # try/finally guarantees the bar is closed even when the loop is left by
    # something the handler below does not catch (e.g. KeyboardInterrupt).
    try:
        while True:

            try:
                candles_new = exchange.fetch_ohlcv(symbol=symbol, timeframe=timeframe, since=since, limit=1500)
            except Exception as e:
                # this catches ddos protection exceptions. log first so the reason
                # for the stall is visible right away, then sleep random time and retry.
                # NOTE(review): every Exception is retried without limit, so a
                # permanent failure (e.g. an unknown symbol) would loop forever.
                log_error(e)
                time.sleep(random.randrange(60))
                continue

            # an empty page means there is nothing newer than `since`
            if not candles_new:
                break

            candles.extend(candles_new)

            # resume right after the newest candle received so far
            since = candles[-1][0] + 1

            pbar.update(len(candles_new))
    finally:
        pbar.close()

    return pd.DataFrame(candles, columns=columns)
    

In [None]:
def download_or_update_symbol_data(symbol, timeframe = "1m"):
    """
    Return the OHLCV history for `symbol`, using a pickle file as a cache.

    If the file named by `get_ohlcv_filename(symbol, timeframe)` exists, only
    candles newer than its last row are downloaded and appended. Otherwise the
    full history is downloaded. The file is (re)written whenever new data was
    fetched, and always after a first download.

    Parameters
    ----------
    symbol : str
        Market symbol, passed through to `get_symbol_data`.
    timeframe : str, optional
        Candle interval, passed through to `get_symbol_data`. Defaults to "1m".

    Returns
    -------
    pandas.DataFrame
        The cached candles plus any newly downloaded ones.
    """
    filename = get_ohlcv_filename(symbol, timeframe)

    # Only the cache read is guarded, so a FileNotFoundError raised later
    # (while downloading or saving) is not mistaken for a missing cache.
    try:
        # NOTE: unpickling can execute arbitrary code -- only load cache files
        # that were written by this notebook.
        df = pd.read_pickle(filename)
    except FileNotFoundError:
        # file not found, download, save to file, and return df
        print_with_lock(f"File {filename} not found. Downloading...")
        df = get_symbol_data(symbol, timeframe)
        df.to_pickle(filename)
        return df

    # file found, fetch new data, concat, save to file and return df
    print_with_lock(f"File {filename} found. Updating...")

    cache_is_empty = len(df) == 0
    if cache_is_empty:
        # An earlier run may have cached zero rows; there is no last
        # timestamp to resume from, so start from the beginning.
        since = 0
    else:
        # plain int rather than a NumPy scalar for the exchange request
        since = int(df["timestamp"].iloc[-1]) + 1

    df_new = get_symbol_data(symbol, timeframe, since)
    if len(df_new) > 0:
        if cache_is_empty:
            df = df_new
        else:
            df = pd.concat([df, df_new], ignore_index = True)
        df.to_pickle(filename)

    return df

In [None]:
# Download or update every symbol in parallel, one task per symbol, using a
# pool with the default number of worker processes.
with multiprocessing.Pool() as pool:
    completed = pool.imap_unordered(download_or_update_symbol_data, symbols)
    # The returned frames are not needed here; the iterator is consumed only
    # to wait until every task has finished.
    for _ in completed:
        continue