In [2]:
# import sys
# print(sys.executable)
#!/Users/macbook/code/venv/bin/python -m pip install clickhouse_driver

In [3]:
import json
import os
from datetime import datetime, timedelta, UTC, timezone
from multiprocessing import Queue
from queue import Empty

import clickhouse_connect
import pandas as pd
import requests
from clickhouse_driver import Client
from dotenv import load_dotenv

In [4]:
load_dotenv()

# os.getenv() returns None for an unset variable, and str(None) would silently
# turn that into the literal credential 'None'. Stop here with a clear message
# instead of failing later with a confusing authentication error.
_missing_settings = [name for name in ('user', 'password') if os.getenv(name) is None]
if _missing_settings:
    raise RuntimeError(
        'Missing environment variable(s): ' + ', '.join(_missing_settings)
        + ' (set them in the environment or in a .env file)'
    )

# ClickHouse credentials used when the client is created further down.
user = os.environ['user']
password = os.environ['password']

# EXTRACT

In [5]:
def round_down_to_5_minutes(dt):
    """Return ``dt`` floored to the start of its 5-minute interval.

    Everything past the last multiple of five minutes (the surplus minutes,
    the seconds and the microseconds) is subtracted from ``dt``.
    """
    past_boundary = timedelta(
        minutes=dt.minute % 5,
        seconds=dt.second,
        microseconds=dt.microsecond,
    )
    return dt - past_boundary

def format_dtms_to_time(ts_ms):
    """Convert a millisecond Unix timestamp into a timezone-aware UTC datetime.

    ``ts_ms`` may be anything ``int()`` accepts (an int or a numeric string).
    The sub-second part is discarded, so the result has whole-second precision.
    """
    # Milliseconds -> whole seconds; the remainder is intentionally dropped.
    whole_seconds, _ = divmod(int(ts_ms), 1000)
    return datetime.fromtimestamp(whole_seconds, tz=timezone.utc)


In [6]:
# Two independent queues connect the pipeline stages:
#   requests_queue    - holds items waiting to be processed by transform_data
#   insert_data_queue - holds the row batches that transform_data produces
requests_queue, insert_data_queue = Queue(), Queue()

In [7]:
def get_kline(symbol, time_str, requests_queue, timeout=10):
    """Request one 5-minute spot kline from Bybit and enqueue the JSON reply.

    The ``start`` sent to the API is ``time_str`` minus five minutes, floored
    to a 5-minute boundary and expressed in milliseconds since the Unix epoch.

    Args:
        symbol: Value of the ``symbol`` query parameter, e.g. ``'BTCUSDT'``.
        time_str: Timestamp in a format accepted by ``datetime.fromisoformat``.
            Include a UTC offset: ``datetime.timestamp()`` interprets a naive
            value in the local time zone of the machine running the notebook.
        requests_queue: Queue that receives the decoded JSON response.
        timeout: Passed to ``requests.get`` as ``timeout`` (seconds). Without
            it a stalled connection would block the notebook indefinitely.

    Raises:
        requests.RequestException: If the request fails or times out.
        ValueError: If ``time_str`` cannot be parsed or the response body is
            not valid JSON.
    """
    requested_dt = datetime.fromisoformat(time_str) - timedelta(minutes=5)
    interval_start = round_down_to_5_minutes(requested_dt)
    start_ms = int(interval_start.timestamp() * 1000)  # start time in milliseconds

    params = {
        'category': 'spot',
        'symbol': symbol,
        'interval': '5',
        'start': start_ms,
        'limit': 1
    }
    end_point = '/v5/market/kline'
    url = 'https://api.bybit.com'

    response = requests.get(url + end_point, params=params, timeout=timeout)
    requests_queue.put_nowait(response.json())

# TRANSFORM

