Basic Transformations in PySpark

Q1. Consider the telecom dataset and perform the following operations on it.
●	Apply basic transformation functions to it to get insights into the telecom data.
●	Create a SQL table from it and perform basic SQL transformations.
●	Perform preprocessing transformations on the relevant features:
1.	Normalization
2.	Standardization
3.	Encoding
4.	Feature engineering
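
Items 1 and 2 differ only in the rescaling rule: min-max normalization maps a column onto [0, 1], while standardization shifts it to zero mean and unit standard deviation. A minimal plain-Python sketch of both rules follows. The minute values are made up rather than taken from the dataset, and the helper names `min_max` and `z_score` are illustrative, not part of PySpark.

```python
from statistics import mean, stdev

def min_max(values):
    """Rescale values linearly onto [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: no spread to rescale, so map everything to the midpoint
        return [0.5 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]

def z_score(values):
    """Shift to zero mean and scale by the sample standard deviation (n - 1)."""
    mu, sigma = mean(values), stdev(values)
    if sigma == 0:
        # Constant column: every value equals the mean
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]

minutes = [100.0, 150.0, 200.0, 250.0, 300.0]  # hypothetical values
print(min_max(minutes))   # [0.0, 0.25, 0.5, 0.75, 1.0]
print(z_score(minutes))   # symmetric around 0.0
```

In Spark ML the corresponding transformers are `MinMaxScaler` and `StandardScaler` from `pyspark.ml.feature`, which apply these rules element-wise to a vector column.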


In [2]:
# --- Start: create Spark session if needed ---
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("TelecomTransform") \
    .getOrCreate()
# --- End Spark session creation ---


1. Read dataset

In [3]:
telecom_path = "telecom.csv"  # change to your file
df = spark.read.option("header", True).option("inferSchema", True).csv(telecom_path)
df.printSchema()
df.show(5, truncate=False)


root
 |-- _c0: integer (nullable = true)
 |-- account_length: integer (nullable = true)
 |-- area_code: integer (nullable = true)
 |-- international_plan: integer (nullable = true)
 |-- voice_mail_plan: integer (nullable = true)
 |-- number_vmail_messages: integer (nullable = true)
 |-- total_day_minutes: double (nullable = true)
 |-- total_day_calls: integer (nullable = true)
 |-- total_day_charge: double (nullable = true)
 |-- total_eve_minutes: double (nullable = true)
 |-- total_eve_calls: integer (nullable = true)
 |-- total_eve_charge: double (nullable = true)
 |-- total_night_minutes: double (nullable = true)
 |-- total_night_calls: integer (nullable = true)
 |-- total_night_charge: double (nullable = true)
 |-- total_intl_minutes: double (nullable = true)
 |-- total_intl_calls: integer (nullable = true)
 |-- total_intl_charge: double (nullable = true)
 |-- customer_service_calls: integer (nullable = true)
 |-- churn: integer (nullable = true)


2. Data Cleaning & Preparation

In [4]:
from pyspark.sql.functions import col

# Drop index column (_c0) if present
if "_c0" in df.columns:
    df = df.drop("_c0")

# Ensure correct data types
num_cols = [c for c in df.columns if c not in ("international_plan", "voice_mail_plan", "churn", "area_code")]
for nc in num_cols:
    df = df.withColumn(nc, col(nc).cast("double"))

# Binary flag columns and the label: keep them as integer 0/1
df = df.withColumn("international_plan", col("international_plan").cast("int"))
df = df.withColumn("voice_mail_plan", col("voice_mail_plan").cast("int"))
df = df.withColumn("churn", col("churn").cast("int"))


3. Create SQL Table & Run Queries

In [5]:
# Register table
df.createOrReplaceTempView("telecom")

# 1. Total number of customers
spark.sql("SELECT COUNT(*) AS total_customers FROM telecom").show()

# 2. Average day minutes and charges by churn status
spark.sql("""
    SELECT churn, 
           ROUND(AVG(total_day_minutes),2) AS avg_day_minutes,
           ROUND(AVG(total_day_charge),2) AS avg_day_charge
    FROM telecom 
    GROUP BY churn
""").show()

# 3. Churn rate by area code
spark.sql("""
    SELECT area_code, 
           COUNT(*) AS total_users, 
           SUM(churn) AS churned_users,
           ROUND(100*SUM(churn)/COUNT(*),2) AS churn_rate_pct
    FROM telecom
    GROUP BY area_code
    ORDER BY churn_rate_pct DESC
""").show()


+---------------+
|total_customers|
+---------------+
|           3333|
+---------------+

+-----+---------------+--------------+
|churn|avg_day_minutes|avg_day_charge|
+-----+---------------+--------------+
|    1|         206.91|         35.18|
|    0|         175.18|         29.78|
+-----+---------------+--------------+

+---------+-----------+-------------+--------------+
|area_code|total_users|churned_users|churn_rate_pct|
+---------+-----------+-------------+--------------+
|      510|        840|          125|         14.88|
|      408|        838|          122|         14.56|
|      415|       1655|          236|         14.26|
+---------+-----------+-------------+--------------+
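
The churn-rate column can be checked by hand from the counts in the table above. The sketch below redoes the arithmetic in plain Python using only those printed numbers. The helper name `churn_rate_pct` is illustrative, and Python's `round` is assumed to agree with SQL `ROUND` here because none of the values sits on a rounding boundary.

```python
# (area_code, total_users, churned_users) copied from the query output above
rows = [(510, 840, 125), (408, 838, 122), (415, 1655, 236)]

def churn_rate_pct(total, churned):
    """Percentage of churned users, rounded to 2 decimals."""
    return round(100 * churned / total, 2)

rates = {area: churn_rate_pct(total, churned) for area, total, churned in rows}
print(rates)  # {510: 14.88, 408: 14.56, 415: 14.26}

# The per-area counts should add up to the total from query 1
total_users = sum(total for _, total, _ in rows)
total_churned = sum(churned for _, _, churned in rows)
print(total_users)                                 # 3333
print(churn_rate_pct(total_users, total_churned))  # 14.49
```

The three area codes differ by well under one percentage point, so area code alone separates churners only weakly in this output.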



4. Feature Engineering

In [6]:
from pyspark.sql.functions import when

# No tenure column exists, so derive a long-term flag from account_length (cutoff of 120 chosen here)
df = df.withColumn("long_term_customer", when(col("account_length") > 120, 1).otherwise(0))

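# Call intensity features: average minutes per call for each period.
# Guard against zero calls so the result is null instead of a division error;
# the nulls are mean-imputed in step 5.
df = df.withColumn("day_call_avg", when(col("total_day_calls") > 0, col("total_day_minutes")/col("total_day_calls")))
df = df.withColumn("eve_call_avg", when(col("total_eve_calls") > 0, col("total_eve_minutes")/col("total_eve_calls")))
df = df.withColumn("night_call_avg", when(col("total_night_calls") > 0, col("total_night_minutes")/col("total_night_calls")))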

# Intl charge per call (avoid division by zero)
df = df.withColumn("intl_charge_per_call", when(col("total_intl_calls") > 0,
                                               col("total_intl_charge")/col("total_intl_calls"))
                                               .otherwise(0.0))

df.show(5, truncate=False)


[Output truncated: the original columns plus long_term_customer, day_call_avg, eve_call_avg, night_call_avg, intl_charge_per_call]
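
The per-call ratios are undefined for a customer with zero calls in a period. A minimal plain-Python sketch of the usual handling: return a missing value for those rows and impute it afterwards with the column mean, as step 5 of this notebook does. The rows are hypothetical and the helper name `per_call` is illustrative.

```python
from typing import Optional

def per_call(total: float, calls: float) -> Optional[float]:
    """Return total / calls, or None when no calls were made."""
    return total / calls if calls > 0 else None

# Hypothetical rows: (total_day_minutes, total_day_calls)
rows = [(180.0, 90.0), (0.0, 0.0), (250.0, 100.0)]
averages = [per_call(m, c) for m, c in rows]
print(averages)  # [2.0, None, 2.5]

# Mean imputation over the non-missing values
known = [a for a in averages if a is not None]
fill = sum(known) / len(known)
imputed = [a if a is not None else fill for a in averages]
print(imputed)   # [2.0, 2.25, 2.5]
```

Filling with 0.0 instead, as the notebook does for `intl_charge_per_call`, is the other common choice. It keeps zero-call customers distinguishable from average ones, at the cost of pulling the column mean down.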

5. Encoding, Normalization & Standardization

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, mean

# Reuse the existing `spark` session created at the top of the notebook

# --- Columns ---
cat_cols = ["area_code", "international_plan", "voice_mail_plan"]
label_col = "churn"

# --- Cast numeric columns to double ---
num_features = [c for c in df.columns if c not in cat_cols + [label_col]]
for c in num_features:
    df = df.withColumn(c, col(c).cast("double"))

# Calculate means for numeric columns and fill nulls
means = df.select([mean(c).alias(c) for c in num_features]).collect()[0].asDict()
df = df.fillna(means, subset=num_features)

# --- Index + OneHotEncode ---
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep") for c in cat_cols]
encoders = [OneHotEncoder(inputCol=c+"_idx", outputCol=c+"_ohe") for c in cat_cols]

# --- Assemble features ---
assembler_inputs = num_features + [c+"_ohe" for c in cat_cols]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="raw_features", handleInvalid="keep")

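# --- StandardScaler (standardization: zero mean, unit standard deviation) ---
# Note: this also rescales the one-hot columns; min-max normalization is not applied here
# (MinMaxScaler from pyspark.ml.feature would be the counterpart for that).
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)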

# --- Pipeline ---
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

# --- Fit + Transform ---
model = pipeline.fit(df)
ml_ready_df = model.transform(df).select(col(label_col).alias("label"), col("features"))

ml_ready_df.show(5, truncate=False)

[Output truncated: columns label, features]
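
The encoding stage is two steps: `StringIndexer` assigns each category an integer index, most frequent first by default, and `OneHotEncoder` turns that index into a 0/1 vector, dropping the last category by default. A plain-Python sketch of that idea on a hypothetical sample follows. The helper names `fit_index` and `one_hot` are illustrative, the tie-breaking rule is an assumption, and the extra index that `handleInvalid="keep"` reserves for unseen labels is not modeled.

```python
from collections import Counter

def fit_index(values):
    """Map each category to an index, most frequent first (ties broken by value)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], str(v)))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(value, index, drop_last=True):
    """Encode value as a 0/1 list; with drop_last the final category is all zeros."""
    size = len(index) - 1 if drop_last else len(index)
    vec = [0] * size
    pos = index[value]
    if pos < size:
        vec[pos] = 1
    return vec

area_codes = [415, 415, 415, 510, 510, 408]  # hypothetical sample
index = fit_index(area_codes)
print(index)                # {415: 0, 510: 1, 408: 2}
print(one_hot(415, index))  # [1, 0]
print(one_hot(408, index))  # [0, 0]
```

Dropping the last category avoids a redundant column, since the all-zeros vector already identifies it.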

6. Final ML-ready Dataset

In [8]:
# Final dataset with label + features
final_df = ml_ready_df.select("label", "features")

final_df.show(5, truncate=False)

# Save prepared dataset
# final_df.write.mode("overwrite").parquet("/path/to/telecom_final.parquet")

[Output truncated: columns label, features]