In [2]:
# binance_data.py
from __future__ import annotations

import os
from pathlib import Path
from typing import Optional, Union, List, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timezone
import time
import hashlib
import hmac
from urllib.parse import urlencode
import pandas as pd
from dotenv import load_dotenv

try:
    from binance.spot import Spot as SpotClient
    from binance.um_futures import UMFutures as FuturesClient
    _USING_CONNECTOR = True
except ImportError:
    SpotClient = None  # type: ignore
    FuturesClient = None  # type: ignore
    import requests  # type: ignore

    _USING_CONNECTOR = False
    

# Alias for a plain int-or-float numeric value.
Number = Union[int, float]
# Timestamp inputs accepted by _to_millis: epoch number, date string, or datetime.
TsLike = Union[int, float, str, datetime]

# Directory that anchors the default .env lookup: the folder containing this
# file when __file__ is defined, otherwise the current working directory.
try:
    _BASE_PATH = Path(__file__).resolve().parent
except NameError:        # running inside a notebook / REPL
    _BASE_PATH = Path.cwd()

# Location of the .env file that _load_api_credentials checks first.
_ENV_DEFAULT_PATH = _BASE_PATH / ".env"



def _load_api_credentials(
    api_key: Optional[str], api_secret: Optional[str]
) -> tuple[Optional[str], Optional[str]]:
    """
    Load Binance API credentials from arguments or environment variables.

    Environment variables are populated from the nearest .env file if present.
    Keys passed explicitly to BinanceData override the environment.
    """
    if api_key and api_secret:
        return api_key, api_secret

    load_dotenv(
        dotenv_path=_ENV_DEFAULT_PATH if _ENV_DEFAULT_PATH.exists() else None,
        override=False,
    )

    return (
        api_key or os.getenv("BINANCE_API_KEY"),
        api_secret or os.getenv("BINANCE_API_SECRET"),
    )


