In [0]:
# ------------------------------------
# Cell 1 — Configuration and Imports
# ------------------------------------
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql import types as T

# Training data after class balancing (SMOTE), produced by an upstream step.
TRAIN_BALANCED_TABLE = "kusha_solutions.telecom_churn_ml.train_balanced"

# getOrCreate() reuses an active session if there is one, otherwise starts one.
spark = (
    SparkSession.builder
    .appName("FeatureEngineering")
    .getOrCreate()
)
train_df = spark.table(TRAIN_BALANCED_TABLE)

print("‚úÖ Balanced data loaded successfully for Feature Engineering")
print("Row count:", train_df.count())


In [0]:
# ------------------------------------
# Cell 2 — Create Derived Features
# ------------------------------------
# `reduce` was previously imported only in the much later test-data cell, so this
# cell failed with NameError on a fresh kernel (Restart & Run All). Import it here.
from functools import reduce
from operator import add

# 1. Average monthly spend = TotalCharges / tenure; 0 when tenure <= 0
#    (also avoids division by zero).
train_df = train_df.withColumn(
    "AvgMonthlySpend",
    F.when(col("tenure") > 0, col("TotalCharges") / col("tenure")).otherwise(0)
)

# 2. Payment behaviour ratio = MonthlyCharges / TotalCharges; 0 when TotalCharges <= 0.
train_df = train_df.withColumn(
    "Monthly_to_Total_Ratio",
    F.when(col("TotalCharges") > 0, col("MonthlyCharges") / col("TotalCharges")).otherwise(0)
)

# 3. Has internet connection: 1 when InternetService_index is non-zero, else 0.
# NOTE(review): assumes index 0 encodes "no internet service". A default
# StringIndexer orders labels by frequency, so index 0 is the most common value,
# not necessarily "No" — confirm against the indexer that built this column.
train_df = train_df.withColumn("HasInternet", when(col("InternetService_index") != 0, 1).otherwise(0))

# 4. Number of active services: how many service columns equal 1.
# NOTE(review): assumes each service column is numeric with 1 meaning "Yes";
# verify the upstream encoding (frequency-ordered indexes can differ per column).
service_features = ["PhoneService", "MultipleLines", "OnlineSecurity", "OnlineBackup",
                    "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies"]

train_df = train_df.withColumn(
    "ActiveServiceCount",
    reduce(add, [when(col(c) == 1, 1).otherwise(0) for c in service_features])
)

print("‚úÖ Created new domain-based features successfully.")
display(train_df.select("AvgMonthlySpend", "Monthly_to_Total_Ratio", "HasInternet", "ActiveServiceCount").limit(5))


In [0]:
# ------------------------------------
# Cell 3 — Feature Correlation Analysis
# ------------------------------------
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Raw numeric inputs, the derived features from Cell 2, and the label.
corr_columns = [
    "tenure", "MonthlyCharges", "TotalCharges", "AvgMonthlySpend",
    "Monthly_to_Total_Ratio", "ActiveServiceCount", "HasInternet", "Churn_index",
]

# toPandas() collects the selected columns to the driver; sample first if the
# table is large.
train_pd = train_df.select(*corr_columns).toPandas()

# Pairwise Pearson correlation (pandas default method).
corr_matrix = train_pd.corr()

fig, ax = plt.subplots(figsize=(10, 6))
# Pin the colour scale to [-1, 1] centred on 0 so a given colour always means the
# same correlation, instead of being rescaled to whatever range this matrix has.
sns.heatmap(corr_matrix, annot=True, fmt=".2f", cmap="coolwarm",
            vmin=-1, vmax=1, center=0, ax=ax)
ax.set_title("Feature Correlation Heatmap")
plt.show()


In [0]:
# ------------------------------------
# Cell 4 — Drop Redundant or Unnecessary Columns
# ------------------------------------
from pyspark.sql import functions as F

# Columns that don't add predictive value or are redundant.
# NOTE(review): the "*_log" replacements named below are not created in this
# notebook — confirm they exist upstream, otherwise these charges are simply
# removed (their information survives only via the Cell 2 derived features).
cols_to_drop = [
    "customerID",                # Unique identifier
    "TotalCharges",              # Replaced by TotalCharges_log (unverified, see note)
    "MonthlyCharges",            # Replaced by MonthlyCharges_log (unverified, see note)
    "num_features_scaled",       # Temporary technical column
    "num_features_unscaled"      # Temporary technical column
]

