In [43]:
import calendar
import gc
import io
import json
import os
import threading
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import List

import polars as pl
import requests

In [44]:
class DataLoader:
    """GÃ¨re le tÃ©lÃ©chargement des donnÃ©es Binance"""
    
    def __init__(self, base_url: str, period: str = "monthly",
                 target: str = "aggTrades", data_dir: str = "./data"):
        self.base_url = base_url
        self.period = period
        self.target = target
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)
        
        # Fichier JSON pour tracker les tÃ©lÃ©chargements
        self.tracker_file = self.data_dir / "downloaded_files.json"
        self.downloaded = self._load_tracker()
    
    def _load_tracker(self) -> dict:
        """Charge le fichier JSON de tracking"""
        if self.tracker_file.exists():
            with open(self.tracker_file, 'r') as f:
                return json.load(f)
        return {}
    """
    def _save_tracker(self):
        with open(self.tracker_file, 'a') as f:
            json.dump(self.downloaded, f, indent=2)
    """
    
    def _save_tracker(self):
        """Sauvegarde le fichier JSON de tracking"""
        # Charger les donnÃ©es existantes
        if os.path.exists(self.tracker_file):
            with open(self.tracker_file, 'r') as f:
                data = json.load(f)
        else:
            data = {}
        
        # Fusionner avec les nouvelles donnÃ©es
        data.update(self.downloaded)
        
        # Sauvegarder (mode 'w' pour Ã©craser avec la version fusionnÃ©e)
        with open(self.tracker_file, 'w') as f:
            json.dump(data, f, indent=2)
    
    def _mark_downloaded(self, symbol: str, date: datetime):
        """Marque un fichier comme tÃ©lÃ©chargÃ©"""
        key = f"{symbol}_{date.year}-{date.month:02d}"
        self.downloaded[key] = {
            'symbol': symbol,
            'year': date.year,
            'month': date.month,
            'date' : date.day,
            'period': self.period,
            'downloaded_at': datetime.now().isoformat()
        }
        self._save_tracker()
    
    def _is_downloaded(self, symbol: str, date: datetime) -> bool:
        """VÃ©rifie si un fichier a dÃ©jÃ  Ã©tÃ© tÃ©lÃ©chargÃ©"""
        key = f"{symbol}_{date.year}-{date.month:02d}"
        return key in self.downloaded
    
    def _get_filename(self, symbol: str, date: datetime) -> str:
        """Construit le nom de fichier selon le period"""
        if self.period == "monthly":
            return f'{symbol}-{self.target}-{date.year}-{date.month:02d}.csv'
        elif self.period == "daily":
            return f'{symbol}-{self.target}-{date.year}-{date.month:02d}-{date.day:02d}.csv'
        else:
            raise ValueError(f"Period invalide: {self.period}")
    
    def _get_url(self, symbol: str, date: datetime) -> str:
        """Construit l'URL de tÃ©lÃ©chargement"""
        filename_base = self._get_filename(symbol, date).replace('.csv', '.zip')
        return f"{self.base_url}/{self.period}/{self.target}/{symbol}/{filename_base}"
    
    def _download(self, url: str) -> bytes | None:
        """TÃ©lÃ©charge un fichier ZIP"""
        try:
            r = requests.get(url, timeout=30)
            return r.content if r.status_code == 200 else None
        except (requests.RequestException, TimeoutError):
            return None
    
    def _unzip(self, content: bytes, symbol: str, date: datetime) -> Path | None:
        """DÃ©compresse un fichier ZIP et retourne le chemin du CSV"""
        try:
            with zipfile.ZipFile(io.BytesIO(content)) as z:
                z.extractall(self.data_dir)
            
            csv_file = self.data_dir / self._get_filename(symbol, date)
            return csv_file if csv_file.exists() else None
        except Exception as e:
            print(f"Erreur dÃ©compression: {e}")
            return None
    
    def load(self, symbol: str, date: datetime) -> Path | None:
        """Charge un fichier (check tracker â†’ check file â†’ download â†’ unzip)"""
        
        # 1. Check si dÃ©jÃ  tÃ©lÃ©chargÃ© selon le tracker
        if self._is_downloaded(symbol, date):
            # VÃ©rifier quand mÃªme si le fichier physique existe
            csv_file = self.data_dir / self._get_filename(symbol, date)
            if csv_file.exists():
                return csv_file
            # Si le fichier n'existe plus mais est dans le tracker, on continue le tÃ©lÃ©chargement
        
        # 2. Download
        url = self._get_url(symbol, date)
        content = self._download(url)
        if not content:
            return None
        
        # 3. Unzip
        csv_file = self._unzip(content, symbol, date)
        
        # 4. Marquer comme tÃ©lÃ©chargÃ©
        if csv_file:
            self._mark_downloaded(symbol, date)
        
        return csv_file
    
    

