In [0]:
# Databricks Python Notebook: Bronze ingest from Binance Kline REST
import time, json, random, datetime as dt
import requests
from typing import List, Dict, Tuple, Optional
from pyspark.sql import Row
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable

# =========================
# (A) Run settings
# =========================
MODE = "backfill"                  # one of: once | poll | forever | backfill
SYMBOLS = ["BTCUSDT", "ETHUSDT"]   # coin symbols to ingest
INTERVALS = ["15m"]                # kline intervals, e.g. ["1m", "5m"]
LIMIT_ONCE = 1000                  # candles per request in once/poll/forever (Binance max 1000)
POLL_SECONDS = 10                  # pause between polling rounds (seconds)
MAX_POLLS = 60                     # number of polling rounds in "poll" mode
BACKFILL_HOURS = 7 * 24            # backfill look-back: 7 days = 168 hours

# ingest_time policy for rows that already exist when upserting:
#   True  -> overwrite ingest_time with the current load time on every upsert
#   False -> keep the first ingest_time (preserves load history)
UPSERT_UPDATE_INGEST_TIME = True

# =========================
# (B) Project settings
# =========================
CATALOG = "demo_catalog"
SCHEMA = "demo_schema"
TABLE = f"{CATALOG}.{SCHEMA}.bronze_charts"              # Bronze Delta table
STATE_TABLE = f"{CATALOG}.{SCHEMA}.bronze_ingest_state"  # last open_time (ms) per symbol/interval

# Binance REST (Spot)
BASE_URL = "https://api.binance.com"
KLINES = "/api/v3/klines"
LIMIT_DEFAULT = 500  # Binance Spot default limit (max 1000)

# =========================
# (C) Table setup
# =========================
# DDL is executed in this order: catalog -> schema -> Bronze table -> state table.
#   * Bronze table: partitioned by event date (dt); unique_key identifies one
#     candle ("symbol|interval|open_time(ms)").
#   * State table: last ingested open_time (ms) per symbol/interval.
# The SQL text, including its inline SQL comments, is sent to Spark verbatim.
for _ddl in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG}",
    f"CREATE SCHEMA  IF NOT EXISTS {CATALOG}.{SCHEMA}",
    f"""
CREATE TABLE IF NOT EXISTS {TABLE} (
  source            STRING,
  event_time        TIMESTAMP,  -- open_time(UTC)
  ingest_time       TIMESTAMP,  -- 적재 시각(UTC)
  unique_key        STRING,     -- "symbol|interval|open_time(ms)"
  raw_json          STRING,     -- 원본 배열 JSON 문자열
  api_endpoint      STRING,     -- 호출한 REST endpoint
  api_params_hash   STRING,     -- 요청 파라미터 해시(디버깅/재현)
  dt                DATE        -- event_date(partition key)
) USING DELTA
PARTITIONED BY (dt)
""",
    f"""
CREATE TABLE IF NOT EXISTS {STATE_TABLE} (
  symbol    STRING,
  interval  STRING,
  last_open_time_ms LONG,
  updated_at TIMESTAMP
) USING DELTA
""",
):
    spark.sql(_ddl)

# =========================
# (D) 유틸리티
# =========================
def _params_hash(params: Dict) -> str:
    """요청 파라미터 dict -> 안정적인 해시 문자열"""
    raw = json.dumps(params, sort_keys=True, separators=(",",":"))
    import hashlib
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

def _to_ms(ts: dt.datetime) -> int:
    """UTC-aware datetime -> epoch milliseconds"""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=dt.timezone.utc)
    return int(ts.timestamp() * 1000)

def _from_ms(ms: int) -> dt.datetime:
    """epoch milliseconds -> UTC-aware datetime"""
    return dt.datetime.fromtimestamp(ms/1000, tz=dt.timezone.utc)

