In [None]:
import os
import time
import lzma
import struct
import requests
import pandas as pd

from datetime import datetime, timedelta

# Base URL for raw tick data (one .bi5 per instrument/year/month/day/hour)
BASE_URL = "https://datafeed.dukascopy.com/datafeed"
# Headers sent with every tick-file request.
# NOTE(review): presumably the server rejects requests that lack browser-like
# headers -- confirm before changing or removing these.
HEADERS = {"User-Agent": "Mozilla/5.0", "Referer": "https://www.dukascopy.com"}

# Map timeframe strings to their length in minutes.
# The keys are the only values accepted for the `timeframe` argument of
# download_dukascopy_data(); anything else raises ValueError there.
TIMEFRAME_MAP = {
    "1 min": 1,
    "5 min": 5,
    "10 min": 10,
    "15 min": 15,
    "30 min": 30,
    "1 hour": 60,
    "4 hour": 240,
    "1 day": 1440
}


def iterate_hours(start: datetime, end: datetime):
    """Generate hour-aligned datetimes covering the span from start to end.

    The first value is `start` truncated to the top of its hour; values then
    advance in one-hour steps and stop once a value would be later than `end`
    (a value equal to `end` is still produced).
    """
    step = timedelta(hours=1)
    tick = start.replace(minute=0, second=0, microsecond=0)
    while True:
        if tick > end:
            return
        yield tick
        tick = tick + step


def floor_to_period(dt: datetime, minutes: int) -> datetime:
    """Snap dt down to the start of its `minutes`-long bucket within the day.

    Buckets are counted from midnight of dt's own date; seconds and
    microseconds are always cleared.
    """
    minute_of_day = 60 * dt.hour + dt.minute
    # Drop the remainder to land on the bucket's first minute.
    floored = minute_of_day - minute_of_day % minutes
    return dt.replace(
        hour=floored // 60,
        minute=floored % 60,
        second=0,
        microsecond=0,
    )


# One tick record is 20 bytes, big-endian: uint32 milliseconds since the start
# of the file's hour, uint32 ask, uint32 bid, float32 ask volume, float32 bid
# volume. Prices are integers scaled by the instrument's point value.
# NOTE(review): layout taken from the commonly documented .bi5 format --
# confirm against a known-good sample before relying on absolute price levels.
_TICK_STRUCT = struct.Struct(">3I2f")


def _decode_ticks(raw: bytes, hour_start: datetime, price_scale: float) -> list:
    """Decode one decompressed hourly payload into (utc_time, mid_price, volume) tuples."""
    # Ignore a trailing partial record instead of letting iter_unpack raise.
    usable = len(raw) - len(raw) % _TICK_STRUCT.size
    records = _TICK_STRUCT.iter_unpack(memoryview(raw)[:usable])
    return [
        (
            # The ms field is relative to the hour the file covers, not the epoch.
            hour_start + timedelta(milliseconds=ms),
            (ask + bid) / 2.0 / price_scale,
            ask_vol + bid_vol,
        )
        for ms, ask, bid, ask_vol, bid_vol in records
    ]