class BarMethods:
    """Aggregate preprocessed trades into bars with order-flow columns.

    Only time bars are implemented: ``bar_params['type']`` is not read, and
    ``bar_params['params']`` is the polars duration string passed to
    ``group_by_dynamic(every=...)`` (default ``'5m'``). The result is stored
    in ``self.df_agg``.
    """

    def __init__(self, df: pl.DataFrame,
                 bar_params: dict | None = None):
        """
        Args:
            df: trades with 'timestamp', 'price', 'quantity', 'buy_volume'
                and 'sell_volume' columns (as produced by DataProcessor._preprocess).
            bar_params: bar settings; defaults to ``{'type': 'time', 'params': '5m'}``.
        """
        self.df_raw = df
        # None sentinel instead of a mutable default dict shared across calls.
        self.bar_params = bar_params if bar_params is not None else {'type': 'time', 'params': '5m'}
        self._time_bar()

    def _time_bar(self):
        """Build fixed-duration OHLCV bars plus delta / imbalance / CVD into ``self.df_agg``."""
        # NOTE(review): group_by_dynamic expects 'timestamp' sorted ascending;
        # the trades are assumed to already be in time order — confirm.
        self.df_agg = (
            self.df_raw
            .group_by_dynamic('timestamp', every=self.bar_params.get('params', '5m'))
            .agg([
                pl.col('price').first().alias('open'),
                pl.col('price').max().alias('high'),
                pl.col('price').min().alias('low'),
                pl.col('price').last().alias('close'),
                pl.col('quantity').sum().alias('volume'),
                pl.col('buy_volume').sum().alias('buy_volume'),
                pl.col('sell_volume').sum().alias('sell_volume'),
            ])
            .with_columns([
                (pl.col('buy_volume') - pl.col('sell_volume')).alias('delta'),
                (pl.col('buy_volume') + pl.col('sell_volume')).alias('total_volume')
            ])
            .with_columns([
                # Share of the bar's volume that is net buying, in [-1, 1].
                (pl.col('delta') / pl.col('total_volume')).alias('imbalance'),
                # Running sum of delta across the bars of this DataFrame only.
                pl.col('delta').cum_sum().alias('cvd')
            ])
        )

