# In [None]:

import os
import json
from datetime import datetime, timezone
import pandas as pd
import yfinance as yf
from confluent_kafka import Producer, KafkaException
import time

# --- PARAMETERS CELL ---
# Tag this cell with "parameters" in Jupyter/Colab metadata
# An empty string means "not supplied": the broker setup further down then
# falls back to the KAFKA_BROKER environment variable or a built-in default.
KAFKA_BROKER = ""  # Papermill will override this

# --- CONFIG ---
# Yahoo Finance ticker symbols, one list per asset class. Each list is passed
# to produce_latest() together with the matching topic below.
crypto_tickers = ["BTC-USD"]  # Single ticker to minimize load
equity_tickers = ["AAPL"]
# NOTE(review): "^TNX" is presumably the 10-year Treasury yield index - confirm.
treasury_tickers = ["^TNX"]

# Kafka topic names that receive the messages for each asset class.
crypto_topic = "crypto_ticks"
equity_topic = "equities_ticks"
bonds_topic = "bonds_data"

# --- IMPORTS ---
# Guarded re-import that reports a missing dependency before re-raising.
# NOTE(review): every name below is already imported, unguarded, at the top of
# this file, so when the file runs top to bottom a missing package fails there
# first and this handler is never reached. It presumably only matters if this
# section is executed on its own - confirm before removing.
try:
    import yfinance as yf
    import json
    from confluent_kafka import Producer, KafkaException
    from datetime import datetime, timezone
    import pandas as pd
except ImportError as e:
    print(f"❌ Missing dependency: {e}")
    raise

# --- BROKER SETUP ---
# Resolution order for the broker address: the value already assigned to
# KAFKA_BROKER earlier in the file, then the KAFKA_BROKER environment
# variable, then the hard-coded default address.
if not KAFKA_BROKER:
    KAFKA_BROKER = os.getenv("KAFKA_BROKER", "0.tcp.ap.ngrok.io:10432")
print(f"[DEBUG] KAFKA_BROKER value: '{KAFKA_BROKER}'")

# Still reachable despite the default: the environment variable may be set
# to an empty string, which os.getenv returns as-is.
if not KAFKA_BROKER:
    print("⚠️ KAFKA_BROKER not provided, exiting")
    raise ValueError("KAFKA_BROKER not provided")

print(f"📡 Connecting to Kafka broker: {KAFKA_BROKER}")
try:
    # Module-level producer shared by produce_latest().
    producer = Producer({"bootstrap.servers": KAFKA_BROKER})
except KafkaException as exc:
    print(f"❌ Failed to connect to Kafka: {exc}")
    raise

# --- PRODUCE FUNCTION ---
def produce_latest(symbols, topic, interval="5m", max_retries=3, base_delay=300):
    """Publish the most recent OHLCV bar of each symbol to a Kafka topic.

    Every symbol is downloaded separately (one day of history at ``interval``)
    and kept paired with its own DataFrame, so each message is built from the
    data that was fetched for the symbol it names. Symbols whose download
    fails on every attempt are reported and skipped.

    Messages are JSON objects with the keys ``symbol``, ``timestamp``,
    ``open``, ``high``, ``low``, ``close``, ``volume`` and ``ingested_at``,
    sent through the module-level ``producer``.

    Args:
        symbols: Iterable of ticker strings; a single string is treated as
            one ticker rather than iterated character by character.
        topic: Kafka topic the messages are produced to.
        interval: Bar interval passed to ``yf.download``.
        max_retries: Number of download attempts per symbol.
        base_delay: Seconds to wait after the first failed attempt; the wait
            doubles after each further failure.

    Returns:
        None. Outcomes are reported on stdout.
    """
    if isinstance(symbols, str):
        symbols = [symbols]

    print(f"=== Fetching {topic} ===")
    # (symbol, frame) pairs: keeps input order and keeps each frame tied to
    # the symbol it belongs to, even when an earlier symbol failed.
    fetched = []
    for sym in symbols:
        df = _download_with_retry(sym, topic, interval, max_retries, base_delay)
        if df is None:
            print(f"❌ Failed to fetch data for {sym} in {topic} after {max_retries} attempts")
            continue
        fetched.append((sym, df))

    if not fetched:
        print(f"❌ No data fetched for {topic}")
        return

    for sym, df in fetched:
        latest_rows = _latest_rows(df, sym)
        if latest_rows is None:
            continue

        for _, row in latest_rows.iterrows():
            msg = {
                "symbol": sym,
                "timestamp": row["Datetime"].isoformat(),
                "open": float(row["Open"]),
                "high": float(row["High"]),
                "low": float(row["Low"]),
                "close": float(row["Close"]),
                "volume": float(row["Volume"]),
                "ingested_at": datetime.now(timezone.utc).isoformat(),
            }
            print(f"[SCRAPE DEBUG] {topic} {msg}")
            try:
                producer.produce(
                    topic,
                    json.dumps(msg).encode("utf-8"),
                    on_delivery=_report_delivery,
                )
            except (BufferError, KafkaException) as e:
                print(f"❌ Failed to produce message for {sym}: {e}")

    # Blocks until queued messages are delivered or fail; delivery callbacks
    # registered above are invoked from here.
    producer.flush()


