In [0]:
# Databricks Notebook: Model Inference

import mlflow.pyfunc
import pandas as pd
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session under the "LoanInference" app name
session_builder = SparkSession.builder.appName("LoanInference")
spark = session_builder.getOrCreate()

# Build the registry URI that points at the latest version of the registered
# model, then load that model through the generic pyfunc flavor
model_name = "loan_default_model"
model_uri = f"models:/{model_name}/latest"
model = mlflow.pyfunc.load_model(model_uri)

# Load new data for inference.
# The feature columns are selected in Spark *before* toPandas(), so the driver
# only has to materialise the columns that are actually used instead of the
# whole dataset.
feature_cols = ["loan_amnt", "funded_amnt", "term", "int_rate", "installment", "annual_inc", "dti", "delinq_2yrs"]
df = (
    spark.read.parquet("/databricks-datasets/samples/lending_club/parquet/")
    .select(*feature_cols)
    .toPandas()
)

# Drop rows with a missing feature: a missing "term" would make the integer
# cast below raise, and missing values would otherwise be fed to the model.
df = df.dropna(subset=feature_cols)

# "term" holds text such as "36 months"; keep only the number of months
df["term"] = df["term"].str.replace(" months", "", regex=False).astype(int)

# NOTE(review): the other inference cell in this notebook strips a trailing
# "%" from int_rate and casts it to float before predicting; the same is done
# here so the model is not handed percent strings. The dtype guard makes this
# a no-op if int_rate is already numeric. Confirm that loan_default_model was
# trained on a numeric int_rate.
if not pd.api.types.is_numeric_dtype(df["int_rate"]):
    df["int_rate"] = df["int_rate"].str.replace("%", "", regex=False).astype(float)

# Select (up to) 5 random samples for inference; the fixed seed keeps the
# selection reproducible, and min() avoids an error when fewer than 5 rows remain
X_new = df.sample(min(5, len(df)), random_state=42)

# Score the sampled rows with the loaded model
predictions = model.predict(X_new)

# Build the results frame: a copy of the inputs plus a "prediction" column
results = X_new.assign(prediction=predictions)

# Render the results in the notebook (Databricks display helper)
display(results)

# Persist the results as CSV, leaving out the pandas index
output_csv_path = "/dbfs/ml/inference_results.csv"
results.to_csv(output_csv_path, index=False)

print("✅ Inference completed! Results saved.")


In [0]:
# Databricks Notebook: Model Inference

import mlflow
import mlflow.sklearn
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from sklearn.preprocessing import StandardScaler

# Step 1: Initialize SparkSession
spark = SparkSession.builder.appName("LoanInference").getOrCreate()

# Step 2: Load the Trained Model from MLflow Model Registry
model_name = "loan_model5"
model_version = 2

# Point the MLflow client at the Unity Catalog model registry explicitly
mlflow.set_registry_uri("databricks-uc")

# Download the model artifacts for the pinned version to local disk.
# download_artifacts returns the local location of what it downloaded; that
# returned path (rather than a hard-coded copy of dst_path) is what gets
# loaded, so the load always targets the files that were actually fetched.
model_uri = f"models:/{model_name}/{model_version}"
destination_path = "/tmp/model"
local_model_path = mlflow.artifacts.download_artifacts(artifact_uri=model_uri, dst_path=destination_path)

# Load the scikit-learn model from the downloaded artifacts
model = mlflow.sklearn.load_model(local_model_path)

# Step 3: Load New Data
# Simulating new loan applications from Databricks sample dataset
df_new = spark.read.parquet("/databricks-datasets/samples/lending_club/parquet/")

# Step 4: Save New Data to a Table
df_new.write.format("delta").mode("overwrite").saveAsTable("new_loan_data")

# Step 5: Load the Data Back from the Table
df_table = spark.read.table("new_loan_data")

# Step 6: Convert the DataFrame to Pandas for Processing
# Only the model's feature columns are collected; selecting them in Spark
# before toPandas() keeps the unused columns off the driver.
feature_cols = ["loan_amnt", "funded_amnt", "term", "int_rate", "installment", "annual_inc", "dti", "delinq_2yrs"]
df_pandas = df_table.select(*feature_cols).toPandas()

# Step 7: Feature Engineering
# Drop rows with any missing feature. "term" is included because a missing
# value there would make the integer cast below raise. The explicit copy
# avoids chained-assignment warnings on the column writes that follow.
df_pandas = df_pandas.dropna(subset=feature_cols).copy()

# Convert categorical column: "36 months" -> 36
df_pandas["term"] = df_pandas["term"].str.replace(" months", "", regex=False).astype(int)

# Convert string percentages to floats: "10.65%" -> 10.65
df_pandas["int_rate"] = df_pandas["int_rate"].str.replace("%", "", regex=False).astype(float)

# Standardize the input features.
# If the loaded model is a scikit-learn Pipeline that already contains a
# StandardScaler, the pipeline applies the statistics learned during training,
# so the features are passed through unscaled (scaling them here as well would
# scale them twice).
pipeline_steps = getattr(model, "named_steps", None) or {}
model_scales_inputs = any(isinstance(step, StandardScaler) for step in pipeline_steps.values())

if model_scales_inputs:
    model_input = df_pandas[feature_cols]
else:
    # NOTE(review): this fits a brand-new scaler on the *inference* data; it is
    # NOT the scaler that was fit during training, so the scaled values differ
    # from what the model saw at training time (train/serve skew).
    # TODO: log the fitted training scaler with the model (or wrap scaler and
    # model in a Pipeline) and load it here instead of refitting.
    print("⚠️ StandardScaler is being re-fit on inference data; predictions may not match training-time scaling.")
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(df_pandas[feature_cols])
    model_input = X_scaled

# Step 8: Perform Inference
predictions = model.predict(model_input)

# Step 9: Attach the predictions to the feature frame as a new column
df_pandas = df_pandas.assign(loan_default_prediction=predictions)

# Hand the scored pandas frame back to Spark
df_spark = spark.createDataFrame(df_pandas)

# Step 10: Persist the scored rows as a Delta table, replacing previous contents
results_writer = df_spark.write.format("delta").mode("overwrite")
results_writer.saveAsTable("loan_inference_results")

print("✅ Inference completed and results saved to table 'loan_inference_results'.")
