In [12]:
import asyncio
import json
import os
import zlib
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from binance import Client
from tqdm import tqdm

from kafka.producer import MyProducer

In [7]:
# Backfill window, passed to Binance as "%d %b %Y" date strings.
# After a successful fetch, change START_DATE to the END_DATE of that run so the
# next run only pulls newer klines.
START_DATE = "1 jan 2017"
END_DATE = f"{datetime.today():%d %b %Y}"
END_DATE  # was '24 Jul 2023' when this notebook was last run

'24 Jul 2023'

In [None]:
# Both bounds are naive datetimes at midnight, parsed from the same strings that
# are sent to Binance. A `date` cannot be ordered against a `datetime`
# (TypeError), and `date + timedelta` ignores sub-day steps, so count_intervals
# needs datetimes on both sides.
start_date = datetime.strptime(START_DATE, "%d %b %Y")
end_date = datetime.strptime(END_DATE, "%d %b %Y")

In [None]:
# (Binance interval code, interval length in milliseconds). The length is only
# used to estimate the number of klines for the tqdm progress bar.
_MINUTE_MS = 60 * 1000
_HOUR_MS = 60 * _MINUTE_MS
_DAY_MS = 24 * _HOUR_MS

intervals = [
    ("1m", 1 * _MINUTE_MS),
    ("3m", 3 * _MINUTE_MS),
    ("5m", 5 * _MINUTE_MS),
    ("15m", 15 * _MINUTE_MS),
    ("30m", 30 * _MINUTE_MS),
    ("1h", 1 * _HOUR_MS),
    ("2h", 2 * _HOUR_MS),
    ("4h", 4 * _HOUR_MS),
    ("6h", 6 * _HOUR_MS),
    ("8h", 8 * _HOUR_MS),
    ("12h", 12 * _HOUR_MS),
    ("1d", 1 * _DAY_MS),
    ("3d", 3 * _DAY_MS),
    ("1w", 7 * _DAY_MS),
]
symbols = ["BTCUSDT"]


In [24]:
# Credentials are read from the environment rather than stored in the notebook;
# an unset variable yields None.
API_KEY = os.getenv('BINANCE_API_KEY')
SECRET_KEY = os.getenv('BINANCE_SECRET_KEY')

In [None]:
# Kafka sink settings.
KAFKA_TOPIC_PREFIX = "raw"          # topics are named "<prefix>_<symbol>_<interval>"
KAFKA_PARTITIONER_SEPERATOR = "|"   # separates the timestamp from the rest of a message key
TO_KAFKA = True                     # enable the Kafka sink

In [10]:
# CSV sink settings.
TO_CSV = True                         # enable the CSV sink
BASE_DIR = "./../../../data/_raw"     # relative to the kernel's working directory
OVERWRITE = True                      # rewrite chunk files that already exist
MAX_BATCH_SIZE = 256
ROWS_PER_FILE = 100 * MAX_BATCH_SIZE  # klines per CSV chunk file

In [14]:
# Binance REST client; either key may be None when its environment variable is unset.
client = Client(
    api_key=API_KEY,
    api_secret=SECRET_KEY,
)

In [15]:
def count_intervals(start_time, end_time, ms):
    """Count the points ``start_time + k * ms`` (k >= 0) that are ``<= end_time``.

    Same result as stepping from ``start_time`` in ``ms``-millisecond increments
    while the current point is ``<= end_time``, but computed arithmetically: a
    1-minute step over several years would otherwise iterate millions of times.

    Args:
        start_time: first point; a ``datetime`` or ``date`` (a ``date`` is
            treated as midnight of that day).
        end_time: last admissible point; a ``datetime`` or ``date``.
        ms: step length in milliseconds; must be positive.

    Returns:
        The number of points, or 0 when ``end_time`` precedes ``start_time``.

    Raises:
        ValueError: if ``ms`` does not amount to a positive timedelta
            (stepping would never terminate).
    """

    def _to_datetime(value):
        # A bare `date` cannot be ordered against a `datetime` and ignores
        # sub-day timedeltas, so promote it to midnight of the same day.
        if isinstance(value, datetime):
            return value
        return datetime(value.year, value.month, value.day)

    step = timedelta(milliseconds=ms)
    if step <= timedelta(0):
        raise ValueError(f"interval length must be positive, got {ms} ms")
    start_time = _to_datetime(start_time)
    end_time = _to_datetime(end_time)
    if end_time < start_time:
        return 0
    # timedelta // timedelta is an exact integer floor division.
    return (end_time - start_time) // step + 1


