In [0]:
# Databricks notebook source
# Notebook parameters, declared as Databricks text widgets and then parsed
# into typed Python values used by the rest of the notebook.
dbutils.widgets.text("catalog", "ptd_dev")
dbutils.widgets.text("schema_silver", "silver")
dbutils.widgets.text("schema_gold", "gold")
dbutils.widgets.text("horizon_days", "5")
dbutils.widgets.text("tcost_bps", "5")          # round-trip transaction cost in bps (0.05% = 5 bps)
dbutils.widgets.text("thr_up_bps", "0")         # threshold for the up label (bps over the net forward return)
dbutils.widgets.text("thr_down_bps", "0")       # threshold for the down label (bps)

catalog       = dbutils.widgets.get("catalog")
schema_silver = dbutils.widgets.get("schema_silver")
schema_gold   = dbutils.widgets.get("schema_gold")
H             = int(dbutils.widgets.get("horizon_days"))
if H < 1:
    # The labels use lead(adj_close, H); a zero or negative offset would no
    # longer look forward, so reject it up front.
    raise ValueError(f"horizon_days must be a positive integer, got {H}")
tcost_bps     = float(dbutils.widgets.get("tcost_bps"))
thr_up_bps    = float(dbutils.widgets.get("thr_up_bps"))

# The thr_down_bps widget is declared above, so it can always be read
# directly. A blank value falls back to the up threshold, which is the
# fallback the original expression was aiming for.
_thr_dn_raw   = dbutils.widgets.get("thr_down_bps").strip()
thr_dn_bps    = float(_thr_dn_raw) if _thr_dn_raw else thr_up_bps

# Select the target catalog and make the gold schema the session default.
for _stmt in (f"USE CATALOG {catalog}", f"USE SCHEMA {schema_gold}"):
    spark.sql(_stmt)

from pyspark.sql import functions as F, Window as W

# Silver-layer inputs, each addressed by its fully qualified three-part name.
_silver_ns = f"{catalog}.{schema_silver}"
prices_daily = spark.table(f"{_silver_ns}.prices_daily")
market_features = spark.table(f"{_silver_ns}.market_features")
news_daily_agg = spark.table(f"{_silver_ns}.news_daily_agg")

# 1) Features: combine prices + market + news
# Per-ticker window ordered by date, so each lag looks back along that
# ticker's own history.
w = W.partitionBy("ticker").orderBy("date")

# Per the original note, return lags and moving averages already exist in
# prices_daily; only the extra lags listed here are added.
_extra_lags = (
    ("ret_1d_lag1", "ret_1d", 1),
    ("ret_1d_lag2", "ret_1d", 2),
    ("ret_5d_lag1", "ret_5d", 1),
)
feat = prices_daily.alias("p")
for _lag_name, _lag_src, _lag_k in _extra_lags:
    feat = feat.withColumn(_lag_name, F.lag(_lag_src, _lag_k).over(w))

# Left joins keep every price row: market_features is matched by date only
# (original note: SPY and ^VIX as pivoted columns), news by ticker and date.
_with_market = feat.join(market_features, on="date", how="left")
_with_news = _with_market.join(news_daily_agg, on=["ticker", "date"], how="left")

# Rows with no matching news get a zero count and a zero average tone.
_tone_or_zero = F.coalesce(F.col("tone_avg"), F.lit(0.0))
f = (
    _with_news
    .na.fill({"news_count": 0})
    .withColumn("tone_avg", _tone_or_zero)
    .withColumn("ingestion_ts", F.current_timestamp())
)

# Replace the gold features table, allowing its schema to change.
_features_writer = f.write.mode("overwrite").option("overwriteSchema", "true")
_features_writer.saveAsTable(f"{catalog}.{schema_gold}.features_daily")

# 2) Labels: H-day forward return, net of costs
w2 = W.partitionBy("ticker").orderBy("date")

# Convert the bps parameters to plain fractions once.
_cost_frac = tcost_bps / 10000.0
_up_cutoff = thr_up_bps / 10000.0
_down_cutoff = -(thr_dn_bps / 10000.0)

# Adjusted close H rows ahead within the same ticker; null where fewer than
# H later rows exist, which leaves the returns and labels null for those rows.
_price_ahead = F.lead("adj_close", H).over(w2)
_net_ret = F.col("fwd_ret_net")

# NOTE(review): both labels are derived from the same cost-adjusted return, so
# label_down fires when fwd_ret < cost - threshold. Confirm this is intended
# rather than a short-side net return.
lb = (
    prices_daily
    .select("ticker", "date", "adj_close", "ret_1d")
    .withColumn("fwd_ret", _price_ahead / F.col("adj_close") - 1)
    .withColumn("fwd_ret_net", F.col("fwd_ret") - F.lit(_cost_frac))
    .withColumn("label_up", (_net_ret > _up_cutoff).cast("int"))
    .withColumn("label_down", (_net_ret < _down_cutoff).cast("int"))
    .withColumn("ingestion_ts", F.current_timestamp())
)

# Replace the gold labels table, allowing its schema to change.
_labels_writer = lb.write.mode("overwrite").option("overwriteSchema", "true")
_labels_writer.saveAsTable(f"{catalog}.{schema_gold}.labels_daily")

# 3) ML dataset = features + labels (intersection on ticker/date)
_join_keys = ["ticker", "date"]
_features = spark.table(f"{catalog}.{schema_gold}.features_daily")

# features_daily already holds every prices_daily column (adj_close, ret_1d, ...)
# plus ingestion_ts, and lb repeats those names. Joining on the key list only
# de-duplicates the keys themselves, so lb is reduced to the keys plus the
# columns that exist only on the label side. Otherwise the result would carry
# duplicate column names and the table write would be rejected.
_feature_cols = set(_features.columns)
_label_only = [c for c in lb.columns if c in _join_keys or c not in _feature_cols]

# Rows whose forward price is unavailable keep their null labels, as before.
ds = (_features.alias("x")
      .join(lb.select(*_label_only).alias("y"), on=_join_keys, how="inner"))

ds.write.mode("overwrite").option("overwriteSchema","true").saveAsTable(f"{catalog}.{schema_gold}.dataset_ml")