def download_dukascopy_data(
    instrument: str,
    start_date: datetime,
    end_date: datetime,
    timeframe: str = "1 hour",
    gmt_offset: int = 0,
    price_scale: "float | None" = None,
) -> pd.DataFrame:
    """
    Pull raw ticks for each hour, then aggregate into OHLCV bars.

    Args:
        instrument: Symbol used in the feed URL, e.g. "EURUSD".
        start_date: Inclusive lower bound (naive datetime) expressed in the
            output time frame, i.e. UTC shifted by `gmt_offset` hours.
        end_date: Inclusive upper bound, same frame as `start_date`.
        timeframe: One of the keys of TIMEFRAME_MAP.
        gmt_offset: Hours added to the feed's timestamps before filtering and
            bucketing.
        price_scale: Divisor that converts the feed's integer prices to real
            prices. When None it defaults to 1e3 for symbols containing "JPY"
            and 1e5 otherwise.
            NOTE(review): that default is an assumption that fits common FX
            pairs; pass an explicit value for metals, indices, etc.

    Returns:
        A DataFrame with columns: timestamp, open, high, low, close, volume,
        sorted by timestamp. Prices are bid/ask mid prices; volume is the sum
        of bid and ask volume. Empty (with those columns) if no ticks were
        obtained.

    Raises:
        ValueError: If `timeframe` is not supported or `price_scale` is not
            positive.

    Hours whose file cannot be fetched (network error, non-200 status) or
    decompressed are skipped silently, so the result may contain gaps.
    """
    if timeframe not in TIMEFRAME_MAP:
        raise ValueError(f"Unsupported timeframe: {timeframe}. Choose from {list(TIMEFRAME_MAP)}")
    period_mins = TIMEFRAME_MAP[timeframe]

    if price_scale is None:
        price_scale = 1e3 if "JPY" in instrument.upper() else 1e5
    if price_scale <= 0:
        raise ValueError(f"price_scale must be positive, got {price_scale}")

    # start/end are in the shifted frame; the feed's files are addressed in
    # the unshifted one, so walk the hours of the un-shifted window.
    shift = timedelta(hours=gmt_offset)

    tick_rows = []
    for hr in iterate_hours(start_date - shift, end_date - shift):
        # The month path component is zero-based (January -> "00").
        url = (f"{BASE_URL}/{instrument}/"
               f"{hr.year}/{hr.month - 1:02d}/{hr.day:02d}/{hr.hour:02d}h_ticks.bi5")
        try:
            r = requests.get(url, headers=HEADERS, timeout=30)
        except requests.RequestException:
            # some network hiccup -- skip this hour
            continue

        if r.status_code != 200:
            # no market activity (weekend/holiday), or file not found
            continue

        try:
            raw = lzma.decompress(r.content)
        except lzma.LZMAError:
            # empty or corrupt payload
            continue

        for utc_time, price, vol in _decode_ticks(raw, hr, price_scale):
            shifted = utc_time + shift
            if start_date <= shifted <= end_date:
                tick_rows.append((shifted, price, vol))

        # be kind to their server
        time.sleep(0.5)

    if not tick_rows:
        return pd.DataFrame(columns=["timestamp","open","high","low","close","volume"])

    # Build DataFrame of ticks (rows are already in chronological order, which
    # the "first"/"last" aggregations below rely on).
    df_ticks = pd.DataFrame(tick_rows, columns=["timestamp", "price", "volume"])

    # Assign each tick to its period bucket
    df_ticks["period_start"] = df_ticks["timestamp"].apply(
        lambda ts: floor_to_period(ts, period_mins)
    )

    # Aggregate OHLCV
    agg = df_ticks.groupby("period_start").agg(
        open   = ("price", "first"),
        high   = ("price", "max"),
        low    = ("price", "min"),
        close  = ("price", "last"),
        volume = ("volume", "sum")
    ).reset_index().rename(columns={"period_start": "timestamp"})

    return agg.sort_values("timestamp").reset_index(drop=True)


def main():
    """Download hourly bars for a fixed list of FX pairs and save one CSV per pair.

    The date window, instrument list and output directory are hard-coded.
    Instruments that yield no data are reported and skipped.
    """
    outdir = "C:/Users/MELODY/Documents/Forex/ICT/ICT_ml_trading/data/dukascopy"
    os.makedirs(outdir, exist_ok=True)

    window_start = datetime(2020, 1, 1)
    window_end = datetime(2025, 3, 1)

    for ins in ("EURUSD", "GBPUSD", "USDJPY", "AUDUSD", "USDCHF"):
        print(f"→ Downloading {ins}")
        bars = download_dukascopy_data(ins, window_start, window_end, timeframe="1 hour")

        if bars.empty:
            print(f"   • No data for {ins}")
            continue

        path = os.path.join(outdir, f"{ins}_1h_UTC.csv")
        bars.to_csv(path, index=False)
        print(f"   • Saved {len(bars)} bars to {path}")


# Start the download only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


→ Downloading EURUSD
