In [0]:
%python
import json
import pandas as pd
import mlflow.pyfunc
import pickle
%pip install lightgbm
import lightgbm
from pyspark.sql import SparkSession

# Load the pickled fraud model from the workspace path below.
model_path = "/Workspace/Users/keerthanasingi@gmail.com/lightgbm_model.pkl"  # Update with correct path

# NOTE(review): unpickling executes code stored in the file, so model_path
# must only ever point at a trusted artifact.
with open(model_path, "rb") as model_file:
    loaded_model = pickle.loads(model_file.read())

# Feature names as reported by the model's feature_name() method. The value
# stays None when the loaded object has no such method (AttributeError).
expected_features = None
try:
    expected_features = loaded_model.feature_name()
except AttributeError:
    pass

# Read the scoring payload from the "json_data" notebook widget. The widget is
# created with an empty default so the notebook also runs without a payload.
# NOTE(review): presumably the payload is sent by the Lambda that reads
# DynamoDB -- confirm against the caller.
dbutils.widgets.text("json_data", "")
json_data = dbutils.widgets.get("json_data")

# A whitespace-only value carries no payload; treat it like an empty widget
# instead of letting json.loads fail on it.
if json_data and json_data.strip():
    data = json.loads(json_data)

    # pd.DataFrame() rejects a dict made only of scalars ("If using all scalar
    # values, you must pass an index"), which is what a single JSON record
    # looks like. Wrap such a record in a list so it becomes a one-row frame;
    # lists of records and column-oriented dicts pass through unchanged.
    if isinstance(data, dict) and data and not any(
        isinstance(value, (list, dict)) for value in data.values()
    ):
        data = [data]

    # Convert the JSON payload to a pandas DataFrame.
    df = pd.DataFrame(data)

    if expected_features:
        # Fail with an explicit message when the payload lacks columns the
        # model was trained on (still a KeyError, as plain indexing would raise).
        missing_features = [
            name for name in expected_features if name not in df.columns
        ]
        if missing_features:
            raise KeyError(
                f"Input JSON is missing features required by the model: {missing_features}"
            )
        # Keep only the model's features, in the model's order. The explicit
        # copy makes the column assignment below write to an independent
        # frame rather than to a slice of the original.
        df = df[expected_features].copy()
    else:
        print("Warning: Model does not contain feature names. Using all features.")

    # Score every row and store the model output next to the features.
    df["fraud_status"] = loaded_model.predict(df)

    # --- Persistence / display steps, currently disabled -------------------
    # Remove duplicate columns
    #df = df.loc[:, ~df.columns.duplicated()]

    # Convert to Spark DataFrame
    #spark = SparkSession.builder.appName("FraudDetection").getOrCreate()
    #spark_df = spark.createDataFrame(df)

    # Store results in a Databricks table
    #spark_df.write.mode("append").saveAsTable("cleaned_data")

    # Display output
    #display(spark_df)

    # Capture run details. `spark` is not defined in this cell (its creation
    # above is commented out), so this relies on a session that already
    # exists in the notebook environment.
    run_id = spark.sparkContext.getLocalProperty("spark.databricks.mlflow.runId")
    experiment_id = spark.sparkContext.getLocalProperty("spark.databricks.mlflow.experimentId")
    print(f"Run ID: {run_id}")
    print(f"Experiment ID: {experiment_id}")
else:
    print("No data provided in the json_data widget.")