In [25]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.sql import functions as F

In [11]:
schema = StructType([
    StructField("Customer_ID", IntegerType(), True),
    StructField("Customer_Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Purchase_Amount", FloatType(), True),
    StructField("Purchase_Date", DateType(), True)
])

In [22]:
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .csv("./sample_data.csv")

### Drop duplicates

In [24]:
df.distinct().count(), df.count()

(25, 26)

In [26]:
df.select(F.countDistinct("Customer_ID")).show()

+---------------------------+
|count(DISTINCT Customer_ID)|
+---------------------------+
|                         25|
+---------------------------+



In [27]:
df_unique = df.dropDuplicates()

### Fix missing columns

In [51]:
df_unique.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c)
    for c in df.columns
]).show()

+-----------+-------------+---+------+---------------+-------------+
|Customer_ID|Customer_Name|Age|Gender|Purchase_Amount|Purchase_Date|
+-----------+-------------+---+------+---------------+-------------+
|          0|            0|  2|     1|              2|            0|
+-----------+-------------+---+------+---------------+-------------+



In [49]:
df_cleaned_missing = df_unique.select(
    F.col("Customer_ID"),
    F.col("Customer_Name"),
    F.coalesce(F.col("Age"), F.lit(0)).alias("Age"),
    F.coalesce(F.col("Gender"), F.lit("Female")).alias("Gender"),
    F.coalesce(F.col("Purchase_Amount"), F.lit(0.0)).alias("Purchase_Amount"),
    F.col("Purchase_Date"),
)

### Remove outlier

In [36]:
df_cleaned_missing.filter(F.col("Age") >= 100).show()

+-----------+-------------+---+------+---------------+-------------+
|Customer_ID|Customer_Name|Age|Gender|Purchase_Amount|Purchase_Date|
+-----------+-------------+---+------+---------------+-------------+
|        109|     Tim Cook|102|  Male|          320.0|   2025-01-23|
+-----------+-------------+---+------+---------------+-------------+



In [38]:
df_cleaned_missing.filter(F.col("Purchase_Amount") >= 10000).show()

+-----------+-------------+---+------+---------------+-------------+
|Customer_ID|Customer_Name|Age|Gender|Purchase_Amount|Purchase_Date|
+-----------+-------------+---+------+---------------+-------------+
|        124|  Bruce Wayne| 45|  Male|        15000.0|   2025-02-07|
+-----------+-------------+---+------+---------------+-------------+



In [44]:
df_cleaned_outlier = df_cleaned_missing.filter((F.col("Age") < 100) & (F.col("Purchase_Amount") < 10000))

In [45]:
df_cleaned_outlier.count()

23

### Binary Format Conversion

In [54]:
df_cleaned_outlier.groupBy("gender").count().show()

+-------+-----+
| gender|count|
+-------+-----+
| Female|   12|
|Unknown|    1|
|   Male|   10|
+-------+-----+



In [59]:
df_cleaned_gender = df_cleaned_outlier.withColumn("gender",
                                                 F.when(F.col("gender") == "Unknown", "Female")
                                                 .otherwise(F.col("gender")))

In [61]:
df_cleaned_gender_bin = df_cleaned_gender.withColumn("gender_bin",
                                                     F.when(F.col("gender") == "Female", 0).otherwise(1))

### Split Column

In [63]:
df_cleaned = df_cleaned_gender_bin.select(
    F.col("Customer_ID"),
    F.split(F.col("Customer_Name"), " ").getItem(0).alias("First_Name"),
    F.split(F.col("Customer_Name"), " ").getItem(1).alias("Last_Name"),
    F.col("Age"),
    F.col("Gender"),
    F.col("Purchase_Amount"),
    F.col("Purchase_Date"),
)

In [64]:
df_cleaned.show(3)

+-----------+----------+---------+---+------+---------------+-------------+
|Customer_ID|First_Name|Last_Name|Age|Gender|Purchase_Amount|Purchase_Date|
+-----------+----------+---------+---+------+---------------+-------------+
|        119|     David|      Kim| 14|  Male|           60.0|   2025-02-02|
|        117|       Tom|    Hardy| 40|  Male|            0.0|   2025-01-31|
|        118|      Anna|  Collins| 72|Female|          350.0|   2025-02-01|
+-----------+----------+---------+---+------+---------------+-------------+
only showing top 3 rows



### Agg

In [89]:
total_purchase_by_gender = df_cleaned.groupBy("gender") \
                                .agg(F.sum("purchase_amount").alias("total_purchase_amount")) \
                                .collect()

In [94]:
avg_purchase_by_age_grp = df_cleaned.withColumn(
    "Age_Group",
    F.when((F.col("age") >= 18) & (F.col("age") <= 30), "18-30")
    .when((F.col("Age") >= 31) & (F.col("Age") <= 40), "31-40")
    .when((F.col("Age") >= 41) & (F.col("Age") <= 50), "41-50")
    .when((F.col("Age") >= 51) & (F.col("Age") <= 60), "51-60")
    .otherwise("61-70")
).groupBy("age_group") \
    .agg(F.avg("purchase_amount").alias("avg_purchase_amount")).collect()

In [79]:
avg_purchase_by_age_grp

[Row(age_group='41-50', avg(purchase_amount)=550.0),
 Row(age_group='18-30', avg(purchase_amount)=515.125),
 Row(age_group='31-40', avg(purchase_amount)=454.375),
 Row(age_group='51-60', avg(purchase_amount)=800.0),
 Row(age_group='61-70', avg(purchase_amount)=348.2487487792969)]

In [91]:
for r in total_purchase_by_gender:
    print(f"{r['gender']}: {r['total_purchase_amount']}")

Female: 6211.0
Male: 4170.489990234375


In [95]:
for r in avg_purchase_by_age_grp:
    print(f"{r['age_group']}: {r['avg_purchase_amount']}")

41-50: 550.0
18-30: 515.125
31-40: 454.375
51-60: 800.0
61-70: 348.2487487792969


In [96]:
df_cleaned.show(5)

+-----------+----------+---------+---+------+---------------+-------------+
|Customer_ID|First_Name|Last_Name|Age|Gender|Purchase_Amount|Purchase_Date|
+-----------+----------+---------+---+------+---------------+-------------+
|        119|     David|      Kim| 14|  Male|           60.0|   2025-02-02|
|        117|       Tom|    Hardy| 40|  Male|            0.0|   2025-01-31|
|        118|      Anna|  Collins| 72|Female|          350.0|   2025-02-01|
|        115|    Phoebe|   Buffay| 31|Female|          900.0|   2025-01-29|
|        112|    Monica|   Geller| 34|Female|          615.0|   2025-01-26|
+-----------+----------+---------+---+------+---------------+-------------+
only showing top 5 rows