In [22]:
def create_csv_file(base_dir, symbol, interval, file_idx, data, override=False):
    """Write one chunk of raw klines to
    ``<base_dir>/<symbol>/<interval>/RAW_<symbol>_<interval>_<file_idx>.csv``.

    The target directory is not created here; the caller must create it first.
    Values are written as received (no timestamp conversion).

    Args:
        base_dir: root output directory.
        symbol: trading pair, e.g. "BTCUSDT"; used in the path and file name.
        interval: kline interval code, e.g. "1m"; used in the path and file name.
        file_idx: chunk number appended to the file name.
        data: rows with 12 fields each, in the column order below (presumably
            Binance kline rows — confirm against the data source).
        override: if truthy, replace an existing file; otherwise leave it as is.

    Returns:
        The path of the written file, or ``None`` if an existing file was kept.
    """
    file_name = f"RAW_{symbol}_{interval}_{file_idx}.csv"
    file_path = os.path.join(base_dir, symbol, interval, file_name)
    if os.path.exists(file_path) and not override:
        return None
    columns = [
        'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time',
        'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume',
        'taker_buy_quote_asset_volume', 'ignore',
    ]
    df = pd.DataFrame(data, columns=columns)
    df.to_csv(file_path, index=False)
    return file_path
        

In [None]:
def _make_topic(symbol:str,interval:str,prefix:str="raw"):
    topic = f'{prefix}_{symbol.lower()}_{interval.lower()}'

In [13]:
async def data_to_csv(data, symbol, interval, total):
    """Split the rows of ``data`` into CSV files of ``ROWS_PER_FILE`` rows each.

    Files go to ``BASE_DIR/<symbol>/<interval>/`` with consecutive indices
    starting at 0. The last file holds the remaining (possibly fewer) rows.

    Args:
        data: iterable of kline rows (consumed once).
        symbol: trading pair, used in the output path.
        interval: interval code, used in the output path.
        total: expected row count; only used for the tqdm progress bar.
    """
    # makedirs creates BASE_DIR and the symbol directory as needed.
    os.makedirs(os.path.join(BASE_DIR, symbol, interval), exist_ok=True)
    file_rows = []
    file_num = 0
    for item in tqdm(data, total=total):
        file_rows.append(item)
        # Flush exactly ROWS_PER_FILE rows per file.
        if len(file_rows) >= ROWS_PER_FILE:
            create_csv_file(base_dir=BASE_DIR, symbol=symbol, interval=interval,
                            file_idx=file_num, data=file_rows, override=OVERWRITE)
            file_rows = []
            file_num += 1
        # Yield to the event loop so a concurrent sink can make progress. A
        # 0.1 s sleep per row would stretch a multi-million-row backfill to days.
        await asyncio.sleep(0)

    if file_rows:
        create_csv_file(base_dir=BASE_DIR, symbol=symbol, interval=interval,
                        file_idx=file_num, data=file_rows, override=OVERWRITE)

In [None]:

# Topic setup: --replication-factor 3 --partitions 3
# Default Kafka routing: hash(key) % number_of_partitions -> partition number
# Console check: kafka-console-producer.bat --topic atm1 --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
class TimestampPartitioner(object):
    """Kafka partitioner that routes a message by the timestamp prefix of its key.

    Keys are expected to look like ``b"<int timestamp><seperator><anything>"``,
    the shape built by ``data_to_kafka``. The timestamp is hashed with CRC-32
    before taking the modulo. Kline open times are presumably aligned to
    interval boundaries (multiples of 60_000 ms). Under that assumption, a plain
    ``timestamp % n`` sends every message to partition 0 whenever ``n`` divides
    60_000, which includes n = 3. CRC-32 is used rather than ``hash()`` so the
    mapping is stable across processes.
    """

    def __init__(self, seperator='|') -> None:
        # Parameter/attribute spelling kept for compatibility with existing callers.
        self.seperator = seperator

    def __call__(self, topic, key, partitions, *args, **kwargs):
        """Return the index of the partition that should receive ``key``.

        Args:
            topic: topic name (only used in the error message).
            key: message key as UTF-8 bytes.
            partitions: sequence of the topic's partitions; only its length is used.

        Raises:
            ValueError: if ``partitions`` is empty or the key does not start
                with an integer timestamp.
        """
        num_partitions = len(partitions)
        if num_partitions == 0:
            raise ValueError(f"topic {topic!r} has no partitions")
        timestamp_field = key.decode('utf-8').split(self.seperator, 1)[0]
        timestamp = int(timestamp_field)
        # NOTE(review): an index is returned (as before); confirm MyProducer expects
        # an index rather than a partition id taken from `partitions`.
        return zlib.crc32(str(timestamp).encode('ascii')) % num_partitions
    