class DataProcessor:
    """Turn one raw aggTrades CSV into a Parquet file of bars.

    Pipeline: read CSV -> preprocess -> aggregate (BarMethods) -> write
    Parquet -> delete the CSV.
    """

    # Column order used when the CSV has no header row.
    _CSV_COLUMNS = ('agg_trade_id', 'price', 'quantity',
                    'first_trade_id', 'last_trade_id',
                    'transact_time', 'is_buyer_maker')

    def __init__(self, data_dir: str,
                 period: str,
                 bar_params: dict | None = None):
        """
        Args:
            data_dir: directory for the Parquet output (created, with parents, if missing).
            period: "monthly" or "daily"; selects the output file-name pattern.
            bar_params: forwarded to BarMethods; defaults to 5-minute time bars.
        """
        self.data_dir = Path(data_dir)
        self.period = period
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # None sentinel instead of a mutable default dict shared across calls.
        self.bar_params = bar_params if bar_params is not None else {'type': 'time', 'params': '5m'}

    def _preprocess(self, df: pl.DataFrame) -> pl.DataFrame:
        """Add typed and derived columns to the raw trades.

        Adds 'timestamp' (from 'transact_time' in milliseconds), casts price and
        quantity to Float64, and splits quantity into 'buy_volume' (rows where
        is_buyer_maker is false) and 'sell_volume' (rows where it is true). Also
        adds 'dollar_volume' = price * quantity and running sums
        'cumulative_volume' / 'cumulative_dollar' in row order.
        """
        return df.with_columns([
            pl.from_epoch(pl.col('transact_time'), time_unit='ms').alias('timestamp'),
            pl.col('price').cast(pl.Float64),
            pl.col('quantity').cast(pl.Float64),
            pl.when(~pl.col('is_buyer_maker')).then(pl.col('quantity')).otherwise(0).alias('buy_volume'),
            pl.when(pl.col('is_buyer_maker')).then(pl.col('quantity')).otherwise(0).alias('sell_volume'),
            (pl.col('price') * pl.col('quantity')).alias('dollar_volume')
        ]).with_columns([
            pl.col('quantity').cum_sum().alias('cumulative_volume'),
            pl.col('dollar_volume').cum_sum().alias('cumulative_dollar')
        ])

    def _output_path(self, symbol: str, date: datetime) -> Path:
        """Return the Parquet path for (symbol, date) according to the period.

        Raises:
            ValueError: if the period is neither "monthly" nor "daily".
        """
        bar_str = self.bar_params.get('params', '5m')
        if self.period == "monthly":
            stamp = f"{date.year}-{date.month:02d}"
        elif self.period == "daily":
            stamp = f"{date.year}-{date.month:02d}-{date.day:02d}"
        else:
            raise ValueError(f"Period invalide: {self.period}")
        return self.data_dir / f"{symbol}_{bar_str}_{stamp}.parquet"

    def _clear(self, filepath: Path, *objects):
        """Delete the source CSV and run a garbage-collection pass.

        ``objects`` is accepted only for backward compatibility and is ignored:
        ``del`` on a loop variable cannot release references the caller still
        holds, so callers must drop their own references before calling this.
        """
        filepath.unlink(missing_ok=True)
        gc.collect()

    def execute(self, filepath: Path, symbol: str, date: datetime):
        """Run the pipeline for one CSV: read -> preprocess -> aggregate -> save -> clear.

        Raises:
            ValueError: if ``self.period`` is not "monthly" or "daily". This is
                checked before the CSV is read so no work is wasted.
        """
        out_path = self._output_path(symbol, date)

        # Detect whether the first line is a header row.
        with open(filepath, 'r', encoding='utf-8') as f:
            first_line = f.readline()
        has_header = 'agg_trade_id' in first_line or 'price' in first_line

        if has_header:
            df = pl.read_csv(filepath, has_header=True)
        else:
            df = pl.read_csv(filepath, has_header=False,
                             new_columns=list(self._CSV_COLUMNS))

        df_prep = self._preprocess(df)
        del df  # release the raw frame before aggregating to lower peak memory

        df_agg = BarMethods(df_prep, bar_params=self.bar_params).df_agg
        del df_prep

        df_agg.write_parquet(out_path)
        print(f"→ {out_path.name} ({len(df_agg)} barres)")
        del df_agg

        # All frames are released above; this removes the CSV and collects.
        self._clear(filepath)
        

def run_one(symbol, date, data_dir, period="monthly", bar_params=None):
    """Download one (symbol, date) aggTrades archive and convert it to bars.

    Args:
        symbol: e.g. "BTCUSDT".
        date: the month (monthly) or day (daily) to fetch.
        data_dir: working directory for archives, CSVs and Parquet output.
        period: "monthly" or "daily".
        bar_params: forwarded to DataProcessor; defaults to 5-minute time bars.

    Returns:
        True when bars were written, False when the archive was not available.
    """
    if bar_params is None:
        bar_params = {'type': 'time', 'params': '5m'}
    loader = DataLoader(base_url="https://data.binance.vision/data/futures/um",
                        period=period, target="aggTrades", data_dir=data_dir)
    csv_file = loader.load(symbol=symbol, date=date)
    if not csv_file:
        # Name the task: with parallel runs the message is otherwise unattributable.
        print(f"✗ Fichier non disponible: {symbol} {date:%Y-%m-%d}")
        return False
    print(csv_file)
    processor = DataProcessor(data_dir=data_dir, period=period, bar_params=bar_params)
    processor.execute(filepath=csv_file, symbol=symbol, date=date)
    return True

    

In [45]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_one_wrapper(args):
    """Call run_one on a packed ``(symbol, date, data_dir, period, bar_params)`` tuple.

    Returns:
        ``(symbol, date, success)``. Exceptions are caught and reported as a
        failure so one bad task does not stop the executor loop.
    """
    symbol, date, data_dir, period, bar_params = args
    try:
        result = run_one(symbol=symbol, date=date, data_dir=data_dir,
                         period=period, bar_params=bar_params)
    except Exception as e:
        stamp = f"{date:%Y-%m-%d}" if period == "daily" else f"{date.year}-{date.month:02d}"
        print(f"✗ Erreur {symbol} {stamp}: {e}")
        return (symbol, date, False)
    # An explicit False from run_one (e.g. archive unavailable) is a failure;
    # any other return value, including None, counts as success.
    return (symbol, date, result is not False)