def _to_millis(ts: Optional[TsLike]) -> Optional[int]:
    """Convert ISO string / datetime / seconds-ms int/float to Binance ms timestamp."""
    if ts is None:
        return None
    if isinstance(ts, (int, float)):
        # Heuristic: treat >= 10^12 as already ms
        return int(ts if ts >= 1_000_000_000_000 else ts * 1000)
    if isinstance(ts, datetime):
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return int(ts.timestamp() * 1000)
    return int(pd.to_datetime(ts, utc=True).value // 1_000_000)


def _df_klines(raw: List[List[Any]]) -> pd.DataFrame:
    cols = [
        "open_time", "open", "high", "low", "close", "volume", "close_time",
        "quote_asset_volume", "number_of_trades", "taker_base_volume",
        "taker_quote_volume", "ignore"
    ]
    df = pd.DataFrame(raw, columns=cols)
    num_cols = [
        "open", "high", "low", "close", "volume", "quote_asset_volume",
        "taker_base_volume", "taker_quote_volume"
    ]
    df[num_cols] = df[num_cols].apply(pd.to_numeric, errors="coerce")
    df["open_time"] = pd.to_datetime(df["open_time"], unit="ms", utc=True)
    df["close_time"] = pd.to_datetime(df["close_time"], unit="ms", utc=True)
    return df


def _retry(call, *, tries=3, backoff=0.8):
    """Tiny retry helper for transient HTTP 429/5xx."""
    for i in range(tries):
        try:
            return call()
        except Exception:
            if i == tries - 1:
                raise
            time.sleep(backoff * (2 ** i))


@dataclass
class BinanceData:
    """Read-only access to Binance Spot and USD-M Futures market data.

    At construction the instance uses the ``binance`` connector clients when
    they were importable and otherwise falls back to plain ``requests``
    sessions. Every public method picks the matching branch and runs the
    remote call through ``_retry``.

    Fields:
        api_key / api_secret: optional credentials; blanks are filled from
            the environment by ``_load_api_credentials``.
        spot_base_url: base URL used for public Spot requests.
    """

    api_key: Optional[str] = None
    api_secret: Optional[str] = None
    # Use the lower-load CDN for public Spot data by default:
    spot_base_url: str = "https://data-api.binance.vision"

    def __post_init__(self):
        """Resolve credentials, then build connector clients or HTTP sessions."""
        # Fill missing credentials from environment/.env if needed.
        self.api_key, self.api_secret = _load_api_credentials(self.api_key, self.api_secret)

        self._using_connector = _USING_CONNECTOR

        if self._using_connector:
            self.spot_public = SpotClient(base_url=self.spot_base_url)
            self.spot_auth = (
                SpotClient(api_key=self.api_key, api_secret=self.api_secret)
                if self.api_key and self.api_secret else None
            )
            # Credentials are passed positionally: the futures connector names
            # these parameters key/secret, so api_key=/api_secret= keywords
            # are rejected by its constructor.
            self.um = FuturesClient(self.api_key, self.api_secret)
        else:
            # HTTP fallback using requests for environments without binance-connector.
            self._http_user_agent = "binance-data/0.1"
            self._spot_http_base = f"{self.spot_base_url.rstrip('/')}/api"
            self._futures_http_base = "https://fapi.binance.com"

            self._spot_session = requests.Session()
            self._futures_session = requests.Session()

            self._spot_session.headers.update({"User-Agent": self._http_user_agent})
            self._futures_session.headers.update({"User-Agent": self._http_user_agent})

            if self.api_key:
                # Set API key header for endpoints that support/require it.
                self._spot_session.headers["X-MBX-APIKEY"] = self.api_key
                self._futures_session.headers["X-MBX-APIKEY"] = self.api_key

            # Expose placeholders to keep attribute names consistent with connector branch.
            self.spot_public = None
            self.spot_auth = None
            self.um = None

    # -------- shared helpers --------
    @staticmethod
    def _apply_time_window(
        params: Dict[str, Any], start: Optional[TsLike], end: Optional[TsLike]
    ) -> Dict[str, Any]:
        """Add startTime/endTime (in ms) to ``params`` for each bound that was given."""
        start_ms = _to_millis(start)
        end_ms = _to_millis(end)
        # Compare against None so an explicit epoch-0 bound is still sent.
        if start_ms is not None:
            params["startTime"] = start_ms
        if end_ms is not None:
            params["endTime"] = end_ms
        return params

    # -------- HTTP helpers for fallback --------
    def _http_spot_get(self, path: str, params: Dict[str, Any]) -> Any:
        """GET ``path`` under the Spot base URL and return the decoded JSON.

        None-valued parameters are dropped before sending.

        Raises:
            RuntimeError: if the connector branch is active.
            requests.HTTPError: via ``raise_for_status`` on an error response.
        """
        if self._using_connector:
            raise RuntimeError("HTTP helper should not be used when connector is available")
        clean_params = {k: v for k, v in params.items() if v is not None}
        url = f"{self._spot_http_base.rstrip('/')}/{path.lstrip('/')}"
        response = self._spot_session.get(url, params=clean_params, timeout=10)
        response.raise_for_status()
        return response.json()

    def _http_futures_get(self, path: str, params: Dict[str, Any], *, signed: bool = False) -> Any:
        """GET ``path`` under the Futures base URL and return the decoded JSON.

        None-valued parameters are dropped; with ``signed=True`` the remaining
        ones are timestamped and signed by ``_sign_params`` first.

        Raises:
            RuntimeError: if the connector branch is active, or if signing is
                requested without an API secret.
            requests.HTTPError: via ``raise_for_status`` on an error response.
        """
        if self._using_connector:
            raise RuntimeError("HTTP helper should not be used when connector is available")
        clean_params = {k: v for k, v in params.items() if v is not None}
        if signed:
            clean_params = self._sign_params(clean_params)
        url = f"{self._futures_http_base.rstrip('/')}/{path.lstrip('/')}"
        response = self._futures_session.get(url, params=clean_params, timeout=10)
        response.raise_for_status()
        return response.json()

    def _sign_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``params`` with ``timestamp`` and ``signature`` added.

        The signature is HMAC-SHA256 over the url-encoded parameters in the
        same order as the returned dict, which is the order in which a dict
        passed as ``params=`` is serialised into the query string.

        Raises:
            RuntimeError: if no API secret is configured.
        """
        if not self.api_secret:
            raise RuntimeError("Binance API secret required for signed endpoint")
        payload = dict(params)
        payload["timestamp"] = int(time.time() * 1000)
        # Sign the insertion-ordered query: signing a sorted copy while
        # sending the unsorted dict would make the two strings disagree.
        query = urlencode(payload, doseq=True)
        signature = hmac.new(
            self.api_secret.encode("utf-8"), query.encode("utf-8"), hashlib.sha256
        ).hexdigest()
        payload["signature"] = signature  # appended last, after the signed part
        return payload

    # ---------- Spot ----------
    def spot_klines(
        self,
        symbol: str,
        interval: str = "1h",
        start: Optional[TsLike] = None,
        end: Optional[TsLike] = None,
        limit: int = 1000,
    ) -> pd.DataFrame:
        """Fetch Spot candlesticks for ``symbol`` as a kline DataFrame.

        ``start`` / ``end`` accept anything ``_to_millis`` understands; the
        symbol is upper-cased before sending.
        """
        params = self._apply_time_window(
            dict(symbol=symbol.upper(), interval=interval, limit=limit), start, end
        )

        if self._using_connector:
            raw = _retry(lambda: self.spot_public.klines(**params))
        else:
            raw = _retry(lambda: self._http_spot_get("v3/klines", params))
        return _df_klines(raw)

    def spot_depth(self, symbol: str, limit: int = 1000) -> Dict[str, Any]:
        """Return the Spot order-book response for ``symbol`` unmodified."""
        symbol = symbol.upper()
        if self._using_connector:
            return _retry(lambda: self.spot_public.depth(symbol=symbol, limit=limit))
        return _retry(lambda: self._http_spot_get("v3/depth", {"symbol": symbol, "limit": limit}))

    def spot_trades(self, symbol: str, limit: int = 1000) -> pd.DataFrame:
        """Fetch recent Spot trades for ``symbol`` as a DataFrame.

        When rows are present, ``price`` and ``qty`` are coerced to numbers
        and ``time`` is parsed as UTC epoch milliseconds. Columns named
        ``quantity`` / ``T`` are renamed to ``qty`` / ``time`` first.
        """
        symbol = symbol.upper()
        if self._using_connector:
            raw = _retry(lambda: self.spot_public.trades(symbol=symbol, limit=limit))
        else:
            raw = _retry(lambda: self._http_spot_get("v3/trades", {"symbol": symbol, "limit": limit}))
        df = pd.DataFrame(raw)
        if not df.empty:
            df["price"] = pd.to_numeric(df["price"], errors="coerce")
            # Normalise alternative field names before converting them.
            if "quantity" in df.columns:
                df.rename(columns={"quantity": "qty"}, inplace=True)
            if "T" in df.columns and "time" not in df.columns:
                df.rename(columns={"T": "time"}, inplace=True)
            if "qty" in df.columns:
                df["qty"] = pd.to_numeric(df["qty"], errors="coerce")
            if "time" in df.columns:
                df["time"] = pd.to_datetime(df["time"], unit="ms", utc=True)
        return df

    def spot_book_ticker(self, symbol: str) -> Dict[str, Any]:
        """Return the Spot book-ticker response for ``symbol`` unmodified."""
        symbol = symbol.upper()
        if self._using_connector:
            return _retry(lambda: self.spot_public.book_ticker(symbol=symbol))
        return _retry(lambda: self._http_spot_get("v3/ticker/bookTicker", {"symbol": symbol}))

    # ---------- USDⓈ-M Futures ----------
    def fut_klines(
        self,
        symbol: str,
        interval: str = "1h",
        start: Optional[TsLike] = None,
        end: Optional[TsLike] = None,
        limit: int = 1500,
    ) -> pd.DataFrame:
        """Fetch Futures candlesticks for ``symbol`` as a kline DataFrame."""
        params = self._apply_time_window(
            dict(symbol=symbol.upper(), interval=interval, limit=limit), start, end
        )

        if self._using_connector:
            raw = _retry(lambda: self.um.klines(**params))
        else:
            raw = _retry(lambda: self._http_futures_get("fapi/v1/klines", params))
        return _df_klines(raw)

    def fut_mark_price(self, symbol: str) -> Dict[str, Any]:
        """Return the premiumIndex (mark price) response for ``symbol`` unmodified."""
        symbol = symbol.upper()
        if self._using_connector:
            return _retry(lambda: self.um.mark_price(symbol=symbol))
        return _retry(lambda: self._http_futures_get("fapi/v1/premiumIndex", {"symbol": symbol}))

    def fut_funding_history(
        self,
        symbol: str,
        start: Optional[TsLike] = None,
        end: Optional[TsLike] = None,
        limit: int = 1000,
    ) -> pd.DataFrame:
        """Fetch funding-rate history for ``symbol`` as a DataFrame.

        When rows are present, ``fundingRate`` is coerced to a number and
        ``fundingTime`` is parsed as UTC epoch milliseconds.
        """
        params = self._apply_time_window(dict(symbol=symbol.upper(), limit=limit), start, end)

        if self._using_connector:
            raw = _retry(lambda: self.um.funding_rate(**params))
        else:
            raw = _retry(lambda: self._http_futures_get("fapi/v1/fundingRate", params))
        df = pd.DataFrame(raw)
        if not df.empty:
            df["fundingRate"] = pd.to_numeric(df["fundingRate"], errors="coerce")
            df["fundingTime"] = pd.to_datetime(df["fundingTime"], unit="ms", utc=True)
        return df

    def fut_index_price_klines(
        self,
        symbol: str,
        interval: str = "1h",
        start: Optional[TsLike] = None,
        end: Optional[TsLike] = None,
        limit: int = 720,
    ) -> pd.DataFrame:
        """Fetch index-price candlesticks for the pair ``symbol`` as a kline DataFrame."""
        # This endpoint is keyed by ``pair`` rather than ``symbol``; use that
        # name for both the connector call and the HTTP fallback.
        params = self._apply_time_window(
            dict(pair=symbol.upper(), interval=interval, limit=limit), start, end
        )

        if self._using_connector:
            raw = _retry(lambda: self.um.index_price_klines(**params))
        else:
            raw = _retry(lambda: self._http_futures_get("fapi/v1/indexPriceKlines", params))
        return _df_klines(raw)

    def fut_premium_index_klines(
        self,
        symbol: str,
        interval: str = "1h",
        start: Optional[TsLike] = None,
        end: Optional[TsLike] = None,
        limit: int = 720,
    ) -> pd.DataFrame:
        """Fetch premium-index candlesticks for ``symbol`` as a kline DataFrame."""
        params = self._apply_time_window(
            dict(symbol=symbol.upper(), interval=interval, limit=limit), start, end
        )

        if self._using_connector:
            # NOTE(review): not every release of the futures connector appears
            # to ship premium_index_klines — confirm. When it is missing, go
            # through the client's generic query() GET helper instead.
            named_call = getattr(self.um, "premium_index_klines", None)
            if named_call is not None:
                raw = _retry(lambda: named_call(**params))
            else:
                raw = _retry(lambda: self.um.query("/fapi/v1/premiumIndexKlines", params))
        else:
            raw = _retry(lambda: self._http_futures_get("fapi/v1/premiumIndexKlines", params))
        return _df_klines(raw)

    def fut_open_interest_hist(
        self,
        symbol: str,
        period: str = "5m",
        limit: int = 500,
        start: Optional[TsLike] = None,
        end: Optional[TsLike] = None,
    ) -> pd.DataFrame:
        """Fetch open-interest history for ``symbol`` as a DataFrame.

        The connector branch forwards the parameters in a single call. The
        HTTP fallback pages backwards in time and trims the result locally
        (see ``_http_open_interest_hist``). When rows are present the two
        ``sumOpenInterest*`` columns are coerced to numbers and ``timestamp``
        is parsed as UTC epoch milliseconds.
        """
        params = self._apply_time_window(
            dict(symbol=symbol.upper(), period=period, limit=limit), start, end
        )

        if self._using_connector:
            raw = _retry(lambda: self.um.open_interest_hist(**params))
        else:
            raw = self._http_open_interest_hist(
                params["symbol"],
                params["period"],
                limit,
                params.get("startTime"),
                params.get("endTime"),
            )
        df = pd.DataFrame(raw)
        if not df.empty:
            df["sumOpenInterest"] = pd.to_numeric(df["sumOpenInterest"], errors="coerce")
            df["sumOpenInterestValue"] = pd.to_numeric(df["sumOpenInterestValue"], errors="coerce")
            df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
        return df

    def _http_open_interest_hist(
        self,
        symbol: str,
        period: str,
        limit: int,
        target_start: Optional[int],
        target_end: Optional[int],
    ) -> List[Dict[str, Any]]:
        """Page backwards through openInterestHist over HTTP and trim the rows.

        Only ``endTime`` is ever sent; ``target_start`` is applied locally
        after paging. Paging assumes each batch is ordered oldest-first with
        integer ``timestamp`` values — TODO confirm against the live API.
        """
        target_limit = limit or 500

        batch_size = 500  # maximise coverage; slice later to honour limit
        aggregated: List[Dict[str, Any]] = []
        next_end = target_end
        last_earliest: Optional[int] = None

        while True:
            # Bind next_end as a default so the retried call uses this page's value.
            def _call(ne=next_end):
                batch_params = {
                    "symbol": symbol,
                    "period": period,
                    "limit": batch_size,
                }
                if ne is not None:
                    batch_params["endTime"] = ne
                return self._http_futures_get("futures/data/openInterestHist", batch_params)

            batch = _retry(_call)
            if not batch:
                break

            # Older pages are prepended so ``aggregated`` stays oldest-first.
            aggregated = batch + aggregated if aggregated else batch
            earliest = aggregated[0]["timestamp"]

            if target_start is not None and earliest <= target_start:
                break  # reached the requested start
            if len(batch) < batch_size:
                break  # short page: nothing older is available

            # The next page ends just before the oldest row of this one.
            next_candidate = batch[0]["timestamp"] - 1
            # Guards against a cursor that fails to move backwards.
            if next_end is not None and next_candidate >= next_end:
                break
            if last_earliest is not None and next_candidate >= last_earliest:
                break

            last_earliest = batch[0]["timestamp"]
            next_end = next_candidate

            if target_start is None and len(aggregated) >= target_limit:
                break
            # NOTE(review): with a start bound, paging stops at twice the
            # limit even if target_start has not been reached, so the rows
            # returned may begin later than target_start — confirm intent.
            if target_start is not None and len(aggregated) >= target_limit * 2:
                break

        raw = aggregated
        raw.sort(key=lambda x: x["timestamp"])
        if target_start is not None:
            raw = [row for row in raw if row["timestamp"] >= target_start]
        if target_end is not None:
            raw = [row for row in raw if row["timestamp"] <= target_end]
        if target_limit and len(raw) > target_limit:
            # With a start bound keep the oldest rows, otherwise the newest.
            if target_start is not None:
                raw = raw[:target_limit]
            else:
                raw = raw[-target_limit:]
        return raw


# ------------------ Example usage ------------------
if __name__ == "__main__":
    # Demo run: credentials come from BINANCE_API_KEY / BINANCE_API_SECRET
    # in .env or the process environment.
    bd = BinanceData()

    # Spot hourly candles
    spot_frame = bd.spot_klines("BTCUSDT", "1h", start="2024-01-01", end=None, limit=1000)
    print("Spot klines:", spot_frame.tail(3))

    # Best bid / ask from the Spot order book
    book = bd.spot_depth("ETHUSDT", limit=100)
    best_bid = book["bids"][0]
    best_ask = book["asks"][0]
    print("Top of book (ETHUSDT):", {"bid": best_bid, "ask": best_ask})

    # Futures hourly candles
    futures_frame = bd.fut_klines("BTCUSDT", "1h", start="2024-06-01", limit=1500)
    print("Futures klines:", futures_frame.tail(3))

    # Funding-rate history
    funding_frame = bd.fut_funding_history("BTCUSDT", start="2024-06-01")
    print("Funding samples:", funding_frame.tail(3))

    # Premium-index and index-price candles: fetch both, then report both
    premium_frame = bd.fut_premium_index_klines("BTCUSDT", "1h", start="2024-06-01")
    index_frame = bd.fut_index_price_klines("BTCUSDT", "1h", start="2024-06-01")
    shown_columns = ["open_time", "close"]
    print("Premium idx last:", premium_frame.tail(1)[shown_columns].to_dict(orient="records"))
    print("Index klines last:", index_frame.tail(1)[shown_columns].to_dict(orient="records"))

    # Open-interest history
    oi_frame = bd.fut_open_interest_hist("BTCUSDT", period="1h", start="2024-06-01")
    print("Open interest last:", oi_frame.tail(1).to_dict(orient="records"))


Spot klines:                     open_time      open      high       low     close  \
997 2024-02-11 13:00:00+00:00  48291.01  48347.00  48026.53  48058.12   
998 2024-02-11 14:00:00+00:00  48058.12  48245.01  47931.82  48234.56   
999 2024-02-11 15:00:00+00:00  48234.55  48234.56  48028.87  48150.43   

         volume                       close_time  quote_asset_volume  \
997  1774.25568 2024-02-11 13:59:59.999000+00:00        8.542246e+07   
998  1431.34837 2024-02-11 14:59:59.999000+00:00        6.882421e+07   
999   900.56310 2024-02-11 15:59:59.999000+00:00        4.333320e+07   

     number_of_trades  taker_base_volume  taker_quote_volume ignore  
997             67445          828.49281        3.988285e+07      0  
998             68214          809.92200        3.894901e+07      0  
999             43109          418.85973        2.015535e+07      0  
Top of book (ETHUSDT): {'bid': ['4137.11000000', '6.30170000'], 'ask': ['4137.12000000', '23.53020000']}
Futures klines:     