In [0]:
# Sanity check: display the active SparkSession (assumed to be pre-created by the notebook runtime).
spark


In [0]:
# Load the silver customer churn table as a Spark DataFrame.
silver_df = spark.read.table("ai_trust_catalog.churn_trust.silver_customer_churn")


In [0]:
# Exploratory copy of the silver table without the `contract_length` column.
# NOTE(review): `clean_df` is reassigned in a later cell (built from `input_df`),
# so this DataFrame is not what the pipeline is trained on; confirm that the later
# cell also excludes `contract_length` if that exclusion is intended.
clean_df = silver_df.drop("contract_length")

In [0]:
# Inspect column names and types as stored in the silver table, before the
# double casts applied further down.
clean_df.printSchema()


# Trust Scoring Pipeline (Churn + Trust)

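This notebook:
- Trains a churn model (a Spark ML logistic regression pipeline) on the silver customer churn table
- Uses each customer's predicted churn probability to derive a trust decision: `HIGH_RISK` when the probability is at least 0.7, otherwise `LOW_RISK`
- Persists the trust decisions to `ai_trust_score_project.trust_predictions_final`
- Performs analysis on HIGH_RISK vs LOW_RISK customers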

In [0]:
import os
import mlflow
import mlflow.spark

# MLflow DFS temp location: presumably consumed by `mlflow.spark.log_model` as the
# staging directory for Spark model files (TODO confirm for this MLflow version).
os.environ.update({"MLFLOW_DFS_TMP": "/Volumes/main/default/mlflow_tmp"})

In [0]:
# Pipeline input: the same silver churn table loaded earlier, read again here.
input_df = spark.read.table("ai_trust_catalog.churn_trust.silver_customer_churn")

In [0]:
from pyspark.sql.functions import col

# Numeric model features; each is cast to double so VectorAssembler receives a
# uniform numeric type regardless of how the silver table stores them.
NUMERIC_FEATURE_COLS = [
    "age",
    "tenure",
    "usage_frequency",
    "support_calls",
    "payment_delay",
    "total_spend",
    "last_interaction",
]

# Exclude `contract_length` explicitly: an earlier exploratory cell dropped it,
# but that result was discarded when `clean_df` was rebuilt from `input_df`.
# `drop` is a no-op if the column is absent. The column is not a model feature,
# so this only keeps it out of the intermediate prediction DataFrames.
# NOTE(review): the `churn` label is not cast; LogisticRegression needs a numeric
# label, so confirm the silver table stores it as numeric 0/1.
clean_df = input_df.drop("contract_length")
for feature_col in NUMERIC_FEATURE_COLS:
    clean_df = clean_df.withColumn(feature_col, col(feature_col).cast("double"))

In [0]:
from pyspark.ml.feature import StringIndexer

# Encode each categorical column as a numeric index column named "<column>_idx".
# handleInvalid="keep" assigns unseen/invalid labels to an extra index bucket
# instead of failing the transform.
indexers = [
    StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_idx", handleInvalid="keep")
    for cat_col in ("gender", "subscription_type")
]

In [0]:
from pyspark.ml.feature import VectorAssembler

# Column order defines each feature's slot in the "features" vector:
# the seven numeric columns first, then the two StringIndexer outputs.
numeric_inputs = [
    "age", "tenure", "usage_frequency", "support_calls",
    "payment_delay", "total_spend", "last_interaction",
]
indexed_inputs = ["gender_idx", "subscription_type_idx"]

assembler = VectorAssembler(inputCols=numeric_inputs + indexed_inputs, outputCol="features")

In [0]:
from pyspark.ml.classification import LogisticRegression

# Churn classifier. Only the column wiring is configured; regularization,
# iteration count and decision threshold stay at Spark's defaults.
lr = (
    LogisticRegression()
    .setFeaturesCol("features")
    .setLabelCol("churn")
    .setProbabilityCol("probability")
    .setPredictionCol("prediction")
)

In [0]:
from pyspark.ml import Pipeline

# Stages run in order: categorical indexing -> feature assembly -> logistic regression.
# NOTE(review): the pipeline is fit on every row of clean_df and later scored on the
# same rows, so there is no held-out evaluation of model quality.
pipeline = Pipeline(stages=[*indexers, assembler, lr])
pipeline_model = pipeline.fit(clean_df)

In [0]:
# Log the fitted pipeline inside an explicit MLflow run.
# A bare fluent `log_model` call auto-starts a run and leaves it active, so
# re-running the cell would keep logging into that same dangling run.
# The `with` block ends the run once logging finishes. Afterwards,
# mlflow.last_active_run() returns this most recently ended run.
with mlflow.start_run():
    mlflow.spark.log_model(
        pipeline_model,
        artifact_path="model_2_trust"
    )

In [0]:
# ID of the most recent MLflow run in this process (expected to be the run the
# model was just logged to); displayed as the cell output.
latest_run = mlflow.last_active_run()
run_id = latest_run.info.run_id
run_id

In [0]:
# Score every row of clean_df (the same rows the pipeline was fit on).
pred_df = pipeline_model.transform(dataset=clean_df)

In [0]:
from pyspark.ml.functions import vector_to_array

# `probability` holds one entry per class; element 1 is P(class 1), i.e. P(churn)
# assuming the `churn` label is encoded 0/1.
# NOTE(review): despite the names, `prediction_confidence` is P(class 1), not the
# confidence of the predicted class. `prediction_uncertainty` is 1 - P(class 1).
# The downstream HIGH_RISK threshold relies on these exact values.
churn_probability = vector_to_array(col("probability"))[1]

scored_df = pred_df.withColumn(
    "prediction_confidence", churn_probability
).withColumn(
    "prediction_uncertainty", 1 - col("prediction_confidence")
)

In [0]:
from pyspark.sql.functions import when

# Cut-off on `prediction_confidence` (the class-1 / churn probability) for the
# trust decision: rows at or above it are HIGH_RISK, everything else
# (including a null probability) falls through to LOW_RISK.
# This is independent of the model's own `prediction` column, which
# LogisticRegression derives with its own threshold (0.5 unless changed).
# As a result, prediction = 1 together with final_decision = LOW_RISK can occur.
HIGH_RISK_THRESHOLD = 0.7

final_df = scored_df.select(
    "customerid",
    "prediction",
    "prediction_confidence",
    "prediction_uncertainty"
).withColumn(
    "final_decision",
    when(col("prediction_confidence") >= HIGH_RISK_THRESHOLD, "HIGH_RISK")
    .otherwise("LOW_RISK")
)

In [0]:
# Create the output schema if it does not exist yet (IF NOT EXISTS makes re-runs safe).
# NOTE(review): the name has no catalog prefix, so it resolves against the session's
# current catalog, unlike the three-part source table name. Confirm that is intended.
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS ai_trust_score_project")

In [0]:
# Replace the predictions table wholesale. overwriteSchema lets the stored schema
# follow final_df if its columns change between runs.
(
    final_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("ai_trust_score_project.trust_predictions_final")
)

In [0]:
%sql
-- Number of customers per trust decision.
SELECT
  final_decision,
  COUNT(*) AS customers
FROM ai_trust_score_project.trust_predictions_final
GROUP BY final_decision;

In [0]:
%sql
-- Mean `prediction_confidence` (the class-1 / churn probability) per trust decision.
-- Aliased so the result column has a readable name instead of the auto-generated
-- `avg(prediction_confidence)`. Ordered so the display is deterministic.
SELECT
  final_decision,
  AVG(prediction_confidence) AS avg_prediction_confidence
FROM ai_trust_score_project.trust_predictions_final
GROUP BY final_decision
ORDER BY final_decision;

In [0]:
%sql
-- Attach each matched customer's trust decision and class-1 probability to its silver row.
-- NOTE(review): `c.*` returns every customer attribute for every matched row;
-- review the rendered output for PII before sharing the notebook.
SELECT
  c.*,
  p.final_decision,
  p.prediction_confidence
FROM ai_trust_catalog.churn_trust.silver_customer_churn AS c
INNER JOIN ai_trust_score_project.trust_predictions_final AS p
  ON c.customerid = p.customerid;

In [0]:
from mlflow.tracking import MlflowClient

# List the top-level artifacts recorded under `run_id` (expected to include the
# "model_2_trust" model directory; TODO confirm) and display them.
client = MlflowClient()
logged_artifacts = client.list_artifacts(run_id)
logged_artifacts