In [18]:
# CÉLULA 1: CONFIGURAÇÃO E FUNÇÃO DE PROCESSAMENTO DIÁRIO

import requests
import pandas as pd
import numpy as np
import io
import zipfile
from datetime import date, timedelta
from tqdm.notebook import tqdm # Para uma barra de progresso visual

# --- Função para processar um único dia ---
def process_day(target_day, symbol="BTCUSDT"):
    """
    Descarrega, processa e agrega os dados de trades para um único dia.
    Retorna um DataFrame agregado em velas de 15 minutos.
    """
    day_str = target_day.strftime('%Y-%m-%d')
    url = f"https://data.binance.vision/data/spot/daily/trades/{symbol}/{symbol}-trades-{day_str}.zip"

    try:
        # 1. Download
        response = requests.get(url, timeout=30 )
        response.raise_for_status()

        # 2. Processamento Manual
        processed_data = []
        column_names = ['id', 'price', 'qty', 'quote_qty', 'time', 'is_buyer_maker', 'is_best_match']

        zip_file = zipfile.ZipFile(io.BytesIO(response.content))
        csv_filename = zip_file.namelist()[0]

        with zip_file.open(csv_filename) as f:
            raw_text_reader = io.TextIOWrapper(f)
            for line in raw_text_reader:
                parts = line.strip().split(',')
                timestamp_str_corrigido = parts[4][:13]
                processed_data.append([
                    int(parts[0]), float(parts[1]), float(parts[2]), float(parts[3]),
                    pd.to_datetime(int(timestamp_str_corrigido), unit='ms'),
                    parts[5].lower() == 'true', parts[6].lower() == 'true'
                ])
        
        # 3. Criação do DataFrame diário
        trades_do_dia = pd.DataFrame(processed_data, columns=column_names)
        trades_do_dia.set_index('time', inplace=True)

        # 4. Agregação diária
        condition_is_buyer = (trades_do_dia['is_buyer_maker'] == False)
        trades_do_dia['buy_volume'] = np.where(condition_is_buyer, trades_do_dia['quote_qty'], 0)
        trades_do_dia['sell_volume'] = np.where(~condition_is_buyer, trades_do_dia['quote_qty'], 0)

        # Corrigido o Future-Warning: 'T' -> 'min'
        agg_df_dia = trades_do_dia.resample('15min').agg({
            'buy_volume': 'sum',
            'sell_volume': 'sum',
            'id': 'count'
        }).rename(columns={'id': 'trade_count'})
        
        return agg_df_dia

    except requests.exceptions.HTTPError:
        # É normal não encontrar dados para dias muito recentes, então não é um erro fatal.
        print(f"Aviso: Dados para o dia {day_str} não encontrados (404). A ignorar.")
        return None
    except Exception as e:
        print(f"Erro ao processar o dia {day_str}: {e}")
        return None

print("Função 'process_day' definida e pronta a ser usada.")


Função 'process_day' definida e pronta a ser usada.


In [19]:
# CÉLULA 2: EXECUÇÃO DO PIPELINE PARA MÚLTIPLOS DIAS

# --- Parâmetros ---
DIAS_PARA_PROCESSAR = 90
symbol = "BTCUSDT"
end_date = date.today()
start_date = end_date - timedelta(days=DIAS_PARA_PROCESSAR)

# --- Loop Principal ---
all_daily_dfs = []
# tqdm cria uma barra de progresso
for day_num in tqdm(range(DIAS_PARA_PROCESSAR), desc="A processar dias"):
    current_date = start_date + timedelta(days=day_num)
    daily_agg_df = process_day(current_date, symbol)
    
    if daily_agg_df is not None:
        all_daily_dfs.append(daily_agg_df)

print(f"\nProcessamento concluído. {len(all_daily_dfs)} dias foram processados com sucesso.")

# --- Concatenação e Cálculo Final ---
if all_daily_dfs:
    # Junta todos os dataframes diários num só
    master_agg_df = pd.concat(all_daily_dfs)
    
    # Calcula a feature final 'buy_ratio' sobre o dataset completo
    total_volume = master_agg_df['buy_volume'] + master_agg_df['sell_volume']
    master_agg_df['buy_ratio'] = (master_agg_df['buy_volume'] / total_volume).where(total_volume > 0, 0.5)
    
    print("\nDataFrame mestre criado e 'buy_ratio' calculado.")
    display(master_agg_df.head())
    display(master_agg_df.tail())
    print(f"Dimensões do DataFrame Mestre: {master_agg_df.shape}")
else:
    print("\nNenhum dado foi processado. O DataFrame mestre não foi criado.")



A processar dias:   0%|          | 0/90 [00:00<?, ?it/s]


Processamento concluído. 90 dias foram processados com sucesso.

DataFrame mestre criado e 'buy_ratio' calculado.


Unnamed: 0_level_0,buy_volume,sell_volume,trade_count,buy_ratio
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2025-06-22 00:00:00,29797560.0,25888400.0,132458,0.5351
2025-06-22 00:15:00,30490170.0,22601680.0,115061,0.574291
2025-06-22 00:30:00,33168830.0,22435030.0,120027,0.59652
2025-06-22 00:45:00,12248400.0,15413250.0,53760,0.442794
2025-06-22 01:00:00,10754870.0,11378780.0,50717,0.485906


Unnamed: 0_level_0,buy_volume,sell_volume,trade_count,buy_ratio
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2025-09-19 22:45:00,1446714.0,1487180.0,6224,0.493104
2025-09-19 23:00:00,1372109.0,2878126.0,7877,0.322831
2025-09-19 23:15:00,1804612.0,2053364.0,10347,0.467761
2025-09-19 23:30:00,1772838.0,2270683.0,8670,0.438439
2025-09-19 23:45:00,3405252.0,2362933.0,8269,0.590351


Dimensões do DataFrame Mestre: (8640, 4)


In [20]:
# CÉLULA 3: SALVAMENTO DOS DADOS AGREGADOS

# Cria o diretório 'user_data' se ele não existir, para manter o projeto organizado
import os
output_dir = 'user_data/data/binance'
os.makedirs(output_dir, exist_ok=True)

# Define o nome do ficheiro de saída
output_filename = os.path.join(output_dir, f'trades_agg_{DIAS_PARA_PROCESSAR}d_{symbol}.parquet')

if 'master_agg_df' in locals() and not master_agg_df.empty:
    try:
        master_agg_df.to_parquet(output_filename)
        print(f"\n--- SUCESSO! ---")
        print(f"DataFrame agregado guardado com sucesso em: {output_filename}")
    except Exception as e:
        print(f"\nErro ao guardar o ficheiro Parquet: {e}")
else:
    print("\nNenhum dado para guardar.")




--- SUCESSO! ---
DataFrame agregado guardado com sucesso em: user_data/data/binance/trades_agg_90d_BTCUSDT.parquet