def binance_klines(symbol: str, interval: str, start_ms: Optional[int]=None,
                   end_ms: Optional[int]=None, limit: int=LIMIT_DEFAULT,
                   max_retries: int=5) -> Tuple[List[list], Dict[str,str]]:
    """
    Call Binance Spot GET /api/v3/klines with bounded retries.

    Retry policy (at least one request is always made, at most ``max_retries``):
    - 429 (rate limit): wait max(backoff, server Retry-After seconds) + jitter.
    - 5xx and network errors (connection error / timeout): exponential backoff
      1s, 2s, 4s, ... capped at 16s, plus jitter.
    - Any other non-200 status (e.g. 400 bad parameters, 418 IP ban) is not
      retried and is raised immediately.
    No sleep happens after the final attempt.

    Args:
        symbol, interval: Binance symbol (e.g. "BTCUSDT") and kline interval.
        start_ms, end_ms: optional startTime/endTime filters (epoch ms).
        limit: max candles per request.
        max_retries: total number of attempts (values < 1 are treated as 1).

    Returns:
        (rows, headers): rows is the decoded JSON body; headers is the last
        response's case-insensitive header mapping (HTTP field names are
        case-insensitive, so e.g. ``headers.get("X-MBX-USED-WEIGHT-1m")`` works
        regardless of the casing the server used).

    Raises:
        requests.HTTPError: non-retryable 4xx/5xx, or retries exhausted.
        requests.ConnectionError / requests.Timeout: network failure on the
            final attempt.
    """
    url = BASE_URL + KLINES
    q = {"symbol": symbol, "interval": interval, "limit": limit}
    if start_ms is not None:
        q["startTime"] = start_ms
    if end_ms is not None:
        q["endTime"] = end_ms

    attempts = max(1, max_retries)  # always make at least one request
    delay = 1
    for attempt in range(1, attempts + 1):
        is_final = attempt == attempts
        try:
            r = requests.get(url, params=q, timeout=30)
        except (requests.ConnectionError, requests.Timeout):
            if is_final:
                raise
            wait = delay  # transient network failure: back off and retry
        else:
            if r.status_code == 200:
                return r.json(), r.headers
            retryable = r.status_code == 429 or r.status_code >= 500
            if is_final or not retryable:
                r.raise_for_status()  # raises HTTPError for any 4xx/5xx
                return [], r.headers  # non-200 status below 400: nothing to ingest
            wait = delay
            if r.status_code == 429:
                # Honor the server's Retry-After (seconds) when it asks for longer.
                try:
                    wait = max(delay, float(r.headers.get("Retry-After")))
                except (TypeError, ValueError):
                    pass
        time.sleep(wait + random.uniform(0, 0.3))  # jitter avoids synchronized retries
        delay = min(delay * 2, 16)

def _get_last_state(symbol: str, interval: str) -> Optional[int]:
    """
    Fetch the most recently updated last_open_time_ms for (symbol, interval).

    Returns None when STATE_TABLE has no row for the pair (or the stored value
    is NULL). symbol/interval are interpolated into the SQL text unescaped, so
    only pass trusted identifiers.
    """
    query = f"""
      SELECT last_open_time_ms
      FROM {STATE_TABLE}
      WHERE symbol = '{symbol}' AND interval = '{interval}'
      ORDER BY updated_at DESC
      LIMIT 1
    """
    hits = spark.sql(query).collect()
    if not hits:
        return None
    return hits[0][0]

def _upsert_state(symbol: str, interval: str, last_ms: int):
    """
    Record *last_ms* as the latest open_time (ms) for (symbol, interval).

    MERGE keyed on (symbol, interval): an existing row has last_open_time_ms and
    updated_at overwritten; otherwise a new row is inserted. updated_at is the
    current UTC time. Values are interpolated into the SQL text unescaped.
    """
    stamp = dt.datetime.now(dt.timezone.utc).isoformat()
    merge_sql = f"""
      MERGE INTO {STATE_TABLE} t
      USING (SELECT '{symbol}' AS symbol, '{interval}' AS interval,
                    {last_ms} AS last_open_time_ms, TIMESTAMP('{stamp}') AS updated_at) s
      ON t.symbol = s.symbol AND t.interval = s.interval
      WHEN MATCHED THEN UPDATE SET last_open_time_ms = s.last_open_time_ms, updated_at = s.updated_at
      WHEN NOT MATCHED THEN INSERT (symbol, interval, last_open_time_ms, updated_at)
      VALUES (s.symbol, s.interval, s.last_open_time_ms, s.updated_at)
    """
    spark.sql(merge_sql)