# Shared partitioner; splits message keys on KAFKA_PARTITIONER_SEPERATOR.
custom_partitioner = TimestampPartitioner(seperator=KAFKA_PARTITIONER_SEPERATOR)

# Configuration handed to MyProducer by data_to_kafka.
# NOTE(review): 'bootstrap.servers' is librdkafka-style syntax; librdkafka's own
# 'partitioner' setting is a string, not a Python callable — confirm that
# MyProducer consumes this callable itself.
producer_config = {'bootstrap.servers': 'localhost:9092'}
producer_config['partitioner'] = custom_partitioner


In [25]:


async def data_to_kafka(data, symbol, interval, approx_total, prefix=KAFKA_TOPIC_PREFIX, seperator=KAFKA_PARTITIONER_SEPERATOR):
    """Publish each row of ``data`` as a JSON message to the symbol/interval topic.

    Message keys are ``"<timestamp><seperator><symbol>_<interval>_<idx>"`` so
    that a timestamp-based partitioner can route on the leading field.

    Args:
        data: iterable of klines (consumed once). List rows use their first
            field as the timestamp (create_csv_file treats that field as
            'timestamp'). Dict events use their ``"E"`` field.
        symbol: trading pair, used in the topic name and the key.
        interval: interval code, used in the topic name and the key.
        approx_total: expected row count; only used for the tqdm progress bar.
        prefix: topic prefix passed to _make_topic.
        seperator: separator between the timestamp and the rest of the key.
    """
    producer = MyProducer(config=producer_config)
    topic = _make_topic(symbol, interval, prefix)  # same for every message
    for idx, item in tqdm(enumerate(data), total=approx_total):
        # Kline rows are lists, so item["E"] would raise TypeError on them.
        timestamp = item["E"] if isinstance(item, dict) else item[0]
        key = f"{timestamp}{seperator}{symbol}_{interval}_{idx}".encode('utf-8')
        producer.produce(json.dumps(item), key=key, topic=topic)
        # Yield to the event loop so a concurrent sink can make progress. A
        # 0.1 s sleep per message would stretch millions of klines over days.
        await asyncio.sleep(0)
    # NOTE(review): no flush/close of MyProducer is visible here; if it buffers
    # messages, pending ones may be lost when it goes out of scope — confirm.
    
async def fetch_historical_data(prefix=KAFKA_TOPIC_PREFIX, seperator=KAFKA_PARTITIONER_SEPERATOR):
    """Backfill klines for every symbol/interval into the enabled sinks.

    For each (symbol, interval) pair, the CSV and Kafka sinks run concurrently.
    Both are awaited before moving on to the next pair.

    Args:
        prefix: Kafka topic prefix, forwarded to data_to_kafka.
        seperator: Kafka key separator, forwarded to data_to_kafka.
    """

    def _klines(symbol, interval_code):
        # Each sink gets its own generator: one generator shared by two consumers
        # would give each sink only an interleaved subset of the rows. The cost
        # is a second pass over the Binance API when both sinks are enabled.
        return client.get_historical_klines_generator(
            symbol=symbol,
            interval=interval_code,
            start_str=START_DATE,
            end_str=END_DATE,
        )

    for symbol in symbols:
        for interval_code, interval_ms in intervals:
            approx_total = count_intervals(start_date, end_date, interval_ms)
            sinks = []
            if TO_CSV:
                print("csv - interval: ", interval_code)
                sinks.append(data_to_csv(_klines(symbol, interval_code), symbol, interval_code, approx_total))

            if TO_KAFKA:
                print("kafka - interval: ", interval_code)
                sinks.append(data_to_kafka(_klines(symbol, interval_code), symbol, interval_code,
                                           approx_total, prefix, seperator))

            # Await the sinks here: fire-and-forget tasks are cancelled by
            # asyncio.run() as soon as this coroutine returns.
            await asyncio.gather(*sinks)

interval:  1m
3444481


 12%|█▏        | 427001/3444481 [05:22<37:40, 1335.09it/s] 

In [None]:
def _run_coroutine(coro):
    """Run ``coro`` to completion from synchronous code and return its result.

    ``asyncio.run`` refuses to start while an event loop is already running in
    the current thread, which is the case inside a Jupyter kernel. In that
    situation the coroutine runs on a fresh loop in a worker thread, and this
    call blocks until it finishes. Exceptions propagate to the caller instead of
    being printed and ignored.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop (plain script): run directly.
        return asyncio.run(coro)
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


_run_coroutine(fetch_historical_data())