## 02_data_preprocessing

The goal of this notebook is to clean and preprocess the data.
We'll cover handling missing values, encoding categorical variables, and feature engineering.


#### *Imports and Initialization*


In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Customer Churn Data Preprocessing") \
    .getOrCreate()

print("Spark session initialized.")


### Load Data
We'll load the data from a CSV file on the local machine into a pandas DataFrame.

In [None]:
# Load the dataset from local storage
data_path = "C:/Users/ADMIN/Desktop/Projects/Batch-Processing-Project-Customer-Churn-Prediction-Pipeline/datasets/Bank Customer Churn Prediction.csv"
df = pd.read_csv(data_path)

# Check the first few rows of the dataset
df.head()


### Handle Missing Values

Handling missing data is an essential preprocessing step. We can either drop rows with missing values or fill them with appropriate values (like the mean, median, or mode).

In [None]:
# Check for missing values (print so the result shows even though it is not the last expression)
print(df.isnull().sum())

# Option 1 (used here): Drop rows with missing values.
# .copy() avoids pandas' SettingWithCopyWarning on later column assignments.
df_cleaned = df.dropna().copy()

# Option 2 (alternative): Fill missing values instead of dropping rows.
# Use this in place of Option 1; filling after dropna() has no effect because no NaNs remain.
# Numerical columns: fill with the median.
# df_cleaned = df.copy()
# df_cleaned['TotalCharges'] = df_cleaned['TotalCharges'].fillna(df_cleaned['TotalCharges'].median())
# df_cleaned['MonthlyCharges'] = df_cleaned['MonthlyCharges'].fillna(df_cleaned['MonthlyCharges'].median())
# Categorical columns like 'PaymentMethod': fill with the mode (most frequent value).
# df_cleaned['PaymentMethod'] = df_cleaned['PaymentMethod'].fillna(df_cleaned['PaymentMethod'].mode()[0])

# Verify the result
df_cleaned.isnull().sum()
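
The two fill strategies described above can be illustrated on a small, self-contained frame. This is only a minimal sketch: the `toy` DataFrame and its values are made up for illustration, and only the column names are borrowed from the cell above.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data; the values are illustrative only.
toy = pd.DataFrame({
    "MonthlyCharges": [20.0, np.nan, 40.0, 60.0],
    "PaymentMethod": ["Card", "Cash", None, "Card"],
})

# Numerical column: replace NaN with the column median
# (pandas skips NaN when computing the median).
toy["MonthlyCharges"] = toy["MonthlyCharges"].fillna(toy["MonthlyCharges"].median())

# Categorical column: replace missing entries with the most frequent value.
toy["PaymentMethod"] = toy["PaymentMethod"].fillna(toy["PaymentMethod"].mode()[0])

print(toy)
```

The median is usually preferred over the mean for skewed numerical columns because it is less sensitive to outliers.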


### Data Transformation
Convert categorical columns to numerical values using label encoding followed by one-hot encoding, for categorical features such as gender, InternetService, etc.

In [None]:
# Convert categorical columns to numeric using StringIndexer and OneHotEncoder
categorical_columns = ['gender', 'InternetService', 'Contract', 'PaymentMethod', 'Partner', 'Dependents']

# StringIndexer for label encoding
# (loop variable is named c so it is not confused with the imported pyspark `col` function)
indexers = [StringIndexer(inputCol=c, outputCol=c + '_Index') for c in categorical_columns]

# OneHotEncoder turns each index column into a one-hot vector column
encoders = [OneHotEncoder(inputCol=c + '_Index', outputCol=c + '_Encoded') for c in categorical_columns]

# Combine indexers and encoders in a pipeline
pipeline = Pipeline(stages=indexers + encoders)

# Convert the pandas DataFrame to a Spark DataFrame and apply the pipeline
df_spark = spark.createDataFrame(df_cleaned)
model = pipeline.fit(df_spark)
df_transformed = model.transform(df_spark)

# Show the original categorical columns next to their encoded counterparts
df_transformed.select(categorical_columns + [c + '_Encoded' for c in categorical_columns]).show()
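
To make the two encoding steps concrete, here is a pandas-only sketch of the same idea on made-up data. It is an approximation, not the Spark implementation: `StringIndexer` assigns indices by descending frequency by default, which the sketch imitates with `value_counts()`, while Spark's `OneHotEncoder` produces a sparse vector and drops the last category by default, whereas `pd.get_dummies` creates one column per category. The `toy` frame and its category values are hypothetical.

```python
import pandas as pd

# Hypothetical toy data; category values are illustrative only.
toy = pd.DataFrame({
    "Contract": ["Monthly", "Yearly", "Monthly", "Two-year", "Monthly", "Yearly"]
})

# Label encoding: the most frequent category gets index 0, the next gets 1, and so on.
order = toy["Contract"].value_counts().index.tolist()
label_index = {category: i for i, category in enumerate(order)}
toy["Contract_Index"] = toy["Contract"].map(label_index)

# One-hot encoding: one 0/1 column per category.
one_hot = pd.get_dummies(toy["Contract"], prefix="Contract", dtype=int)

print(label_index)
print(one_hot)
```

One-hot encoding avoids implying an order between categories, which a bare integer index would suggest to many models.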


### Feature Engineering
Create new features from existing columns. For instance, charges_diff is the difference between MonthlyCharges and TotalCharges; other candidates, such as tenure_months, could be derived in the same way.

In [None]:
# Feature engineering: create charges_diff = MonthlyCharges - TotalCharges
df_transformed = df_transformed.withColumn('charges_diff', col('MonthlyCharges') - col('TotalCharges'))

# Show the new feature
df_transformed.select('charges_diff').show()
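
The same arithmetic can be checked by hand on a couple of made-up rows. This pandas sketch mirrors the `withColumn` call above; the numbers are hypothetical.

```python
import pandas as pd

# Hypothetical toy data; values are illustrative only.
toy = pd.DataFrame({
    "MonthlyCharges": [50.0, 80.0],
    "TotalCharges": [600.0, 80.0],
})

# Element-wise difference, same definition as charges_diff in the Spark cell.
toy["charges_diff"] = toy["MonthlyCharges"] - toy["TotalCharges"]

print(toy)
```

With this definition the feature is zero when the total equals a single monthly charge and becomes more negative as the accumulated total grows.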


### Prepare Final Dataset for Machine Learning
Select the features and the label, keeping only the columns relevant for model training. The target variable is Churn, which needs to be encoded numerically (1 for churn, 0 for non-churn).

In [None]:
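# Assuming 'Churn' is the target variable and already holds numeric 0/1 values.
# Note: casting string labels such as 'Yes'/'No' to double does not produce 1/0
# (Spark returns null for non-numeric strings by default), so map such labels first.
df_transformed = df_transformed.withColumn('Churn', col('Churn').cast('double'))

# Select the final features and label columns
feature_columns = ['tenure', 'MonthlyCharges', 'TotalCharges', 'charges_diff'] + \
                  [c + '_Encoded' for c in categorical_columns]
final_df = df_transformed.select(*feature_columns, 'Churn')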

# Show the final prepared dataframe
final_df.show(5)
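
If the target column is stored as text rather than numbers, it has to be mapped to 1/0 explicitly. The sketch below shows one way to do that in pandas, assuming the labels are `'Yes'` and `'No'`; the source does not confirm how Churn is stored, so both the labels and the `label_map` name are assumptions.

```python
import pandas as pd

# Hypothetical toy data; assumes Churn is stored as 'Yes'/'No' strings.
toy = pd.DataFrame({"Churn": ["Yes", "No", "No", "Yes"]})

# Explicit mapping: 1.0 for churn, 0.0 for non-churn.
label_map = {"Yes": 1.0, "No": 0.0}
toy["Churn"] = toy["Churn"].map(label_map)

# Labels missing from the mapping become NaN, so fail loudly instead of training on them.
assert toy["Churn"].notna().all(), "unexpected Churn label"

print(toy)
```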


### Save Preprocessed Data to Delta Lake
To keep the preprocessed data available for future analysis, we'll write it to storage in Delta format.

In [None]:
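# Define the path for the Delta table inside the S3 bucket you created
delta_path = "s3a://hmati/preprocessed_data"

# Save the transformed data as a Delta table.
# Note: this requires a Spark session with the Delta Lake package and extensions configured,
# plus S3 (s3a) credentials; the session created above does not set these explicitly.
final_df.write.format("delta").mode("overwrite").save(delta_path)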

print("Preprocessed data saved to Delta Lake.")


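### Summary and Next Steps
In this notebook we loaded the dataset, handled missing values, encoded the categorical columns with StringIndexer and OneHotEncoder, created the charges_diff feature, selected the final feature and label columns, and saved the result to Delta Lake.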