# ===================================
# (E) Bronze 테이블 데이터 채우기(중복 방지)
# ===================================
def _rows_to_bronze(symbol: str, interval: str, rows: List[list], endpoint: str, params: Dict):
    """
    Convert Binance kline rows (list of lists) into Bronze records and MERGE them
    into TABLE.

    - Duplicates within the batch are dropped by unique_key before the MERGE.
    - MERGE ON (unique_key AND dt): re-ingesting the same candles updates the
      existing rows instead of inserting duplicates; including the partition
      column dt in the condition lets Delta prune partitions.
    - Matched rows get every column refreshed from the batch; ingest_time is
      refreshed or kept according to UPSERT_UPDATE_INGEST_TIME.

    Returns:
        (count, max_open_ms): number of distinct candles merged and the largest
        open_time (ms) in the batch; (0, None) for an empty batch.
    """
    if not rows:
        return 0, None

    param_hash = _params_hash(params)
    ingest_iso = dt.datetime.now(dt.timezone.utc).isoformat()  # one load time for the batch
    recs = []
    open_times = set()  # distinct open_time(ms) == distinct unique_key for this symbol/interval

    # Binance kline item spec:
    # [0] open_time(ms), [1] open, [2] high, [3] low, [4] close, [5] volume, [6] close_time(ms), ...
    for item in rows:
        open_ms = int(item[0])
        open_times.add(open_ms)
        event_time = _from_ms(open_ms)
        recs.append({
            "source": "binance.spot.klines",
            "event_time": event_time.isoformat(),   # string here; cast to TIMESTAMP below
            "ingest_time": ingest_iso,
            "unique_key": f"{symbol}|{interval}|{open_ms}",
            "raw_json": json.dumps(item, separators=(",", ":")),
            "api_endpoint": endpoint,
            "api_params_hash": param_hash,
            "dt": event_time.date().isoformat(),    # partition key (UTC date)
        })

    # Source DataFrame (+ type casts) with in-batch de-duplication.
    schema = StructType([
        StructField("source",           StringType(), True),
        StructField("event_time",       StringType(), True),
        StructField("ingest_time",      StringType(), True),
        StructField("unique_key",       StringType(), True),
        StructField("raw_json",         StringType(), True),
        StructField("api_endpoint",     StringType(), True),
        StructField("api_params_hash",  StringType(), True),
        StructField("dt",               StringType(), True),
    ])
    df = spark.createDataFrame([Row(**r) for r in recs], schema) \
        .withColumn("event_time",  to_timestamp(col("event_time"))) \
        .withColumn("ingest_time", to_timestamp(col("ingest_time"))) \
        .withColumn("dt",          col("dt").cast("date")) \
        .dropDuplicates(["unique_key"])

    # Columns to overwrite on match; only ingest_time depends on the policy flag.
    set_map = {
        "source":          "s.source",
        "event_time":      "s.event_time",
        # True: refresh to this load's time; False: keep the first ingest_time.
        "ingest_time":     "s.ingest_time" if UPSERT_UPDATE_INGEST_TIME else "t.ingest_time",
        "raw_json":        "s.raw_json",
        "api_endpoint":    "s.api_endpoint",
        "api_params_hash": "s.api_params_hash",
        "dt":              "s.dt",
    }

    (DeltaTable.forName(spark, TABLE).alias("t")
        .merge(df.alias("s"), "t.unique_key = s.unique_key AND t.dt = s.dt")  # partition column in join
        .whenMatchedUpdate(set=set_map)
        .whenNotMatchedInsertAll()
        .execute())

    # dropDuplicates keeps one row per unique_key, so the merged row count is the
    # number of distinct open times -- known locally, no extra Spark job needed.
    return len(open_times), max(open_times)