# Drop only the columns that actually exist in the DataFrame.
train_df = train_df.drop(*[c for c in cols_to_drop if c in train_df.columns])

print("‚úÖ Dropped redundant/unnecessary columns successfully.")
print("Remaining columns count:", len(train_df.columns))
# Preview a few rows rather than rendering the entire DataFrame.
display(train_df.limit(5))


In [0]:
# ------------------------------------
# Cell 5 — Feature Importance (Random Forest)
# ------------------------------------
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Fixed seed so the forest, and therefore the importance ranking and the set of
# features dropped in the next cell, is reproducible between runs.
RF_SEED = 42

# Numeric candidate features. DataFrame.dtypes reports LongType as "bigint" (never
# "long"), ShortType as "smallint" and ByteType as "tinyint". Listing only "long"
# silently excluded such columns here, and Cell 7's select then removed them.
numeric_dtypes = ("int", "bigint", "smallint", "tinyint", "long", "float", "double")
feature_cols = [c for c, t in train_df.dtypes if t in numeric_dtypes and c != "Churn_index"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_vec = assembler.transform(train_df)

rf = RandomForestClassifier(featuresCol="features", labelCol="Churn_index",
                            numTrees=30, maxDepth=5, seed=RF_SEED)
model = rf.fit(train_vec)

# featureImportances is a Spark ML vector aligned positionally with feature_cols.
importances = model.featureImportances
feature_importance = list(zip(feature_cols, importances.toArray()))
importance_df = pd.DataFrame(feature_importance, columns=["Feature", "Importance"])
# Rank 1 = most important; method="first" breaks ties by column order.
importance_df["Rank"] = importance_df["Importance"].rank(method="first", ascending=False)
importance_df = importance_df.sort_values("Rank").reset_index(drop=True)

display(importance_df)

In [0]:
# ------------------------------------
# Cell 6 — Drop Low-Importance Features Automatically
# ------------------------------------
from pyspark.sql import functions as F

# Features whose Random Forest importance is below this cutoff get removed.
importance_threshold = 0.01

# Select the names from importance_df (built in the feature-importance cell)
# that fall under the cutoff.
below_cutoff = importance_df["Importance"] < importance_threshold
low_importance_features = importance_df.loc[below_cutoff, "Feature"].tolist()

print("üîª Features to Drop (Low Importance):")
for f in low_importance_features:
    print(f"- {f}")

# Only ask Spark to drop names that are actually present.
present_to_drop = [name for name in low_importance_features if name in train_df.columns]
train_df = train_df.drop(*present_to_drop)

print(f"\n‚úÖ Dropped {len(low_importance_features)} low-importance features successfully.")
print("Remaining columns:", len(train_df.columns))
display(train_df.limit(5))


In [0]:
# ------------------------------------
# Cell 7 — Feature Reordering Based on Importance
# ------------------------------------
from pyspark.sql import functions as F

# Step 1: keep features at or above the cutoff used by the drop cell, most
# important first. Reusing `importance_threshold` (rather than repeating the
# literal 0.01) keeps the kept/dropped split exactly complementary if the
# threshold is ever changed.
ordered_features = (
    importance_df[importance_df["Importance"] >= importance_threshold]
    .sort_values("Importance", ascending=False)["Feature"]
    .tolist()
)

# Step 2: the target column 'Churn_index' goes last.
ordered_features.append("Churn_index")

# Step 3: select in that order. Columns not in ordered_features (e.g. anything that
# was never scored by the importance step) are dropped here.
train_df = train_df.select([c for c in ordered_features if c in train_df.columns])

# Step 4: show the resulting column order.
print("‚úÖ Features reordered successfully based on importance ranking.")
print("New column order (most important first):")
print(train_df.columns)

# Step 5: preview a few records.
display(train_df.limit(5))


In [0]:
# ------------------------------------
# Cell — VectorAssembler
# ------------------------------------

from pyspark.ml.feature import VectorAssembler

# Every column except the label is a model input, kept in current column order.
feature_cols = list(filter(lambda name: name != "Churn_index", train_df.columns))

# Pack those inputs into a single vector column called "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_vec = assembler.transform(train_df)

print("‚úÖ VectorAssembler completed successfully.")
print(f"Total features combined: {len(feature_cols)}")

# Preview the assembled vectors next to the label.
preview_cols = ["features", "Churn_index"]
display(train_vec.select(*preview_cols).limit(5))


In [0]:
# ------------------------------------
# Cell — StandardScaler (Feature Scaling)
# ------------------------------------

from pyspark.ml.feature import StandardScaler

# Standardise each feature (subtract mean, divide by std). The statistics are
# learned from the training vectors only; the fitted scaler_model is what gets
# applied to the test set later.
scaler_params = dict(
    inputCol="features",          # vector produced by the VectorAssembler cell
    outputCol="features_scaled",  # new column holding the scaled vector
    withMean=True,                # centre to mean 0
    withStd=True,                 # scale to unit variance
)
scaler = StandardScaler(**scaler_params)
scaler_model = scaler.fit(train_vec)
train_scaled = scaler_model.transform(train_vec)

print("‚úÖ StandardScaler applied successfully.")
display(train_scaled.select("features_scaled", "Churn_index").limit(5))


In [0]:
# ------------------------------------
# Cell — Store Transformed & Scaled Dataset
# ------------------------------------

# Destination Delta table for the model-ready training data.
transformed_table = "kusha_solutions.telecom_churn_ml.train_final_featured_transformed"

# Persist only what model training needs: scaled feature vector + label.
final_train_df = train_scaled.select("features_scaled", "Churn_index")

(
    final_train_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(transformed_table)
)

print(f"‚úÖ Final transformed and scaled training data stored successfully as: {transformed_table}")
# Read back from the table to confirm what was written.
display(spark.table(transformed_table).limit(5))


In [0]:
# ------------------------------------
# Cell 1 — Load Test Dataset
# ------------------------------------
from pyspark.sql import functions as F
# NOTE(review): importing `sum` shadows Python's built-in sum() for the rest of the
# session; prefer F.sum if nothing downstream relies on this binding.
from pyspark.sql.functions import col, when, sum

# Test split as produced by the upstream transformation step.
TEST_SOURCE_TABLE = "kusha_solutions.telecom_churn_ml.test_transformed"
test_df = spark.table(TEST_SOURCE_TABLE)

print("‚úÖ Test data loaded successfully")
print("Row count:", test_df.count())

# ------------------------------------
# Cell 2 — Create Derived Features (Same as Train)
# ------------------------------------
from functools import reduce
from pyspark.sql.functions import col, when

# Same formulas as the training cell so train/test features are comparable.
test_df = test_df.withColumn(
    "AvgMonthlySpend",
    F.when(col("tenure") > 0, col("TotalCharges") / col("tenure")).otherwise(0)
)

test_df = test_df.withColumn(
    "Monthly_to_Total_Ratio",
    F.when(col("TotalCharges") > 0, col("MonthlyCharges") / col("TotalCharges")).otherwise(0)
)

# NOTE(review): assumes InternetService_index == 0 means "no internet service";
# confirm against the indexer that produced this column.
test_df = test_df.withColumn(
    "HasInternet",
    when(col("InternetService_index") != 0, 1).otherwise(0)
)

# ------------------------------------
# Ensure service features are numeric (1 = "Yes", 0 = anything else) before
# counting them. Only STRING columns are converted: comparing an already-numeric
# column with 'Yes' either yields NULL (which otherwise(0) turns into 0, silently
# wiping out the column) or, under ANSI mode, raises a cast error.
# ------------------------------------
service_features = [
    "PhoneService", "MultipleLines", "OnlineSecurity", "OnlineBackup",
    "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies"
]

test_dtypes = dict(test_df.dtypes)
for c in service_features:
    if test_dtypes.get(c) == "string":
        test_df = test_df.withColumn(
            c,
            when(col(c) == 'Yes', 1).otherwise(0)
        )

# Fail with a readable message rather than an AnalysisException inside the sum.
missing_services = [c for c in service_features if c not in test_df.columns]
if missing_services:
    raise ValueError(f"Test data is missing service columns: {missing_services}")

test_df = test_df.withColumn(
    "ActiveServiceCount",
    reduce(lambda a, b: a + b, [when(col(c) == 1, 1).otherwise(0) for c in service_features])
)


print("‚úÖ Derived features added to test data")
display(test_df.select("AvgMonthlySpend", "Monthly_to_Total_Ratio", "HasInternet", "ActiveServiceCount").limit(5))

# ------------------------------------
# Cell 3 — Encode Target Column (Churn → Churn_index)
# ------------------------------------
# Encode Churn with the binary labels used for training: No -> 0, Yes -> 1.
# The previous mapping (No -> 0, Yes -> 2, anything else -> 1) did not match a
# two-class training label, so churners carried a class the model never predicts
# and unknown values were silently labelled as a real class.
# NOTE(review): assumes the training Churn_index uses No=0 / Yes=1 — confirm.
churn_label = (
    when(col("Churn") == "No", 0)
    .when(col("Churn") == "Yes", 1)
)
test_df = test_df.withColumn("Churn_index", churn_label)

# Values other than "Yes"/"No" (including NULL) are left NULL above; surface them
# instead of evaluating the model on invented labels.
unmapped_churn = test_df.filter(col("Churn_index").isNull()).count()
if unmapped_churn:
    raise ValueError(f"{unmapped_churn} test rows have a Churn value other than 'Yes'/'No'")

print("‚úÖ Churn column encoded as Churn_index")
display(test_df.select("Churn", "Churn_index").distinct())

# ------------------------------------
# Cell 4 — Drop Redundant/Low-Importance Features
# ------------------------------------
# Same redundant columns as the training cell, plus the raw text label now that
# Churn_index has been derived from it.
cols_to_drop_train = [
    "customerID", "TotalCharges", "MonthlyCharges",
    "num_features_scaled", "num_features_unscaled", "Churn"  # drop old churn text column
]

# low_importance_features comes from the training importance step, so the test
# set loses the same features; only names present in test_df are dropped.
existing_cols = set(test_df.columns)
all_cols_to_drop = [
    name for name in (*cols_to_drop_train, *low_importance_features)
    if name in existing_cols
]
test_df = test_df.drop(*all_cols_to_drop)

print(f"‚úÖ Dropped redundant and low-importance features: {len(all_cols_to_drop)} columns")
display(test_df.limit(5))

# ------------------------------------
# Cell 5 — Reorder Features (Same Order as Train)
# ------------------------------------
# The fitted `assembler` takes exactly the training feature columns, so all of
# them must exist here. Previously missing names were silently filtered out and
# the failure surfaced later inside VectorAssembler; fail early and clearly.
train_feature_order = [c for c in ordered_features if c != "Churn_index"]
missing_in_test = [c for c in train_feature_order if c not in test_df.columns]
if missing_in_test:
    raise ValueError(f"Test data is missing training features: {missing_in_test}")

ordered_features_no_target = train_feature_order
test_df = test_df.select(ordered_features_no_target + ["Churn_index"])

print("‚úÖ Test features reordered to match training order")
display(test_df.limit(5))

# ------------------------------------
# Cell 6 — Apply VectorAssembler and StandardScaler
# ------------------------------------
# Reuse the transformers built on the training data: the same assembler inputs,
# and scaler_model's training mean/std, so no test statistics affect scaling.
test_vec = assembler.transform(test_df)
print("‚úÖ VectorAssembler applied to test data")

test_scaled = scaler_model.transform(test_vec)
print("‚úÖ StandardScaler applied (no data leakage)")

scaled_preview = test_scaled.select("features_scaled", "Churn_index").limit(5)
display(scaled_preview)

# ------------------------------------
# Cell 7 — Save Transformed Test Dataset
# ------------------------------------
final_test_table = "kusha_solutions.telecom_churn_ml.test_final_featured_transformed"

# Same two columns as the stored training table: scaled features + label.
final_test_df = test_scaled.select("features_scaled", "Churn_index")

# Use overwriteSchema rather than mergeSchema: an overwrite normally keeps the
# existing table schema, and mergeSchema only adds columns, so columns left over
# from earlier runs could linger as NULLs. overwriteSchema makes the table schema
# exactly these two columns on every run.
(
    final_test_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(final_test_table)
)

print(f"‚úÖ Test data transformed and saved successfully as: {final_test_table}")
display(spark.table(final_test_table).limit(5))
