In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# --- 1. SETUP: Fresh Spark Session ---
# Stop a session left over from a previous run of this notebook. On a fresh
# kernel `spark` is not defined yet (NameError) - that is the only case skipped
# silently; any other failure while stopping is reported instead of swallowed.
# NOTE(review): stopping the session does not restart the Py4J JVM, so changes
# to `spark.jars.packages` below may only take effect after a kernel restart.
try:
    spark.stop()
    print("Stopped existing session.")
except NameError:
    pass
except Exception as exc:  # best-effort cleanup; continue with a new session
    print(f"Could not stop existing session: {exc}")

# MinIO / S3A connection settings. Read from the environment so real
# credentials never have to be committed with the notebook; the fallbacks are
# the values this notebook previously hardcoded for the local MinIO container.
S3_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
S3_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio")
S3_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

# Delta Lake plus the hadoop-aws / AWS SDK jars needed for s3a:// paths.
SPARK_PACKAGES = ",".join([
    "io.delta:delta-spark_2.12:3.0.0",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
])

print("Starting new Spark Session...")
spark = (
    SparkSession.builder
    .appName("PipelineVerification")
    .config("spark.jars.packages", SPARK_PACKAGES)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    # MinIO serves buckets by path (http://host/bucket), not virtual-host style.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # The default endpoint is plain http, so TLS is disabled.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

# --- 2. VERIFY BRONZE (Did Ingestion Work?) ---
print("\n" + "="*40)
print("TEST 1: BRONZE LAYER (Ingestion)")
print("="*40)
try:
    df_bronze = spark.read.format("delta").load("s3a://bronze/telco_churn_delta")
except Exception as e:
    # Only the read is guarded, so an error in the checks below is not
    # misreported as an unreadable table.
    print(f"❌ FAIL: Could not read Bronze table. {e}")
else:
    cols = df_bronze.columns
    print(cols)

    # Check for "PK" garbage: presumably the zip signature of an .xlsx file
    # that was read as plain text instead of being parsed as Excel.
    if any("PK" in c for c in cols):
        print("❌ FAIL: 'PK' found in columns. Excel read failed.")
    elif "customerID" in cols:
        print("✅ PASS: Columns look correct (customerID found).")
        # Guard the lookup: schema[...] raises KeyError for an absent column.
        if "TotalCharges" in cols:
            print(f"   Schema detected: TotalCharges is {df_bronze.schema['TotalCharges'].dataType}")
        else:
            print("⚠️ WARNING: 'TotalCharges' column missing.")
    else:
        # Column list was already printed above.
        print("⚠️ WARNING: Unexpected column names.")

Starting new Spark Session...

TEST 1: BRONZE LAYER (Ingestion)
['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']
✅ PASS: Columns look correct (customerID found).
   Schema detected: TotalCharges is StringType()


In [2]:
# Load the Bronze Delta table again so the exploration cells below have a
# DataFrame even if the verification cell's read was skipped or failed.
df_bronze = spark.read.load("s3a://bronze/telco_churn_delta", format="delta")

In [3]:
# Inspect the raw Bronze schema. In the recorded output TotalCharges is a
# string, whereas the Silver table later exposes total_charges as a double.
df_bronze.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [4]:
# Missing-value audit for the Bronze table.
from pyspark.sql.functions import col, count, trim, when
from pyspark.sql.types import StringType

# 1. Real NULLs per column, computed in a single aggregation over all columns.
df_bronze.select([count(when(col(c).isNull(), c)).alias(c) for c in df_bronze.columns]).show()

# 2. Empty / whitespace-only values per string column. Bronze keeps raw string
#    types (e.g. TotalCharges), and a blank string is not NULL, so step 1 alone
#    may report "0 missing" for columns that actually have gaps.
bronze_string_cols = [f.name for f in df_bronze.schema.fields if isinstance(f.dataType, StringType)]
if bronze_string_cols:
    df_bronze.select(
        [count(when(trim(col(c)) == "", c)).alias(c) for c in bronze_string_cols]
    ).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [5]:
# VERIFICATION SNIPPET
# --- Silver validation: report PASS/FAIL like the Bronze check above. ---
# Imports are local so this cell does not rely on an earlier cell's imports.
from pyspark.sql.functions import col, count, when
from pyspark.sql.types import IntegerType

df_silver = spark.read.format("delta").load("s3a://silver/telco_churn_delta")

# Lower bound on rows with total_charges == 0.0.
# NOTE(review): presumably these are blank TotalCharges strings from Bronze that
# the Silver job fills with 0.0 - confirm against the transformation code.
MIN_EXPECTED_ZEROS = 11

print("--- Silver Validation ---")
# 1 & 2. Count NULLs and zeros in total_charges in one aggregation (one Spark
# job) instead of two separate filter().count() scans.
charge_counts = df_silver.agg(
    count(when(col("total_charges").isNull(), 1)).alias("nulls"),
    count(when(col("total_charges") == 0.0, 1)).alias("zeros"),
).first()
nulls, zeros = charge_counts["nulls"], charge_counts["zeros"]

null_status = "✅ PASS" if nulls == 0 else "❌ FAIL"
print(f"{null_status}: Nulls in total_charges: {nulls} (Expected: 0)")

zero_status = "✅ PASS" if zeros >= MIN_EXPECTED_ZEROS else "❌ FAIL"
print(f"{zero_status}: Zeros in total_charges: {zeros} (Expected: >= {MIN_EXPECTED_ZEROS})")

# 3. Binary flags are expected as IntegerType (matches the recorded Silver and
#    Gold schemas). Guard the lookup: schema[...] raises KeyError if absent.
for label, field_name in (("Senior Citizen", "senior_citizen"), ("Churn", "churn")):
    if field_name not in df_silver.columns:
        print(f"❌ FAIL: '{field_name}' column missing from Silver.")
        continue
    dtype = df_silver.schema[field_name].dataType
    type_status = "✅ PASS" if isinstance(dtype, IntegerType) else "❌ FAIL"
    print(f"{type_status}: {label} Type: {dtype}")

--- Silver Validation ---
Nulls in total_charges: 0 (Expected: 0)
Zeros in total_charges: 11 (Expected: >= 11)
Senior Citizen Type: IntegerType()
Churn Type: IntegerType()


In [6]:
# Inspect the cleaned Silver schema. In the recorded output, columns are
# snake_case, binary flags such as partner/churn are integers, and multi-valued
# service columns such as streaming_movies remain strings.
df_silver.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- senior_citizen: integer (nullable = true)
 |-- partner: integer (nullable = true)
 |-- dependents: integer (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- phone_service: integer (nullable = true)
 |-- multiple_lines: string (nullable = true)
 |-- internet_service: string (nullable = true)
 |-- online_security: string (nullable = true)
 |-- online_backup: string (nullable = true)
 |-- device_protection: string (nullable = true)
 |-- tech_support: string (nullable = true)
 |-- streaming_tv: string (nullable = true)
 |-- streaming_movies: string (nullable = true)
 |-- contract: string (nullable = true)
 |-- paperless_billing: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- monthly_charges: double (nullable = true)
 |-- total_charges: double (nullable = true)
 |-- churn: integer (nullable = true)



In [7]:
# Spot-check the values of a service column that Silver keeps as a string.
df_silver.select("streaming_movies").show(n=20)

+----------------+
|streaming_movies|
+----------------+
|              No|
|             Yes|
|              No|
|             Yes|
|              No|
|             Yes|
|             Yes|
|              No|
|              No|
|             Yes|
|              No|
|             Yes|
|              No|
|              No|
|              No|
|             Yes|
|             Yes|
|              No|
|             Yes|
|              No|
+----------------+
only showing top 20 rows



In [8]:
import sys
# 2. Read Gold Parquet (Feast Format)
gold_path = "s3a://gold/telco_features_parquet"
try:
    print(f"Reading from {gold_path}...")
    df = spark.read.parquet(gold_path)
except Exception as e:
    # sys.exit() inside Jupyter only raises SystemExit with a confusing warning
    # and hides the real error; re-raising stops the cell with the original traceback.
    print(f"CRITICAL: Could not read Gold data. {e}", file=sys.stderr)
    raise

# --- CHECK 1: BASIC STATS ---
print(f"\n1. Total Rows: {df.count()}")
print(f"2. Columns: {len(df.columns)}")
# In the recorded schema the encoded *_vec columns are array<double>, not ML vectors.
print("3. Schema (Look for 'event_timestamp' and array '_vec' types):")
df.printSchema()

# --- CHECK 2: ENGINEERED FEATURES (Project Requirements) ---
print("\n--- 4. Inspecting New Derived Features ---")
# We check the specific columns we engineered: tenure_group, billing_ratio, is_high_value
cols_to_check = ["customer_id", "tenure", "tenure_group", "billing_ratio", "is_high_value"]

# Split into present / missing so absent features are reported, not silently skipped.
existing_cols = [c for c in cols_to_check if c in df.columns]
missing_cols = [c for c in cols_to_check if c not in df.columns]
if missing_cols:
    print(f"⚠️ WARNING: Expected feature columns missing: {missing_cols}")
if existing_cols:
    df.select(existing_cols).show(10, truncate=False)

if "billing_ratio" in df.columns:
    print("\n   -> Stats for 'Billing Ratio' (Should vary around 1.0):")
    df.select("billing_ratio").describe().show()

# --- CHECK 3: ENCODING VERIFICATION ---
print("\n--- 5. Verifying One-Hot Encoding ---")
# Encoded categorical columns end in '_vec'. In this Gold table they are stored
# as array<double> (see the schema above), so values print as plain lists rather
# than Spark's sparse-vector notation such as (3,[1],[1.0]).
vec_cols = [c for c in df.columns if c.endswith("_vec")]
if vec_cols:
    print(f"Found {len(vec_cols)} encoded vector columns.")
    df.select(vec_cols[:3]).show(5, truncate=False)  # Show first 3 vector columns
else:
    print("⚠️ WARNING: Encoded vectors not found!")

# --- CHECK 4: FEAST READINESS ---
print("\n--- 6. Checking Feature Store Timestamps ---")
if "event_timestamp" in df.columns:
    # Select only the columns that exist; selecting a missing one raises AnalysisException.
    ts_cols = [c for c in ("customer_id", "event_timestamp", "created_timestamp") if c in df.columns]
    df.select(ts_cols).show(5, truncate=False)
    if "created_timestamp" not in df.columns:
        print("⚠️ WARNING: 'created_timestamp' missing.")
else:
    print("❌ CRITICAL: 'event_timestamp' missing. Feast will fail.")

# Release the session; re-run the setup cell before using Spark again.
spark.stop()

Reading from s3a://gold/telco_features_parquet...

1. Total Rows: 7043
2. Columns: 27
3. Schema (Look for 'event_timestamp' and 'vector' types):
root
 |-- customer_id: string (nullable = true)
 |-- senior_citizen: integer (nullable = true)
 |-- partner: integer (nullable = true)
 |-- dependents: integer (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- phone_service: integer (nullable = true)
 |-- paperless_billing: integer (nullable = true)
 |-- monthly_charges: double (nullable = true)
 |-- total_charges: double (nullable = true)
 |-- churn: integer (nullable = true)
 |-- tenure_group: double (nullable = true)
 |-- avg_historical_charge: double (nullable = true)
 |-- billing_ratio: double (nullable = true)
 |-- is_high_value: integer (nullable = true)
 |-- gender_vec: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- multiple_lines_vec: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- internet_service_vec: array (null