# =========================
# (F) 모드별 동작
# =========================
def backfill_symbol(symbol: str, interval: str,
                    hours: int = BACKFILL_HOURS, limit: int = LIMIT_DEFAULT,
                    window_hours: float = 6):
    """
    Backfill historical candles for (symbol, interval) window by window.

    - Without saved state, starts ``hours`` before now.
    - With a saved last_open_time_ms in STATE_TABLE, resumes AT that open time
      (inclusive). The saved candle may still have been forming when it was
      stored, so it is fetched again and the idempotent MERGE overwrites it
      with its final values.
    - Requests cover at most ``window_hours`` (default 6h) each. After a
      non-empty response the cursor moves to the last returned open_time + 1 ms,
      so a window holding more than ``limit`` candles is paged through, not
      truncated. After an empty response the cursor skips past the window.
    - STATE_TABLE is advanced after every non-empty batch.

    Raises:
        ValueError: if window_hours does not amount to at least 1 ms.
    """
    window_ms = int(window_hours * 60 * 60 * 1000)
    if window_ms <= 0:
        raise ValueError(f"window_hours must be positive, got {window_hours!r}")

    now_utc = dt.datetime.now(dt.timezone.utc)
    end_ms = _to_ms(now_utc)

    last_ms = _get_last_state(symbol, interval)
    if last_ms is not None:
        # Inclusive resume: re-fetch the saved candle so a previously partial bar is finalized.
        start_ms = int(last_ms)
    else:
        start_ms = _to_ms(now_utc - dt.timedelta(hours=hours))

    total_rows = 0
    cursor_ms = start_ms

    print(f"[BACKFILL] {symbol} {interval} {_from_ms(start_ms)} → {_from_ms(end_ms)}")
    while cursor_ms < end_ms:
        batch_end = min(end_ms, cursor_ms + window_ms)
        params = {"symbol": symbol, "interval": interval,
                  "startTime": cursor_ms, "endTime": batch_end, "limit": limit}
        rows, headers = binance_klines(symbol, interval,
                                       start_ms=cursor_ms, end_ms=batch_end, limit=limit)
        count, max_open_ms = _rows_to_bronze(symbol, interval, rows, KLINES, params)

        used_weight = headers.get("X-MBX-USED-WEIGHT-1m") or headers.get("X-MBX-USED-WEIGHT")
        if used_weight:
            print(f"  used_weight(1m): {used_weight}")

        total_rows += count
        if max_open_ms is None:
            # No candles in this window: skip past it.
            cursor_ms = batch_end + 1
        else:
            # Continue right after the newest candle received.
            cursor_ms = max_open_ms + 1
            _upsert_state(symbol, interval, max_open_ms)

        time.sleep(0.2)  # ease request rate

    print(f"[BACKFILL DONE] {symbol} {interval}: {total_rows} rows")

def poll_once(symbol: str, interval: str, limit: int = 50):
    """
    Run one near-real-time polling step for (symbol, interval).

    Fetches the most recent ``limit`` candles (20-100 recommended), upserts them
    into Bronze, advances STATE_TABLE when at least one candle came back, and
    prints a one-line summary including the API used-weight header.
    """
    candles, hdrs = binance_klines(symbol, interval, limit=limit)
    request_params = {"symbol": symbol, "interval": interval, "limit": limit}
    written, newest_open_ms = _rows_to_bronze(symbol, interval, candles, KLINES, request_params)
    if newest_open_ms is not None:
        _upsert_state(symbol, interval, newest_open_ms)
    weight = hdrs.get("X-MBX-USED-WEIGHT-1m") or hdrs.get("X-MBX-USED-WEIGHT")
    print(f"[POLL] {symbol} {interval}: +{written} rows, last_open_ms={newest_open_ms}, used_weight={weight}")

# =========================
# (G) MAIN
# =========================
# Dispatch on MODE. An unrecognized MODE fails fast instead of silently
# falling through to "once".
if MODE == "backfill":
    for iv in INTERVALS:
        for sym in SYMBOLS:
            backfill_symbol(sym, iv, hours=BACKFILL_HOURS, limit=LIMIT_DEFAULT)
    dbutils.notebook.exit("backfill done")

elif MODE == "poll":
    for i in range(MAX_POLLS):
        for iv in INTERVALS:
            for sym in SYMBOLS:
                poll_once(sym, iv, limit=LIMIT_ONCE)
        if i < MAX_POLLS - 1:  # no need to wait after the final round
            time.sleep(POLL_SECONDS)
    dbutils.notebook.exit("poll done")

elif MODE == "forever":
    print(f"[LIVE] start polling every {POLL_SECONDS}s")
    while True:
        try:
            for iv in INTERVALS:
                for sym in SYMBOLS:
                    poll_once(sym, iv, limit=LIMIT_ONCE)
        except Exception as e:
            # Top-level boundary: report the failure and keep the live loop running.
            print(f"[WARN] {e}")
            time.sleep(5)
        time.sleep(POLL_SECONDS)

elif MODE == "once":
    for iv in INTERVALS:
        for sym in SYMBOLS:
            poll_once(sym, iv, limit=LIMIT_ONCE)
    dbutils.notebook.exit("once done")

else:
    raise ValueError(
        f"Unknown MODE {MODE!r}; expected one of 'once', 'poll', 'forever', 'backfill'"
    )