def run_parallel(symbols: List[str], dates: List[datetime],
                data_dir: str = "./data",
                period: str = "monthly",
                bar_params: dict | None = None,
                max_workers: int = 4):
    """Run run_one for every (symbol, date) pair on a thread pool and print progress.

    Args:
        symbols: symbols to process.
        dates: dates to process for each symbol.
        data_dir: working directory shared by all tasks.
        period: "monthly" or "daily".
        bar_params: bar settings; defaults to 5-minute time bars.
        max_workers: number of worker threads.
    """
    if bar_params is None:
        bar_params = {'type': 'time', 'params': '5m'}

    # One task per (symbol, date) combination.
    tasks = [(symbol, date, data_dir, period, bar_params) for symbol in symbols for date in dates]
    total = len(tasks)
    print(f"🚀 Démarrage: {total} tâches avec {max_workers} workers\n")

    completed = 0
    failed = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(run_one_wrapper, task) for task in tasks]
        for future in as_completed(futures):
            # run_one_wrapper never raises; it reports failures in the tuple.
            _, _, success = future.result()
            completed += 1
            if not success:
                failed += 1
            print(f"[{completed}/{total}] Complété")
    print(f"\n✅ Terminé: {completed - failed}/{completed} succès")



from enum import Enum

class Period(str, Enum):
    """Archive granularity; each member's value is the plain period string ("daily"/"monthly")."""
    DAILY = "daily"
    MONTHLY = "monthly"

    def __str__(self) -> str:
        # Since Python 3.11, str()/format() of a (str, Enum) member yields
        # "Period.DAILY"; return the raw value so f-strings build valid URLs/paths.
        return self.value


In [46]:
"""symbol = 'BTCUSDT'
date = datetime(2021, 1, 1)
loader = DataLoader(base_url="https://data.binance.vision/data/futures/um", 
                    period=Period.DAILY.value, target="aggTrades", data_dir="./data")
csv_file = loader.load(symbol=symbol, date=date)"""

'symbol = \'BTCUSDT\'\ndate = datetime(2021, 1, 1)\nloader = DataLoader(base_url="https://data.binance.vision/data/futures/um", \n                    period=Period.DAILY.value, target="aggTrades", data_dir="./data")\ncsv_file = loader.load(symbol=symbol, date=date)'

In [None]:
from datetime import datetime, timedelta
from typing import List

