Define the ADLS Gen2 storage path (abfss URI) of the raw-data container

In [0]:
# Root URI of the ADLS Gen2 container that holds the raw data, in the form
# abfss://<container>@<storage-account>.dfs.core.windows.net/
# A plain string literal is used: there is nothing to interpolate, so no f-prefix is needed.
adls_path = "abfss://raw-data@storageforchurnproject.dfs.core.windows.net/"


In [0]:
# CSV file (inside the raw-data container) that is read in the EDA section below.
file_name = "WA_Fn-UseC_-Telco-Customer-Churn.csv"
# Container name; matches the container segment of the abfss URI in adls_path.
container_name = "raw-data"

### Exploratory Data Analysis (EDA)

Display a sample of the data set using the `display` function.

In [0]:
# Build the full file URI. adls_path already ends with "/", so strip any trailing
# slash before joining to avoid a doubled "//" in the path.
raw_csv_path = f"{adls_path.rstrip('/')}/{file_name}"

# header=True      -> the first row supplies the column names.
# inferSchema=True -> Spark makes an extra pass over the data to infer column types.
df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)
display(df)

Check for missing data

In [0]:
from pyspark.sql import functions as F

# Summary statistics (count, mean, stddev, min, max) for every column.
display(df.describe())

# describe() only reports the non-null count, so values that are blank strings
# (e.g. " ") are not flagged as missing. Count, per column, the rows that are
# NULL or contain nothing but whitespace.
missing_counts = df.select(
    [
        F.sum(
            (
                F.col(f"`{c}`").isNull()
                | (F.trim(F.col(f"`{c}`").cast("string")) == "")
            ).cast("int")
        ).alias(c)
        for c in df.columns
    ]
)
display(missing_counts)

In [0]:
from pyspark.sql.functions import col, when

# Cleaning steps, applied below as a single chained expression:
#   1. Cast 'TotalCharges' to double. Per the original notebook, the column holds
#      blank strings for new customers; anything that cannot be parsed becomes null.
#   2. Replace those nulls with 0.
#   3. Drop 'customerID', which is only an identifier.
#   4. Add a numeric 'label' column: 1 where 'Churn' is "Yes", otherwise 0.
total_charges_as_double = col("TotalCharges").cast("double")
churn_flag = when(col("Churn") == "Yes", 1).otherwise(0)

df_clean = (
    df.withColumn("TotalCharges", total_charges_as_double)
    .fillna(0, subset=["TotalCharges"])
    .drop("customerID")
    .withColumn("label", churn_flag)
)

print("Data successfully cleaned.")
display(df_clean.select("TotalCharges", "Churn", "label"))

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.types import StringType, IntegerType, DoubleType, NumericType

# Identify categorical and numerical feature columns from the cleaned schema.
# 'Churn' (raw target) and 'label' (encoded target) are excluded so the target
# never ends up in the feature vector.
categorical_cols = [
    field.name
    for field in df_clean.schema.fields
    if isinstance(field.dataType, StringType) and field.name != 'Churn'
]
# NumericType is the common base of all Spark numeric types (integer, long, float,
# double, decimal, ...), so a column inferred as e.g. long is not silently dropped.
numerical_cols = [
    field.name
    for field in df_clean.schema.fields
    if isinstance(field.dataType, NumericType) and field.name != 'label'
]

# --- Pipeline Stages ---

# Stage 1: one StringIndexer per categorical column, mapping each string value to a
# numeric index. handleInvalid="skip" drops rows the indexer cannot map.
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="skip")
    for c in categorical_cols
]

# Stage 2 (not applied): OneHotEncoder would normally turn the indices into binary vectors.
# The indexed columns are fed to VectorAssembler directly instead.
# NOTE(review): raw indices impose an arbitrary ordering on nominal categories, which
# linear models such as Logistic Regression interpret as ordinal - consider adding
# OneHotEncoder before training such a model.

# Stage 3: VectorAssembler combines all feature columns into a single 'features' vector.
assembler_inputs = [f"{c}_index" for c in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Create the preprocessing pipeline.
preprocessing_pipeline = Pipeline(stages=indexers + [assembler])

# Fit and transform the data.
# NOTE(review): the pipeline is fitted on the full dataset, before the train/test split
# performed in the following cell; for strict evaluation, fit on the training split only.
transformed_df = preprocessing_pipeline.fit(df_clean).transform(df_clean)

# Display the result.
display(transformed_df.select("features", "label"))

In [0]:
# Randomly partition the transformed rows into an 80% training set and a 20% test set.
# The fixed seed makes the split repeatable across runs.
split_weights = [0.8, 0.2]
split_seed = 42
train_data, test_data = transformed_df.randomSplit(split_weights, seed=split_seed)

# Report the size of each partition (each count() triggers a Spark job).
print(
    f"Training set count: {train_data.count()}",
    f"Test set count: {test_data.count()}",
    sep="\n",
)

In [0]:
# eo