# In [None]:

# Databricks notebook source
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col, expr

# COMMAND ----------
# Load silver data
# Reads every column of the silver geo view through the notebook's `spark` session.
df = spark.sql("SELECT * FROM fraud_miner.silver.fraud_geo_view")

# Feature engineering: geo match
# 1 when the geolocated country equals the merchant country, 0 otherwise.
# The equality is NULL whenever either side is NULL, and VectorAssembler
# (default handleInvalid="error") aborts on NULL inputs, so NULL is mapped to
# 0 ("no confirmed match") instead of failing the whole fit.
df = df.withColumn(
    "geo_matches_merchant",
    (col("geo_country") == col("Merchant_Country")).cast("int"),
).fillna(0, subset=["geo_matches_merchant"])

# Define features
# NOTE(review): Transaction_Amount is not NULL-handled here; confirm the view
# guarantees it is populated, otherwise the assembler will still reject it.
target = "Transaction_Fraud"  # label column
categorical_cols = ["Card_Provider", "Merchant_Category", "Merchant_Country", "geo_country"]
numeric_cols = ["Transaction_Amount", "geo_matches_merchant"]

# Categorical handling: index each column, then one-hot encode the index.
# Intermediate column names follow the "<col>_idx" / "<col>_vec" convention.
idx_names = [f"{name}_idx" for name in categorical_cols]
vec_names = [f"{name}_vec" for name in categorical_cols]

# handleInvalid='keep' routes invalid values into an extra bucket instead of failing.
indexers = [
    StringIndexer(inputCol=source, outputCol=indexed, handleInvalid='keep')
    for source, indexed in zip(categorical_cols, idx_names)
]
encoders = [
    OneHotEncoder(inputCol=indexed, outputCol=encoded)
    for indexed, encoded in zip(idx_names, vec_names)
]

# Assemble the encoded categoricals followed by the numeric columns, then scale.
assembler = VectorAssembler(inputCols=vec_names + numeric_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")

# Classifier: logistic regression on the scaled feature vector.
lr = LogisticRegression(labelCol=target, featuresCol="features")

# Full pipeline: all indexers, all encoders, assembler, scaler, classifier.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, scaler, lr])

# Cross-validation: score the pipeline by area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol=target, metricName="areaUnderROC")

# No addGrid() calls, so the grid holds a single (empty) parameter map:
# the 10 folds estimate performance rather than tune hyperparameters.
param_grid = ParamGridBuilder().build()

cv = (
    CrossValidator(numFolds=10)
    .setEstimator(pipeline)
    .setEstimatorParamMaps(param_grid)
    .setEvaluator(evaluator)
)

# Run the folds and fit the final model on the full DataFrame.
cv_model = cv.fit(df)

# COMMAND ----------
# Evaluate
# Score the full dataset with the best model; `predictions` is reused by later cells.
predictions = cv_model.transform(df)

# AUC measured on `df` is in-sample (the model was fitted on this same data),
# so it is optimistic. It is printed for reference only.
train_auc = evaluator.evaluate(predictions)
print(f"In-sample AUC (optimistic): {train_auc:.4f}")

# Report the cross-validated AUC instead. avgMetrics holds the mean
# out-of-fold metric per parameter map; for areaUnderROC the best model is the
# one with the largest value, so max() stays correct if the grid is expanded.
auc = float(max(cv_model.avgMetrics))
print(f"✅ Final AUC: {auc:.4f}")

# COMMAND ----------
# Save predictions (optional)
# Overwrites the gold table with the label, predicted class and class probabilities.
(
    predictions
    .select("Transaction_Fraud", "prediction", "probability")
    .write.mode("overwrite")
    .saveAsTable("fraud_miner.gold.model_predictions")
)

# Save AUC to evaluation table
from datetime import datetime, timezone
from pyspark.sql import Row

# str(expr("current_timestamp()")) only stringifies the unevaluated Column
# object (e.g. "Column<'current_timestamp()'>"), so every run stored the same
# meaningless text. Take the timestamp on the driver instead, and keep it a
# string so appends remain compatible with the existing column type.
run_date = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")  # UTC
result_row = Row(run_date=run_date, auc=float(auc))
spark.createDataFrame([result_row]).write.mode("append").saveAsTable("fraud_miner.gold.model_evaluation")

# Save model (optional)
# cv_model.bestModel.write().overwrite().save("/mnt/models/fraud_logistic_spark")
