Este notebook construye la base de datos operativa del proyecto a partir de tick data de Dukascopy (2021–2025). Primero descarga y consolida los ticks, luego limita el universo al horario líquido (07:00–22:00 UTC) y transforma los datos a velas OHLC de 5 segundos. El resultado se guarda en formato parquet para facilitar lecturas rápidas en las etapas posteriores. Además, incluye una auditoría básica de calidad que revisa continuidad temporal, duplicados, coherencia OHLC y comportamiento del spread, generando un reporte de incidencias cuando se detectan anomalías.

In [None]:
# Descarga EURUSD ticks de Dukascopy para 2025,
# filtra UTC 07:00–22:00, resamplea a velas de 5s y guarda Parquet por mes.

import datetime as dt
import lzma
import struct
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import polars as pl
import requests
from tqdm import tqdm


# CONFIGURACIÓN GENERAL

BASE_URL = "https://datafeed.dukascopy.com/datafeed"
SYMBOL = "EURUSD"
PRICE_SCALE = 100000          # EURUSD
MAX_WORKERS = 12              # threads
OUT_DIR = Path("./data") / SYMBOL

# Ventana operativa UTC
UTC_START = dt.time(7, 0, 0)
UTC_END = dt.time(22, 0, 0)

# BI5 record format
REC = struct.Struct(">iii ff")
REC_SIZE = REC.size



# UTILIDADES

def dukascopy_hour_url(symbol: str, hour_start: dt.datetime) -> str:
    return (
        f"{BASE_URL}/{symbol}/"
        f"{hour_start.year:04d}/{hour_start.month:02d}/{hour_start.day:02d}/"
        f"{hour_start.hour:02d}h_ticks.bi5"
    )


def download_bytes(url: str, timeout: int = 20, retries: int = 2) -> bytes | None:
    for _ in range(retries + 1):
        try:
            r = requests.get(url, timeout=timeout)
            if r.status_code == 200 and r.content:
                return r.content
            return None
        except requests.RequestException:
            continue
    return None


