In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

def load_data_from_postgres(jdbc_url=None, table=None, user=None,
                            password=None, jar_path=None):
    """Create a Spark session and read a Postgres table over JDBC.

    Every setting is resolved with the precedence
    explicit argument > environment variable > built-in default, so calling
    the function with no arguments and no environment overrides behaves
    exactly as before.

    Args:
        jdbc_url: JDBC connection URL (env ``CHURN_DB_URL``).
        table: Name of the table to read (env ``CHURN_DB_TABLE``).
        user: Database user (env ``CHURN_DB_USER``).
        password: Database password (env ``CHURN_DB_PASSWORD``).
        jar_path: Path to the PostgreSQL JDBC driver JAR passed to
            ``spark.jars`` (env ``CHURN_JDBC_JAR``).

    Returns:
        A ``(spark, df)`` tuple: the SparkSession and the DataFrame read from
        the table. The caller is responsible for calling ``spark.stop()``.
    """
    import os

    def _setting(explicit, env_name, fallback):
        # Explicit argument wins; otherwise consult the environment so that
        # credentials and machine-specific paths need not live in the code.
        if explicit is not None:
            return explicit
        return os.environ.get(env_name, fallback)

    jar_path = _setting(
        jar_path,
        "CHURN_JDBC_JAR",
        "/Users/abhishekkalugade/Programming/My_Projects/ChurnGuard Project_Customer_Churn_Prediction/postgresql-42.7.8.jar",
    )
    jdbc_url = _setting(
        jdbc_url, "CHURN_DB_URL", "jdbc:postgresql://localhost:5432/churn_db"
    )
    table = _setting(table, "CHURN_DB_TABLE", "customers")

    properties = {
        "user": _setting(user, "CHURN_DB_USER", "churnuser"),
        "password": _setting(password, "CHURN_DB_PASSWORD", "churnpass"),
        "driver": "org.postgresql.Driver",
    }

    spark = (
        SparkSession.builder
        .appName("ChurnGuard-FeatureEngineering")
        .config("spark.jars", jar_path)
        .getOrCreate()
    )

    df = spark.read.jdbc(url=jdbc_url, table=table, properties=properties)
    return spark, df

def engineer_features(df):
    """Add derived churn features to the customers DataFrame.

    Columns added:
        tenure_group: Bucket label derived from ``tenure`` (<=12, <=24,
            <=48, <=60, otherwise "5+ years").
        total_services: Number of subscribed services among the service
            columns.
        charges_per_service: ``monthlycharges / (total_services + 1)``; the
            ``+ 1`` keeps the denominator non-zero.
        has_family: 1 when ``partner`` or ``dependents`` is "Yes", else 0.
        totalcharges_clean: ``totalcharges`` as a double, with missing or
            blank values replaced by 0.0.
        avg_monthly_spend: ``totalcharges_clean / (tenure + 1)``.

    Args:
        df: DataFrame containing ``tenure``, ``monthlycharges``,
            ``totalcharges``, ``partner``, ``dependents`` and the service
            columns listed below.

    Returns:
        A new DataFrame with the columns above appended.
    """
    # Local import: the module-level import list does not include ``trim``.
    from pyspark.sql.functions import trim

    df = df.withColumn(
        "tenure_group",
        when(col("tenure") <= 12, "0-1 year")
        .when(col("tenure") <= 24, "1-2 years")
        .when(col("tenure") <= 48, "2-4 years")
        .when(col("tenure") <= 60, "4-5 years")
        .otherwise("5+ years"),
    )

    # Services whose subscribed state is recorded as the literal "Yes".
    yes_no_services = ['phoneservice', 'multiplelines',
                       'onlinesecurity', 'onlinebackup', 'deviceprotection',
                       'techsupport', 'streamingtv', 'streamingmovies']
    service_flags = [
        when(col(c) == "Yes", 1).otherwise(0) for c in yes_no_services
    ]
    # NOTE(review): in the Telco churn data ``internetservice`` holds the
    # connection type ("DSL" / "Fiber optic" / "No") rather than "Yes"/"No",
    # so an ``== "Yes"`` test never counts it. Any non-null value other than
    # "No" is treated as subscribed (a literal "Yes" still counts). Confirm
    # against the ``customers`` table.
    service_flags.append(
        when(col("internetservice") != "No", 1).otherwise(0)
    )

    # Built-in sum works on Columns because ``0 + Column`` yields a Column.
    df = df.withColumn("total_services", sum(service_flags))

    df = df.withColumn(
        "charges_per_service",
        col("monthlycharges") / (col("total_services") + 1),
    )

    df = df.withColumn(
        "has_family",
        when((col("partner") == "Yes") | (col("dependents") == "Yes"), 1)
        .otherwise(0),
    )

    # Treat NULL, empty and all-space values as "no charges yet". Keeping the
    # cast in ``otherwise`` means blank strings are never sent through the
    # cast itself. Comparing via a string cast also works when the source
    # column is already numeric.
    raw_total = col("totalcharges")
    is_missing = raw_total.isNull() | (trim(raw_total.cast("string")) == "")
    df = df.withColumn(
        "totalcharges_clean",
        when(is_missing, 0.0).otherwise(raw_total.cast("double")),
    )

    df = df.withColumn(
        "avg_monthly_spend",
        col("totalcharges_clean") / (col("tenure") + 1),
    )

    return df