In [8]:
def transform_data(requests_queue, insert_queue=None):
    """Drain ``requests_queue`` and enqueue one batch of row tuples.

    Every queued response is read without blocking until the queue reports
    empty. A response contributes a row only when its ``retMsg`` is ``'OK'``
    and ``result['list']`` is non-empty; only the first entry of that list is
    used. Each row is::

        (symbol without its last 4 characters, start time as UTC datetime,
         float(entry[1]), ..., float(entry[6]))

    The list of rows (possibly empty) is always put on the output queue, so
    every call enqueues exactly one batch.

    Note: with a ``multiprocessing.Queue`` an item that was put immediately
    before this call may not yet be retrievable through ``get_nowait()``, in
    which case the drain ends early.

    Args:
        requests_queue: Queue of decoded JSON responses (dicts).
        insert_queue: Queue that receives the batch. Defaults to the
            module-level ``insert_data_queue``.
    """
    if insert_queue is None:
        insert_queue = insert_data_queue

    insert_data = []
    while True:
        try:
            data = requests_queue.get_nowait()  # non-blocking read
        except Empty:
            # Only "queue is empty" ends the loop; any other error propagates.
            break

        if data.get('retMsg') != 'OK':
            continue
        # 'result' may be missing or null in a reply; treat that as "no klines".
        result = data.get('result') or {}
        klines = result.get('list')
        if not klines:
            continue

        kline = klines[0]
        insert_data.append((
            result.get('symbol')[:-4],  # drop the 4-character suffix, e.g. 'BTCUSDT' -> 'BTC'
            format_dtms_to_time(kline[0]),
            *(float(kline[i]) for i in range(1, 7)),
        ))
    insert_queue.put_nowait(insert_data)

# LOAD

In [9]:
# CREATE TABLE five_min_klines
# (
#     symbol String,
#     start_time DateTime,
#     open Float64,
#     high Float64,
#     low Float64,
#     close Float64,
#     volume_base_coin Float64,
#     volume_usdt Float64
# )
# ENGINE = ReplacingMergeTree()
# ORDER BY (symbol, start_time);

In [17]:
def upload_to_clickhouse(table_name, client, insert_queue=None):
    """Insert the next queued batch of rows into ``table_name``.

    One batch is taken from the queue without blocking and passed to
    ``client.execute`` together with an ``INSERT ... VALUES`` statement. When
    the queue is empty, ``'finished'`` is printed and nothing is inserted.

    Errors raised by ``client.execute`` propagate to the caller, so a failed
    insert is no longer reported as ``'finished'``.

    Args:
        table_name: Target table. It is interpolated directly into the SQL
            text, so it must come from trusted code, never from user input.
        client: Object exposing ``execute(query, values)``.
        insert_queue: Queue holding the batches. Defaults to the module-level
            ``insert_data_queue``.
    """
    if insert_queue is None:
        insert_queue = insert_data_queue

    try:
        values = insert_queue.get_nowait()  # non-blocking read
    except Empty:
        print('finished')
        return

    client.execute(f'INSERT INTO {table_name} (*) VALUES', values)

## таблица настроена на автоматическое удаление дубликатов


In [11]:
# раз в день оптимизировать таблицы
# client.execute('OPTIMIZE TABLE five_min_klines FINAL')

# FULL PIPELINE

In [47]:
# Fresh queues for this run. This rebinds the module-level insert_data_queue
# that transform_data writes to and upload_to_clickhouse reads from.
requests_queue, insert_data_queue = Queue(), Queue()

time_str = '2025-01-23T23:00:00+00:00'

# Extract: one request per trading pair, all for the same point in time.
for pair in ('BTCUSDT', 'ETHUSDT', 'TRXUSDT'):
    get_kline(pair, time_str, requests_queue)

# Transform: turn the queued responses into one batch of rows.
transform_data(requests_queue)


In [48]:
# NOTE(review): second drain of the same queue. Every transform_data call puts
# one list (possibly empty) on insert_data_queue, while upload_to_clickhouse
# reads only one list per call -- confirm that this repeated call is intended.
transform_data(requests_queue)

In [49]:
# Load: connection settings for the ClickHouse server; the credentials come
# from the environment-backed `user` / `password` variables defined earlier.
table_name = 'five_min_klines'
connection_settings = {
    'host': '87.236.22.62',
    'user': user,
    'password': password,
    'database': 'bybit_history',
}
client = Client(**connection_settings)

# Insert the next pending batch into the target table.
upload_to_clickhouse(table_name, client)

In [14]:
#requests_queue.get_nowait()

In [16]:
# insert_data_queue.get_nowait()