#### Download agg trades

In [1]:
%load_ext autoreload
%autoreload 2

from datetime import datetime, timedelta, UTC
from time import sleep
import json
from binance.client import Client
import polars as pl

from jaref_bot.utils.files import get_saved_coins

# Binance REST client constructed with no arguments, i.e. no API key/secret
# is supplied here. NOTE(review): presumably sufficient for the public
# aggregate-trades endpoint used below — confirm.
client = Client()

In [2]:
# TODO: add filtering by time
# TODO: add a split into buy price and sell price

def add_json_to_df(symbol, df, truncate='1s'):
    """Load a symbol's JSONL trade dump and merge it into ``df``.

    Reads ``./data/agg_trades/{symbol}_data.jsonl`` lazily, converts ``ts``
    (cast as epoch milliseconds) into a ``datetime`` column, casts ``price``
    and ``qty`` to floats and drops the raw ``ts`` column.

    Args:
        symbol: Ticker used to build the JSONL file name.
        df: Previously accumulated polars DataFrame; may be empty
            (``height == 0``), in which case only the freshly loaded data
            is returned.
        truncate: Bucket size passed to ``dt.truncate`` (e.g. ``'1s'``).
            When truthy, rows are grouped per bucket and reduced to
            ``close`` (last price in the bucket) and summed ``qty``; the
            ``buyer`` / ``best_price`` columns do not survive this
            aggregation. When falsy, every trade row is kept as is.

    Returns:
        A polars DataFrame. If ``df`` has rows, it is ``df`` concatenated
        with the loaded data, with exact duplicate rows removed and sorted
        by ``datetime``.
        NOTE(review): the concatenation presumably requires ``df`` to have
        the same columns as the loaded frame — confirm against callers.
    """
    jsonl_path = f"./data/agg_trades/{symbol}_data.jsonl"

    trades_lf = (
        pl.scan_ndjson(jsonl_path)
        .with_columns([
            pl.col("ts").cast(pl.Datetime("ms")).alias("datetime"),
            pl.col("price").cast(pl.Float64),
            pl.col("qty").cast(pl.Float64),
            pl.col('buyer'),
            pl.col('best_price'),
        ])
        .drop("ts")
    )

    if truncate:
        # Collapse all trades that fall into the same time bucket.
        trades_lf = (
            trades_lf
            .group_by(pl.col("datetime").dt.truncate(truncate))
            .agg([
                pl.col("price").last().alias("close"),
                pl.col("qty").sum(),
            ])
            .sort("datetime")
        )

    # Nothing accumulated yet: the loaded data is the whole result.
    if df.height == 0:
        return trades_lf.collect()

    merged = pl.concat([df, trades_lf.collect()])
    return merged.unique().sort(by='datetime')

In [5]:
# Coins that already have data under ./data/agg_trades. get_saved_coins is a
# project helper; judging by the printed result it returns a list of ticker
# strings — the exact selection rule is not visible in this notebook.
saved_coins = get_saved_coins(data_folder='./data/agg_trades')

In [6]:
# Show which tickers were detected as already saved.
print(saved_coins)

['ADA', 'AEVO', 'AI', 'ALGO', 'APE', 'AXS', 'BLUR', 'BTC', 'C98', 'CFX', 'CHZ', 'DYM', 'FIL', 'FLOW', 'GMT', 'GRT', 'ICX', 'INJ', 'IOTA', 'KAVA', 'KDA', 'MANTA', 'MINA', 'NEAR', 'OP', 'ORDI', 'PIXEL', 'SAND', 'SNX', 'SSV', 'STORJ', 'SUSHI', 'THETA', 'TIA', 'TON', 'VANRY', 'XTZ', 'ZK']


In [8]:
# Look-back window, in whole days since 2025-01-01 (local clock), used for
# symbols that have no saved data yet.
days_to_download = (datetime.now() - datetime(2025, 1, 1)).days