def decode_bi5(bi5: bytes, hour_start: dt.datetime) -> pl.DataFrame:
    raw = lzma.decompress(bi5)
    raw = raw[: (len(raw) // REC_SIZE) * REC_SIZE]

    if not raw:
        return pl.DataFrame(schema={
            "ts": pl.Datetime("ms"),
            "bid": pl.Float64,
            "ask": pl.Float64,
            "bid_vol": pl.Float32,
            "ask_vol": pl.Float32,
        })

    ts, bid, ask, bidv, askv = [], [], [], [], []

    for off in range(0, len(raw), REC_SIZE):
        time_ms, ask_i, bid_i, ask_v, bid_v = REC.unpack_from(raw, off)
        ts.append(hour_start + dt.timedelta(milliseconds=int(time_ms)))
        bid.append(bid_i / PRICE_SCALE)
        ask.append(ask_i / PRICE_SCALE)
        bidv.append(float(bid_v))
        askv.append(float(ask_v))

    return pl.DataFrame({
        "ts": ts,
        "bid": bid,
        "ask": ask,
        "bid_vol": bidv,
        "ask_vol": askv,
    }).with_columns(pl.col("ts").cast(pl.Datetime("ms")))


def filter_utc_window(df: pl.DataFrame) -> pl.DataFrame:
    if df.height == 0:
        return df

    t = pl.col("ts").dt.time()
    return df.filter(
        (t >= pl.time(UTC_START.hour, UTC_START.minute, 0)) &
        (t <  pl.time(UTC_END.hour, UTC_END.minute, 0))
    )


def resample_5s(df_ticks: pl.DataFrame) -> pl.DataFrame:
    if df_ticks.height == 0:
        return pl.DataFrame(schema={
            "ts": pl.Datetime("ms"),
            "open": pl.Float64,
            "high": pl.Float64,
            "low": pl.Float64,
            "close": pl.Float64,
            "ticks": pl.UInt32,
            "bid_last": pl.Float64,
            "ask_last": pl.Float64,
            "bid_vol_sum": pl.Float64,
            "ask_vol_sum": pl.Float64,
        })

    df = (
        df_ticks.sort("ts")
        .with_columns(mid=((pl.col("bid") + pl.col("ask")) / 2))
    )

    return (
        df.group_by_dynamic(
            index_column="ts",
            every="5s",
            period="5s",
            closed="left",
            label="left",
        )
        .agg(
            pl.col("mid").first().alias("open"),
            pl.col("mid").max().alias("high"),
            pl.col("mid").min().alias("low"),
            pl.col("mid").last().alias("close"),
            pl.len().cast(pl.UInt32).alias("ticks"),
            pl.col("bid").last().alias("bid_last"),
            pl.col("ask").last().alias("ask_last"),
            pl.col("bid_vol").sum().alias("bid_vol_sum"),
            pl.col("ask_vol").sum().alias("ask_vol_sum"),
        )
        .drop_nulls(["open", "high", "low", "close"])
    )


def iter_hours(start: dt.datetime, end: dt.datetime):
    cur = start
    while cur <= end:
        yield cur
        cur += dt.timedelta(hours=1)



# PIPELINE PRINCIPAL

def main():
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    year = 2025
    start = dt.datetime(year, 1, 1, 0, 0, 0)
    end = dt.datetime(year, 12, 31, 23, 0, 0)
    hours = list(iter_hours(start, end))

    def process_hour(hour_start: dt.datetime) -> tuple[str, pl.DataFrame]:
        url = dukascopy_hour_url(SYMBOL, hour_start)
        bi5 = download_bytes(url)
        if not bi5:
            return "", pl.DataFrame()

        ticks = decode_bi5(bi5, hour_start)
        ticks = filter_utc_window(ticks)
        candles = resample_5s(ticks)

        month_key = f"{hour_start.year:04d}-{hour_start.month:02d}"
        return month_key, candles

    buffers: dict[str, list[pl.DataFrame]] = {}

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futures = [ex.submit(process_hour, h) for h in hours]

        for f in tqdm(as_completed(futures), total=len(futures), desc="EURUSD 2025"):
            month_key, dfh = f.result()
            if dfh.height:
                buffers.setdefault(month_key, []).append(dfh)

    for month, parts in buffers.items():
        df_month = (
            pl.concat(parts, how="vertical")
            .sort("ts")
            .unique(subset=["ts"], keep="last")
        )

        out_path = OUT_DIR / f"{SYMBOL}_5s_{month}.parquet"
        df_month.write_parquet(out_path)
        print(f"✔ {out_path} | velas: {df_month.height}")

    print("Descarga 2025 completada.")


if __name__ == "__main__":
    # pip install polars requests tqdm
    main()

EURUSD 2025: 100%|██████████| 8760/8760 [34:02<00:00,  4.29it/s]   


✔ data/EURUSD/EURUSD_5s_2025-01.parquet | velas: 192390
✔ data/EURUSD/EURUSD_5s_2025-02.parquet | velas: 194288
✔ data/EURUSD/EURUSD_5s_2025-03.parquet | velas: 216961
✔ data/EURUSD/EURUSD_5s_2025-04.parquet | velas: 207462
✔ data/EURUSD/EURUSD_5s_2025-05.parquet | velas: 194502
✔ data/EURUSD/EURUSD_5s_2025-06.parquet | velas: 189713
✔ data/EURUSD/EURUSD_5s_2025-07.parquet | velas: 179480
✔ data/EURUSD/EURUSD_5s_2025-08.parquet | velas: 191991
✔ data/EURUSD/EURUSD_5s_2025-09.parquet | velas: 190988
✔ data/EURUSD/EURUSD_5s_2025-10.parquet | velas: 171912
✔ data/EURUSD/EURUSD_5s_2025-11.parquet | velas: 184106
Descarga 2025 completada.


In [None]:
# Control de calidad para velas 5s EURUSD (UTC fijo)
# Reviso gaps, duplicados, OHLC, spread y actividad

from pathlib import Path
import polars as pl

DATA_DIR = Path("./data/EURUSD")
PIP = 0.0001  # EURUSD


def check_month(file: Path) -> dict:
    df = pl.read_parquet(file).sort("ts")

    # Spread
    df = df.with_columns(
        spread=(pl.col("ask_last") - pl.col("bid_last")),
        spread_pips=(pl.col("ask_last") - pl.col("bid_last")) / PIP,
    )

    # Checks
    out = {}
    out["file"] = file.name
    out["rows"] = df.height

    # 1) Duplicados
    out["dup_ts"] = (df.height - df.select(pl.col("ts").n_unique()).item())

    # 2) Gaps
    gaps = df.select(
        pl.col("ts").diff().dt.total_seconds().fill_null(5).alias("dt_s")
    )
    out["gaps_gt_5s"] = gaps.select((pl.col("dt_s") > 5).sum()).item()
    out["max_gap_s"] = gaps.select(pl.col("dt_s").max()).item()

    # 3) OHLC inválido
    out["invalid_ohlc"] = df.filter(
        (pl.col("high") < pl.col("low")) |
        (pl.col("open") < pl.col("low")) | (pl.col("open") > pl.col("high")) |
        (pl.col("close") < pl.col("low")) | (pl.col("close") > pl.col("high"))
    ).height

    # 4) Spread
    out["spread_neg"] = df.filter(pl.col("spread") < 0).height
    out["spread_zero"] = df.filter(pl.col("spread") == 0).height

    spread_stats = df.select([
        pl.col("spread_pips").median().alias("median_pips"),
        pl.col("spread_pips").quantile(0.95).alias("p95_pips"),
        pl.col("spread_pips").max().alias("max_pips"),
    ]).row(0)

    out["spread_median"] = spread_stats[0]
    out["spread_p95"] = spread_stats[1]
    out["spread_max"] = spread_stats[2]

    # 5) Actividad
    ticks_stats = df.select([
        pl.col("ticks").min().alias("ticks_min"),
        pl.col("ticks").median().alias("ticks_med"),
        pl.col("ticks").quantile(0.95).alias("ticks_p95"),
        pl.col("ticks").max().alias("ticks_max"),
    ]).row(0)

    out["ticks_min"] = ticks_stats[0]
    out["ticks_med"] = ticks_stats[1]
    out["ticks_p95"] = ticks_stats[2]
    out["ticks_max"] = ticks_stats[3]

    return out


def main():
    files = sorted(DATA_DIR.glob("EURUSD_5s_2025-*.parquet"))

    if not files:
        print("❌ No se encontraron archivos parquet.")
        return

    results = []
    for f in files:
        res = check_month(f)
        results.append(res)

    df_res = pl.DataFrame(results)

    print("\n=== RESUMEN POR MES ===")
    print(df_res)

    print("\n=== CHEQUEOS GLOBALES ===")
    print("Meses con duplicados:",
          df_res.filter(pl.col("dup_ts") > 0).select("file").to_series().to_list())

    print("Meses con OHLC inválido:",
          df_res.filter(pl.col("invalid_ohlc") > 0).select("file").to_series().to_list())

    print("Meses con spread negativo:",
          df_res.filter(pl.col("spread_neg") > 0).select("file").to_series().to_list())

    print("\n=== SPREAD GLOBAL ===")
    print(
        df_res.select([
            pl.col("spread_median").mean().alias("avg_median"),
            pl.col("spread_p95").mean().alias("avg_p95"),
            pl.col("spread_max").max().alias("max_seen"),
        ])
    )

    print("\n=== ACTIVIDAD (ticks/vela) ===")
    print(
        df_res.select([
            pl.col("ticks_med").mean().alias("avg_ticks_med"),
            pl.col("ticks_p95").mean().alias("avg_ticks_p95"),
            pl.col("ticks_max").max().alias("max_ticks_seen"),
        ])
    )

    print("\n✅ Quality check finalizado.")


if __name__ == "__main__":
    main()


=== RESUMEN POR MES ===
shape: (11, 15)
┌───────────────┬────────┬────────┬────────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ file          ┆ rows   ┆ dup_ts ┆ gaps_gt_5s ┆ … ┆ ticks_min ┆ ticks_med ┆ ticks_p95 ┆ ticks_max │
│ ---           ┆ ---    ┆ ---    ┆ ---        ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│ str           ┆ i64    ┆ i64    ┆ i64        ┆   ┆ i64       ┆ f64       ┆ f64       ┆ i64       │
╞═══════════════╪════════╪════════╪════════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
│ EURUSD_5s_202 ┆ 192390 ┆ 0      ┆ 15091      ┆ … ┆ 1         ┆ 6.0       ┆ 20.0      ┆ 73        │
│ 5-01.parquet  ┆        ┆        ┆            ┆   ┆           ┆           ┆           ┆           │
│ EURUSD_5s_202 ┆ 194288 ┆ 0      ┆ 11800      ┆ … ┆ 1         ┆ 7.0       ┆ 22.0      ┆ 71        │
│ 5-02.parquet  ┆        ┆        ┆            ┆   ┆           ┆           ┆           ┆           │
│ EURUSD_5s_202 ┆ 216961 ┆ 0      ┆ 10491      ┆ …

In [21]:
from pathlib import Path
import polars as pl

DATA_DIR = Path("./data/EURUSD")
PIP = 0.0001

files = sorted(DATA_DIR.glob("EURUSD_5s_2025-*.parquet"))

rows = []
for f in files:
    df = pl.read_parquet(f).with_columns(
        spread_pips=(pl.col("ask_last") - pl.col("bid_last")) / PIP
    )
    top = (
        df.select(["ts", "bid_last", "ask_last", "ticks", "spread_pips"])
          .sort("spread_pips", descending=True)
          .head(10)
          .with_columns(pl.lit(f.name).alias("file"))
    )
    rows.append(top)

out = pl.concat(rows).sort("spread_pips", descending=True)

print("=== TOP 50 SPREAD BARS (2025) ===")
print(out.head(50))

print("\n=== PEOR CASO ===")
print(out.head(1))

=== TOP 50 SPREAD BARS (2025) ===
shape: (50, 6)
┌─────────────────────┬──────────┬──────────┬───────┬─────────────┬───────────────────────────┐
│ ts                  ┆ bid_last ┆ ask_last ┆ ticks ┆ spread_pips ┆ file                      │
│ ---                 ┆ ---      ┆ ---      ┆ ---   ┆ ---         ┆ ---                       │
│ datetime[ms]        ┆ f64      ┆ f64      ┆ u32   ┆ f64         ┆ str                       │
╞═════════════════════╪══════════╪══════════╪═══════╪═════════════╪═══════════════════════════╡
│ 2025-02-20 21:04:00 ┆ 1.084    ┆ 1.08714  ┆ 1     ┆ 31.4        ┆ EURUSD_5s_2025-02.parquet │
│ 2025-11-25 07:47:50 ┆ 1.17663  ┆ 1.179    ┆ 1     ┆ 23.7        ┆ EURUSD_5s_2025-11.parquet │
│ 2025-11-25 07:43:35 ┆ 1.17701  ┆ 1.1793   ┆ 2     ┆ 22.9        ┆ EURUSD_5s_2025-11.parquet │
│ 2025-11-25 07:43:40 ┆ 1.17701  ┆ 1.17929  ┆ 1     ┆ 22.8        ┆ EURUSD_5s_2025-11.parquet │
│ 2025-11-25 07:43:50 ┆ 1.177    ┆ 1.17928  ┆ 2     ┆ 22.8        ┆ EURUSD_5s_2025-11.p