def encode_categorical_features(df):
    """Index and one-hot encode the categorical columns of ``df``.

    For each categorical column ``name`` a ``StringIndexer`` (with
    ``handleInvalid="keep"``) writes ``name_index`` and a ``OneHotEncoder``
    turns that into ``name_encoded``. All indexer stages are placed before
    all encoder stages in one ``Pipeline``, which is fitted on ``df`` and
    then applied to the same ``df``.

    Args:
        df: DataFrame containing every column named in ``categorical_cols``
            (``tenure_group`` is among them).

    Returns:
        The transformed DataFrame with the ``*_index`` and ``*_encoded``
        columns added.
    """
    from pyspark.ml import Pipeline

    categorical_cols = [
        'gender', 'partner', 'dependents', 'phoneservice',
        'multiplelines', 'internetservice', 'onlinesecurity',
        'onlinebackup', 'deviceprotection', 'techsupport',
        'streamingtv', 'streamingmovies', 'contract',
        'paperlessbilling', 'paymentmethod', 'tenure_group',
    ]

    stages = []
    # First pass: string -> numeric index for every categorical column.
    for name in categorical_cols:
        stages.append(
            StringIndexer(
                inputCol=name,
                outputCol=f"{name}_index",
                handleInvalid="keep",
            )
        )
    # Second pass: numeric index -> one-hot vector.
    for name in categorical_cols:
        stages.append(
            OneHotEncoder(
                inputCol=f"{name}_index",
                outputCol=f"{name}_encoded",
            )
        )

    fitted_pipeline = Pipeline(stages=stages).fit(df)
    return fitted_pipeline.transform(df)

def create_feature_vector(df):
    """Assemble, scale and label the model inputs.

    Every column of ``df`` whose name ends in ``_encoded`` is combined with
    the fixed list of numeric columns into ``features_unscaled`` via a
    ``VectorAssembler``. A ``StandardScaler`` (default settings) is fitted on
    that column and writes the result to ``features``. A ``label`` column is
    added that is 1 when ``churn`` equals "Yes" and 0 otherwise.

    Args:
        df: DataFrame holding the ``*_encoded`` columns, the numeric columns
            listed below, and ``churn``.

    Returns:
        The DataFrame with ``features_unscaled``, ``features`` and ``label``
        added.
    """
    numeric_inputs = [
        'tenure', 'monthlycharges', 'totalcharges_clean',
        'seniorcitizen', 'total_services', 'charges_per_service',
        'has_family', 'avg_monthly_spend',
    ]
    # One-hot columns go first, in the order they appear in the DataFrame.
    one_hot_inputs = [name for name in df.columns if name.endswith("_encoded")]

    assembled = VectorAssembler(
        inputCols=one_hot_inputs + numeric_inputs,
        outputCol="features_unscaled",
    ).transform(df)

    scaler_model = StandardScaler(
        inputCol="features_unscaled",
        outputCol="features",
    ).fit(assembled)
    scaled = scaler_model.transform(assembled)

    churned = col("churn") == "Yes"
    return scaled.withColumn("label", when(churned, 1).otherwise(0))

if __name__ == "__main__":
    # End-to-end run: load customers from Postgres, derive features, encode
    # categoricals, build the scaled feature vector, and persist
    # (customerid, features, label) as parquet.
    spark, df = load_data_from_postgres()
    try:
        print(f"Loaded {df.count()} rows")

        df = engineer_features(df)
        print("✅ Feature engineering complete")

        df = encode_categorical_features(df)
        print("✅ Categorical encoding complete")

        df = create_feature_vector(df)
        print("✅ Feature vector created")

        final_df = df.select("customerid", "features", "label")
        final_df.show(5)

        final_df.write.mode("overwrite").parquet("./data/processed_features.parquet")
        print("✅ Saved processed features to parquet")
    finally:
        # Always release the Spark session, even when a stage above raises;
        # otherwise a failed run leaves the session (and its UI port) alive.
        spark.stop()


25/11/10 23:01:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/10 23:01:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Loaded 7043 rows
✅ Feature engineering complete
✅ Categorical encoding complete


25/11/10 23:01:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


✅ Feature vector created
+----------+--------------------+-----+
|customerid|            features|label|
+----------+--------------------+-----+
|7590-VHVEG|(54,[1,3,4,7,10,1...|    0|
|5575-GNVDE|(54,[0,2,4,6,8,12...|    0|
|3668-QPYBK|(54,[0,2,4,6,8,12...|    1|
|7795-CFOCW|(54,[0,2,4,7,10,1...|    0|
|9237-HQITU|(54,[1,2,4,6,8,11...|    1|
+----------+--------------------+-----+
only showing top 5 rows


                                                                                

✅ Saved processed features to parquet