def generate_dates(start_date: datetime, end_date: datetime, period: Period) -> List[datetime]:
    """Return the archive dates between two dates, inclusive.

    MONTHLY: the 1st (midnight) of every month from start_date's month through
    end_date's month. DAILY: start_date, start_date + 1 day, ... while
    <= end_date (start_date's time of day is kept).

    Raises:
        ValueError: if period is neither Period.MONTHLY nor Period.DAILY
            (previously an empty list was returned silently).
    """
    if period == Period.MONTHLY:
        # Count months as year * 12 + month index to walk across year boundaries.
        first = start_date.year * 12 + (start_date.month - 1)
        last = end_date.year * 12 + (end_date.month - 1)
        return [datetime(idx // 12, idx % 12 + 1, 1) for idx in range(first, last + 1)]

    if period == Period.DAILY:
        dates = []
        current_date = start_date
        while current_date <= end_date:
            dates.append(current_date)
            current_date += timedelta(days=1)
        return dates

    raise ValueError(f"Unsupported period: {period!r}")


# Usage: build the list of monthly archive dates to download.
symbols = ['BTCUSDT']
START = datetime(2024, 1, 1)
# NOTE(review): END = now includes the current month; its monthly archive is
# presumably not published until the month has ended, so that task may fail — confirm.
END = datetime.now()
dates = generate_dates(start_date=START, end_date=END, period=Period.MONTHLY)
print(f"ðŸ“Š Total: {len(dates)} fichiers Ã  tÃ©lÃ©charger")

📊 Total: 25 fichiers à télécharger


: 

In [None]:
# Download and convert every (symbol, month) pair using 10 worker threads.
run_parallel(
    symbols,
    dates,
    data_dir="./data",
    period=Period.MONTHLY.value,
    max_workers=10,
)

🚀 Démarrage: 25 tâches avec 10 workers

data\BTCUSDT-aggTrades-2024-06.csv
→ BTCUSDT_5m_2024-06.parquet (8640 barres)
[1/25] Complété
data\BTCUSDT-aggTrades-2024-09.csv
→ BTCUSDT_5m_2024-09.parquet (8640 barres)
[2/25] Complété
data\BTCUSDT-aggTrades-2024-02.csv
→ BTCUSDT_5m_2024-02.parquet (8352 barres)
[3/25] Complété
data\BTCUSDT-aggTrades-2024-01.csv
→ BTCUSDT_5m_2024-01.parquet (8928 barres)
[4/25] Complété
data\BTCUSDT-aggTrades-2024-08.csv
→ BTCUSDT_5m_2024-08.parquet (8928 barres)
[5/25] Complété
data\BTCUSDT-aggTrades-2024-12.csv
→ BTCUSDT_5m_2024-12.parquet (8928 barres)
[6/25] Complété
data\BTCUSDT-aggTrades-2024-05.csv
→ BTCUSDT_5m_2024-05.parquet (8928 barres)
[7/25] Complété
data\BTCUSDT-aggTrades-2024-11.csv
→ BTCUSDT_5m_2024-11.parquet (8640 barres)
[8/25] Complété
data\BTCUSDT-aggTrades-2024-07.csv
→ BTCUSDT_5m_2024-07.parquet (8928 barres)
[9/25] Complété
data\BTCUSDT-aggTrades-2024-10.csv
→ BTCUSDT_5m_2024-10.parquet (8926 b

In [None]:
# NOTE(review): `e` is not defined at module level in the visible cells, so this
# raises NameError — presumably a deliberate stop so "Run All" halts here; confirm.
e

In [None]:
import os
import polars as pl

def concat_tick_data(symbol: str, interval: str, years: list | None = None,
                     save: bool = True, output_dir: str = "data",
                     min_rows: int = 288) -> pl.DataFrame:
    """Concatenate per-day bar files ``{symbol}_{interval}_{YYYY-MM-DD}.parquet``.

    Args:
        symbol: e.g. "BTCUSDT".
        interval: bar size used in the file names, e.g. "5m".
        years: years to scan, iterated as given. The default ``[2021, 2025]``
            means 2021 and 2025 only, not 2021 through 2025.
        save: also write ``{symbol}_{interval}_combined.parquet`` to ``output_dir``.
        output_dir: directory holding the daily files (and the combined output).
        min_rows: files with fewer rows are reported as " shape error". The
            default 288 is the number of 5-minute bars in a day.

    Returns:
        The files concatenated in year/month/day order, or an empty DataFrame
        if none was found.
    """
    if years is None:
        years = [2021, 2025]

    dataframes = []
    for year in years:
        for month in range(1, 13):
            # Real calendar days only, so dates like Feb 30 are not reported missing.
            days_in_month = calendar.monthrange(year, month)[1]
            for day in range(1, days_in_month + 1):
                filename = f"{symbol}_{interval}_{year}-{month:02d}-{day:02d}.parquet"
                file = os.path.join(output_dir, filename)
                if not os.path.exists(file):
                    print(filename)  # missing day
                    continue
                try:
                    df = pl.read_parquet(file)
                except Exception as e:  # unreadable/corrupted file: report and skip
                    print(f"{filename}: {e}")
                    continue
                if df.shape[0] < min_rows:
                    print(f" shape error : {filename} - {df.shape[0]}")
                dataframes.append(df)

    if not dataframes:
        return pl.DataFrame()

    data = pl.concat(dataframes)
    print(f"{symbol}_{interval}: {len(data):,} rows")

    if save:
        output_file = os.path.join(output_dir, f"{symbol}_{interval}_combined.parquet")
        data.write_parquet(output_file)

    return data




data = {}  # NOTE(review): unused in the visible cells; kept in case later code relies on it.
symbols_gaps = {}  # symbol -> timestamps of bars that follow a gap longer than 5 minutes
# Full universe: ['BTCUSDT', 'ETHUSDT', 'TRXUSDT']
SYMBOLS = ['BTCUSDT']
for symbol in SYMBOLS:
    df = concat_tick_data(symbol, "5m")
    if df.is_empty():
        # No bar files were found: there is no 'timestamp' column to scan.
        symbols_gaps[symbol] = []
        continue
    # Sort first so diff() measures consecutive bars even if inputs were out of order.
    gaps = (
        df.sort('timestamp')
        .select([
            'timestamp',
            pl.col('timestamp').diff().alias('time_diff'),
        ])
        .filter(pl.col('time_diff') > pl.duration(minutes=5))
    )
    symbols_gaps[symbol] = gaps['timestamp'].to_list()

In [None]:
# Display the gap timestamps collected per symbol.
symbols_gaps

In [None]:
# Spot-check one daily bar file (hard-coded path).
pl.read_parquet("data/BTCUSDT_5m_2025-11-15.parquet")