# Price/volume columns every downloaded frame must provide.
_OHLCV_COLUMNS = ("Open", "High", "Low", "Close", "Volume")


def _download_with_retry(sym, topic, interval, max_retries, base_delay):
    """Download one day of bars for ``sym``; return the frame or None.

    An attempt counts as failed when the download raises, returns an empty
    frame, or returns a frame without all OHLCV columns. After each failed
    attempt except the last, waits ``base_delay * 2 ** attempt`` seconds.
    """
    for attempt in range(max_retries):
        try:
            df = yf.download(sym, period="1d", interval=interval, threads=False)
            print(f"[DEBUG] DataFrame for {sym} in {topic}, attempt {attempt + 1}/{max_retries}: empty={df.empty}, columns={df.columns}")
            if df.empty:
                print(f"⚠️ Empty DataFrame for {sym} in {topic}, attempt {attempt + 1}/{max_retries}")
            elif not all(col in df.columns for col in _OHLCV_COLUMNS):
                print(f"⚠️ Invalid columns for {sym} in {topic}: {df.columns}")
            else:
                print(f"[DEBUG] Fetched data for {sym} in {topic}: {df.columns}")
                return df
        except Exception as e:
            # Deliberately broad: any failure of an attempt is treated as
            # retryable, the same way an empty result is.
            is_rate_limit = "rate limit" in str(e).lower()
            print(f"❌ Failed to download data for {sym} in {topic}: {e}, attempt {attempt + 1}/{max_retries}, RateLimit={is_rate_limit}")

        if attempt < max_retries - 1:
            delay = base_delay * (2 ** attempt)
            print(f"[DEBUG] Waiting {delay} seconds before retry")
            time.sleep(delay)
    return None


def _latest_rows(df, sym):
    """Return the rows of ``df`` at its newest timestamp, or None if unusable.

    The result has a ``Datetime`` column plus the OHLCV columns, with rows
    containing missing values dropped.
    """
    frame = df.copy()
    # Collapse two-level column labels to their first level so the OHLCV
    # names can be looked up directly.
    if isinstance(frame.columns, pd.MultiIndex):
        frame.columns = frame.columns.get_level_values(0)

    # The timestamp index becomes a column named after the index: "index"
    # when the index is unnamed, otherwise e.g. "Date" or "Datetime".
    frame = frame.reset_index().rename(columns={"index": "Datetime"})
    if "Date" in frame.columns and "Datetime" not in frame.columns:
        frame = frame.rename(columns={"Date": "Datetime"})
        print(f"[DEBUG] Renamed 'Date' to 'Datetime' for {sym}")
    print(f"[DEBUG] Columns after reset_index for {sym}: {frame.columns}")

    required_columns = ["Datetime", *_OHLCV_COLUMNS]
    if not all(col in frame.columns for col in required_columns):
        print(f"⚠️ Missing columns in DataFrame for {sym}: {frame.columns}")
        return None

    frame = frame[required_columns].dropna(subset=required_columns)
    if frame.empty:
        print(f"⚠️ No valid data for {sym}")
        return None

    latest_ts = frame["Datetime"].max()
    return frame[frame["Datetime"] == latest_ts]


def _report_delivery(err, msg):
    """Delivery callback: report messages that could not be delivered."""
    if err is not None:
        print(f"❌ Delivery failed for topic {msg.topic()}: {err}")

# --- RUN SCRAPES ---
# One (tickers, topic, bar interval) job per asset class, executed in order.
for _job_tickers, _job_topic, _job_interval in (
    (crypto_tickers, crypto_topic, "5m"),
    (equity_tickers, equity_topic, "5m"),
    (treasury_tickers, bonds_topic, "1d"),
):
    produce_latest(_job_tickers, _job_topic, interval=_job_interval)

print("✅ All data produced to Kafka topics.")