for symbol in ('STRK', 'XAI'):
    # Write to the same file that add_json_to_df() reads back after the download.
    output_filename = f"./data/agg_trades/{symbol}_data.jsonl"

    if symbol in saved_coins:
        # Resume from the last timestamp already stored for this symbol.
        main_df = pl.read_parquet(f'./data/agg_trades/{symbol}_agg_trades.parquet')
        start_dt = main_df['datetime'].last()
        # The stored datetimes are timezone-naive values built from epoch
        # milliseconds, i.e. UTC. Attach UTC explicitly: .timestamp() on a naive
        # datetime assumes local time and would shift the resume point by the
        # local UTC offset.
        if start_dt.tzinfo is None:
            start_dt = start_dt.replace(tzinfo=UTC)
    else:
        main_df = pl.DataFrame()
        start_dt = datetime.now(UTC) - timedelta(days=days_to_download)

    start_ms = int(start_dt.timestamp() * 1000)
    end_dt = datetime.now(UTC)
    end_ms = int(end_dt.timestamp() * 1000)

    # NOTE(review): the JSONL file is opened in append mode and never cleared,
    # and the resume point is the last stored (possibly truncated) timestamp, so
    # trades around the resume boundary may be written twice — confirm how
    # duplicates should be handled.
    while start_ms < end_ms:
        # Request cursor in UTC, matching the timestamps stored on disk.
        t = datetime.fromtimestamp(start_ms // 1000, UTC)

        try:
            trades = client.get_aggregate_trades(symbol=symbol+'USDT', startTime=start_ms, limit=1000)
        except Exception as e:
            # Best-effort retry: log the failure, wait, and repeat the same request.
            ct = datetime.now().strftime('%H:%M:%S')
            print(f"{ct} Ошибка при запросе с startTime={t.strftime('%Y-%m-%d %H:%M:%S')}: {e}")
            sleep(5)
            continue

        # No trades returned: step the cursor forward by one minute.
        if not trades:
            print(f"Нет сделок с startTime={t.strftime('%Y-%m-%d %H:%M:%S')}, пропускаем интервал.")
            start_ms += 60000  # skip 60 seconds
            continue

        # Append the received trades to the JSON Lines file.
        with open(output_filename, "a", encoding="utf-8") as f:
            for trade in trades:
                f.write(json.dumps({'ts': trade['T'],
                                    'price': trade['p'],
                                    'qty': trade['q'],
                                    'buyer': trade['m'],
                                    'best_price': trade['M']}, ensure_ascii=False) + "\n")

        print(f"Текущая дата: {t.strftime('%Y-%m-%d %H:%M:%S')}, token: {symbol}     ", end='\r')

        # Move the cursor to 1 ms past the last received trade so the next
        # request does not return it again.
        start_ms = trades[-1]['T'] + 1

        # Small delay to stay within the API rate limits.
        sleep(0.2)

    # add_json_to_df is the loader defined in this notebook; the previously used
    # name add_json_to_parquet is not defined anywhere in it.
    main_df = add_json_to_df(symbol=symbol, df=main_df)
    main_df.write_parquet(f'./data/agg_trades/{symbol}_agg_trades.parquet')

Текущая дата: 2025-08-10 18:39:00, token: XAI      

In [40]:
def get_df_from_trades(trades):
    """Summarise raw aggregate trades into per-second best bid / ask rows.

    Args:
        trades: Records with at least the keys ``p`` (price), ``q``
            (quantity), ``T`` (timestamp, integer-divided by 1000 to get
            seconds) and ``m`` (boolean side flag), as returned by the
            aggregate-trades request used in this notebook.

    Returns:
        A polars DataFrame sorted by time with the columns ``time``,
        ``bid_price``, ``bid_volume``, ``ask_price`` and ``ask_volume``.
        For every second, rows with ``m`` true contribute the highest price
        and the quantity traded at that price (bid side); rows with ``m``
        false contribute the lowest price and its quantity (ask side).
        A side with no trades in a given second is null.
    """
    parsed = pl.DataFrame(trades).with_columns([
        pl.col("p").cast(pl.Float64).alias("price"),
        pl.col("q").cast(pl.Float64).alias("quantity"),
        (pl.col("T") // 1000).cast(pl.Int64).alias("ts"),
    ])

    def best_level(side_rows, best_price, prefix):
        # Best price per second, joined with the volume traded at that price.
        price_name = f"{prefix}_price"
        per_second = side_rows.group_by("ts").agg(best_price.alias(price_name))
        per_price = side_rows.group_by(["ts", "price"]).agg(
            pl.col("quantity").sum().alias(f"{prefix}_volume")
        )
        return per_second.join(
            per_price,
            left_on=["ts", price_name],
            right_on=["ts", "price"],
        )

    # "m" is a boolean column, so it can be used directly as the filter mask.
    is_bid_side = pl.col("m")

    # Best bids: highest price among rows flagged m == True.
    bids = best_level(parsed.filter(is_bid_side), pl.col("price").max(), "bid")
    # Best asks: lowest price among rows flagged m == False.
    asks = best_level(parsed.filter(~is_bid_side), pl.col("price").min(), "ask")

    # Combine both sides, keeping seconds that appear on only one of them.
    combined = bids.join(asks, on="ts", how="full", coalesce=True).sort("ts")
    with_time = combined.with_columns(
        pl.from_epoch("ts", time_unit="s").alias("time")
    )
    return with_time.select('time', 'bid_price', 'bid_volume', 'ask_price', 'ask_volume')

In [42]:
# Preview the last 10 per-second bid/ask rows. Relies on `trades` still being
# in the kernel namespace; the only assignment visible in this notebook is
# inside the download loop (the last batch fetched).
get_df_from_trades(trades).tail(10)

time,bid_price,bid_volume,ask_price,ask_volume
datetime[μs],f64,f64,f64,f64
2025-08-10 16:28:02,0.0537,6851.4,,
2025-08-10 16:28:10,,,0.0538,185.8
2025-08-10 16:28:11,0.0537,980.1,,
2025-08-10 16:28:13,,,0.0538,118.0
2025-08-10 16:28:51,,,0.0537,2438.9
2025-08-10 16:29:13,,,0.0538,118.0
2025-08-10 16:30:00,0.0537,4457.5,,
2025-08-10 16:30:07,,,0.0537,2893.2
2025-08-10 16:30:13,,,0.0538,118.0
2025-08-10 16:30:20,0.0537,5841.7,,


In [44]:
# Inspect the raw trade records next to the parsed columns (float price and
# quantity, timestamp in whole seconds) — the same parsing step that
# get_df_from_trades applies first.
pl.DataFrame(trades).with_columns([
        pl.col("p").cast(pl.Float64).alias("price"),
        pl.col("q").cast(pl.Float64).alias("quantity"),
        (pl.col("T") // 1000).cast(pl.Int64).alias("ts")
    ]).tail(10)

a,p,q,f,l,T,m,M,price,quantity,ts
i64,str,str,i64,i64,i64,bool,bool,f64,f64,i64
33832567,"""0.05370000""","""6851.40000000""",57550115,57550117,1754843282113,True,True,0.0537,6851.4,1754843282
33832568,"""0.05380000""","""185.80000000""",57550118,57550118,1754843290374,False,True,0.0538,185.8,1754843290
33832569,"""0.05370000""","""980.10000000""",57550119,57550119,1754843291709,True,True,0.0537,980.1,1754843291
33832570,"""0.05380000""","""118.00000000""",57550120,57550120,1754843293179,False,True,0.0538,118.0,1754843293
33832571,"""0.05370000""","""2438.90000000""",57550121,57550121,1754843331163,False,True,0.0537,2438.9,1754843331
33832572,"""0.05380000""","""118.00000000""",57550122,57550122,1754843353518,False,True,0.0538,118.0,1754843353
33832573,"""0.05370000""","""4457.50000000""",57550123,57550124,1754843400293,True,True,0.0537,4457.5,1754843400
33832574,"""0.05370000""","""2893.20000000""",57550125,57550126,1754843407182,False,True,0.0537,2893.2,1754843407
33832575,"""0.05380000""","""118.00000000""",57550127,57550127,1754843413547,False,True,0.0538,118.0,1754843413
33832576,"""0.05370000""","""5841.70000000""",57550128,57550128,1754843420780,True,True,0.0537,5841.7,1754843420


In [10]:
# Display the saved aggregated trades for `symbol`; this uses whatever value
# `symbol` currently holds in the kernel (e.g. the last one from the download loop).
pl.read_parquet(f'./data/agg_trades/{symbol}_agg_trades.parquet')

datetime,close,qty
datetime[ms],f64,f64
2024-12-31 15:54:37,0.2345,57746.4
2024-12-31 15:54:42,0.2344,34.6
2024-12-31 15:55:17,0.2344,97.0
2024-12-31 15:55:21,0.2343,48.7
2024-12-31 15:55:31,0.2343,629.6
…,…,…
2025-08-10 16:29:13,0.0538,118.0
2025-08-10 16:30:00,0.0537,4457.5
2025-08-10 16:30:07,0.0537,2893.2
2025-08-10 16:30:13,0.0538,118.0


In [None]:
# Rebuild the un-aggregated trade table for one symbol from its JSONL dump
# and store it as parquet.
symbol = 'BTC'

main_df = pl.DataFrame()
# add_json_to_df is the loader defined in this notebook (add_json_to_parquet is
# not defined anywhere in it); truncate=False keeps every trade row instead of
# aggregating into time buckets.
main_df = add_json_to_df(symbol=symbol, df=main_df, truncate=False)
main_df.write_parquet(f'./data/{symbol}_agg_trades.parquet')

#### Download candles

In [None]:
from jaref_bot.data.http_api import ExchangeManager, BybitRestAPI
import pandas as pd

In [None]:
async def get_data(symbol, interval, n_iters):
    """Fetch candles for ``symbol`` and return them cleaned and index-sorted.

    Uses the notebook-level ``exc_manager``; ``symbol``, ``interval`` and
    ``n_iters`` are forwarded unchanged to its ``get_candles`` coroutine.
    Only the first element of the awaited result is used.
    NOTE(review): that element is presumably a pandas DataFrame (it is
    used via ``sort_index`` / ``dropna``) — confirm against ExchangeManager.

    Returns:
        The first result sorted by its index, with rows containing missing
        values dropped.
    """
    candles = await exc_manager.get_candles(symbol=symbol, interval=interval, n_iters=n_iters)
    ordered = candles[0].sort_index()
    return ordered.dropna()

In [None]:
# Exchange manager used by get_data(); registers a Bybit REST API instance
# created with 'linear' under the market name "bybit_linear".
exc_manager = ExchangeManager()
exc_manager.add_market("bybit_linear", BybitRestAPI('linear'))

In [None]:
# Fetch 1-minute PAXG_USDT candles; n_iters is forwarded to get_candles
# (presumably the number of paginated requests — verify in ExchangeManager).
res = await get_data(symbol='PAXG_USDT', interval='1m', n_iters=1000)

In [None]:
# Persist the fetched candles so later cells can reload them without refetching.
res.to_parquet('./data/paxg_1m_data.parquet')

In [None]:
# Display the fetched candles.
res

In [None]:
# Reload the saved 1-minute PAXG candles as a polars DataFrame.
df = pl.read_parquet('./data/paxg_1m_data.parquet')

In [None]:
# Resample the candles to daily OHLCV bars: first open, highest high,
# lowest low, last close and summed volume per calendar day.
df = (
    df
    .group_by(pl.col("Date").dt.truncate('1d'))
    .agg(
        Open=pl.col("Open").first(),
        High=pl.col("High").max(),
        Low=pl.col("Low").min(),
        Close=pl.col("Close").last(),
        Volume=pl.col("Volume").sum(),
    )
    .sort('Date')
)

In [None]:
import altair as alt

In [None]:
# Line chart of the daily close. The plotted frame is the PAXG candle data
# loaded from paxg_1m_data.parquet, so the title names that pair.
alt.Chart(df).mark_line().encode(
    x=alt.X('Date:T', title=''),
    y=alt.Y('Close:Q', title='usdt')
).properties(title='PAXG_USDT', width=960, height=150)