In [0]:
# # 01 - Limpieza de Datos

# COMMAND ----------

from pyspark.sql.functions import col
from datetime import datetime
import json

# Cleaning pipeline for the raw lottery bets table:
#   1. read the raw table, 2. drop duplicate bet_ids, 3. fill nulls with
#   defaults, 4. keep only rows whose numeric fields are in range,
#   5. overwrite the cleaned Delta table and report how many rows it holds.

# Fully qualified names of the raw input table and the cleaned output table.
SOURCE_TABLE = "datalottery.lotterybets.lottery_bets_dirty"
CLEANED_TABLE = "datalottery.lotterybets.lottery_bets_dirty_cleaned"

# Read the original table (`spark` is the session supplied by the notebook
# runtime; it is not defined in this file).
df = spark.table(SOURCE_TABLE)
print(f"Registros originales: {df.count()}")

# Remove duplicates by bet_id. Which of several rows sharing a bet_id is
# retained is not specified, so the result of the later range filter can
# differ between two evaluations of this plan.
df = df.dropDuplicates(subset=["bet_id"])

# Fill null values with defaults. This runs BEFORE the range validation, so a
# row with a null in one of these columns is kept with the default value rather
# than being rejected by the filter below.
# NOTE(review): "suspicious" looks like a label column; confirm that treating
# a missing value as 0 is intended.
df = df.na.fill({
    "stake_amount": 0.0,
    "channel": "desconocido",
    "minutes_before_close": 0,
    "bets_last_7d": 0,
    "win_rate_last_30d": 0.0,
    "ip_risk": 0.0,
    "geo_risk": 0.0,
    "num_picks": 1,
    "suspicious": 0,
})

# Validate ranges: stake within [0, 10000], non-negative minutes before close,
# and the rate/risk columns within [0, 1] (bounds inclusive).
df = df.filter(
    col("stake_amount").between(0, 10000)
    & (col("minutes_before_close") >= 0)
    & col("win_rate_last_30d").between(0, 1)
    & col("ip_risk").between(0, 1)
    & col("geo_risk").between(0, 1)
)

# Save as a Delta table, replacing any previous contents.
df.write.format("delta").mode("overwrite").saveAsTable(CLEANED_TABLE)

# Report the count from the table that was just written instead of calling
# df.count() on the lazy plan: a separate count action would recompute the
# whole pipeline and, because the deduplication choice is unspecified, could
# report a number that does not match the rows actually stored.
print(f"Registros después de limpieza: {spark.table(CLEANED_TABLE).count()}")