#DATA READ & PROCESS

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CustomerDataProcessing").getOrCreate()
spark

<pyspark.sql.connect.session.SparkSession at 0xff9ebb2efb90>

In [0]:

df = spark.read.option("header", True).option("inferSchema", True).csv("/Volumes/workspace/default/test/customer_data_50_records.csv")


df.show(5)



+-----------+-------------------+----------------+--------------+----------+-----------------+---------+
|customer_id|               name|            city|         state|   country|registration_date|is_active|
+-----------+-------------------+----------------+--------------+----------+-----------------+---------+
|          1|Samantha Montgomery|      Teresafurt|      Kentucky|      Cuba|       2024-11-03|    false|
|          2|       Richard Moon|         Annfurt|    California|   Burundi|       2025-06-14|     true|
|          3|     Cameron Murray|       Kaylatown|      Missouri|   Algeria|       2021-08-11|     true|
|          4|       Amanda Kelly|North Annchester|  South Dakota|Tajikistan|       2020-10-28|     true|
|          5|      Omar Martinez|      Vargastown|North Carolina|    Latvia|       2023-02-28|    false|
+-----------+-------------------+----------------+--------------+----------+-----------------+---------+
only showing top 5 rows


In [0]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



In [0]:
from pyspark.sql.functions import*

In [0]:

df = df.withColumn(
    'registration_date', 
    to_date(col('registration_date'), 'yyyy/MM/dd')
).withColumn(
    'is_active', 
    col('is_active').cast('boolean')
)

In [0]:
df.show()

+-----------+-------------------+----------------+--------------+--------------------+-----------------+---------+
|customer_id|               name|            city|         state|             country|registration_date|is_active|
+-----------+-------------------+----------------+--------------+--------------------+-----------------+---------+
|          1|Samantha Montgomery|      Teresafurt|      Kentucky|                Cuba|       2024-11-03|    false|
|          2|       Richard Moon|         Annfurt|    California|             Burundi|       2025-06-14|     true|
|          3|     Cameron Murray|       Kaylatown|      Missouri|             Algeria|       2021-08-11|     true|
|          4|       Amanda Kelly|North Annchester|  South Dakota|          Tajikistan|       2020-10-28|     true|
|          5|      Omar Martinez|      Vargastown|North Carolina|              Latvia|       2023-02-28|    false|
|          6|      James Johnson|       Maryburgh|         Idaho|          San M

In [0]:
df=df.fillna({'city':'unkown','state':'unkown','country':'unkown'})


In [0]:
df.show(5)

+-----------+-------------------+----------------+--------------+----------+-----------------+---------+
|customer_id|               name|            city|         state|   country|registration_date|is_active|
+-----------+-------------------+----------------+--------------+----------+-----------------+---------+
|          1|Samantha Montgomery|      Teresafurt|      Kentucky|      Cuba|       2024-11-03|    false|
|          2|       Richard Moon|         Annfurt|    California|   Burundi|       2025-06-14|     true|
|          3|     Cameron Murray|       Kaylatown|      Missouri|   Algeria|       2021-08-11|     true|
|          4|       Amanda Kelly|North Annchester|  South Dakota|Tajikistan|       2020-10-28|     true|
|          5|      Omar Martinez|      Vargastown|North Carolina|    Latvia|       2023-02-28|    false|
+-----------+-------------------+----------------+--------------+----------+-----------------+---------+
only showing top 5 rows


In [0]:
df=df.withColumn('registration_year',year(col('registration_date')))
df.show(5)

+-----------+-------------------+----------------+--------------+----------+-----------------+---------+-----------------+
|customer_id|               name|            city|         state|   country|registration_date|is_active|registration_year|
+-----------+-------------------+----------------+--------------+----------+-----------------+---------+-----------------+
|          1|Samantha Montgomery|      Teresafurt|      Kentucky|      Cuba|       2024-11-03|    false|             2024|
|          2|       Richard Moon|         Annfurt|    California|   Burundi|       2025-06-14|     true|             2025|
|          3|     Cameron Murray|       Kaylatown|      Missouri|   Algeria|       2021-08-11|     true|             2021|
|          4|       Amanda Kelly|North Annchester|  South Dakota|Tajikistan|       2020-10-28|     true|             2020|
|          5|      Omar Martinez|      Vargastown|North Carolina|    Latvia|       2023-02-28|    false|             2023|
+-----------+---

In [0]:
unique_cities=df.select(countDistinct('city')).collect()
unique_cities[0]
unique_states=df.select(countDistinct('state')).collect()
unique_states[0]
unique_countries=df.select(countDistinct('country')).collect()
unique_countries[0][0]

42

In [0]:
df.groupBy('state').pivot('is_active').count().show(5)


+----------+-----+----+
|     state|false|true|
+----------+-----+----+
|     Maine|    1|NULL|
|New Jersey|    1|   1|
|  Arkansas|    1|NULL|
|   Arizona| NULL|   1|
|     Idaho| NULL|   3|
+----------+-----+----+
only showing top 5 rows


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, dense_rank, row_number

window_spec = Window.partitionBy('state').orderBy(col('registration_year').desc())

df = df.withColumn('rank', rank().over(window_spec))\
       .withColumn('dense_rank', dense_rank().over(window_spec))\
       .withColumn('row_number', row_number().over(window_spec))

df.show(5)


+-----------+---------------+----------------+----------+--------------------+-----------------+---------+-----------------+----+----------+----------+
|customer_id|           name|            city|     state|             country|registration_date|is_active|registration_year|rank|dense_rank|row_number|
+-----------+---------------+----------------+----------+--------------------+-----------------+---------+-----------------+----+----------+----------+
|         50|Katherine Smith|North Jamieburgh|    Alaska|                Cuba|       2020-08-23|     true|             2020|   1|         1|         1|
|         17|     Brian Lang|      West Robin|   Arizona|               Malta|       2022-08-02|     true|             2022|   1|         1|         1|
|         47|     Pamela Kim|        Ryanberg|  Arkansas|Falkland Islands ...|       2023-12-26|    false|             2023|   1|         1|         1|
|          2|   Richard Moon|         Annfurt|California|             Burundi|       202

In [0]:
df.select('name','city','state','country','rank','dense_rank','row_number').show(5)

+---------------+----------------+----------+--------------------+----+----------+----------+
|           name|            city|     state|             country|rank|dense_rank|row_number|
+---------------+----------------+----------+--------------------+----+----------+----------+
|Katherine Smith|North Jamieburgh|    Alaska|                Cuba|   1|         1|         1|
|     Brian Lang|      West Robin|   Arizona|               Malta|   1|         1|         1|
|     Pamela Kim|        Ryanberg|  Arkansas|Falkland Islands ...|   1|         1|         1|
|   Richard Moon|         Annfurt|California|             Burundi|   1|         1|         1|
|  Susan Hartman| North Dawnburgh|California|      American Samoa|   2|         2|         2|
+---------------+----------------+----------+--------------------+----+----------+----------+
only showing top 5 rows


In [0]:
df_recent_customer = df.filter(col("registration_date") >= lit("2023-07-01"))
df_recent_customer.show()
df.count()

+-----------+-------------------+----------------+--------------+--------------------+-----------------+---------+-----------------+----+----------+----------+
|customer_id|               name|            city|         state|             country|registration_date|is_active|registration_year|rank|dense_rank|row_number|
+-----------+-------------------+----------------+--------------+--------------------+-----------------+---------+-----------------+----+----------+----------+
|         47|         Pamela Kim|        Ryanberg|      Arkansas|Falkland Islands ...|       2023-12-26|    false|             2023|   1|         1|         1|
|          2|       Richard Moon|         Annfurt|    California|             Burundi|       2025-06-14|     true|             2025|   1|         1|         1|
|         48|  Dr. Krista Graham|     Chavezmouth|      Delaware|    Pitcairn Islands|       2025-06-27|    false|             2025|   1|         1|         1|
|         26|        Julie Kelly|   West

50

OLDEST AND NEWST CUSTOMER PER CITY

In [0]:
df.groupBy('city').agg(min('registration_date').alias('oldest'),max('registration_date').alias('newest')).show(5)


+----------------+----------+----------+
|            city|    oldest|    newest|
+----------------+----------+----------+
|New Williammouth|2023-04-23|2023-04-23|
|       Frankfurt|2024-10-22|2024-10-22|
|       Smithland|2024-03-30|2024-03-30|
|      Teresafurt|2024-11-03|2024-11-03|
|       Rosehaven|2022-07-07|2022-07-07|
+----------------+----------+----------+
only showing top 5 rows


# JOINNING AND ANALYZING CUSTOMER ORDERS

In [0]:
orders_df=spark.read.format('csv').option('header','true').option('inferSchema','true').load('/Volumes/workspace/default/test/orders_data_adjusted.csv')
orders_df.show(5)

+--------+-----------+----------+------------------+---------+
|order_id|customer_id|order_date|      total_amount|   status|
+--------+-----------+----------+------------------+---------+
|       0|       1564|2024-08-09| 3965.908675151353|Delivered|
|       1|      11438|2024-08-04|  3391.31034436957|  Pending|
|       2|       1604|2024-08-10|1318.0750183651003|  Pending|
|       3|       7489|2024-08-22| 513.9839357261428|Delivered|
|       4|       1554|2024-08-05|2361.3925218392974|Delivered|
+--------+-----------+----------+------------------+---------+
only showing top 5 rows


In [0]:
customers_orders_df=df.join(orders_df,'customer_id',"outer")



In [0]:
customers_orders_df.show()

+-----------+-------------------+----------------+-------------+--------------------+-----------------+---------+-----------------+----+----------+----------+--------+----------+------------+------+
|customer_id|               name|            city|        state|             country|registration_date|is_active|registration_year|rank|dense_rank|row_number|order_id|order_date|total_amount|status|
+-----------+-------------------+----------------+-------------+--------------------+-----------------+---------+-----------------+----+----------+----------+--------+----------+------------+------+
|         29|  Richard Maldonado|   East Brittany|West Virginia|             Vanuatu|       2025-05-21|    false|             2025|   1|         1|         1|    NULL|      NULL|        NULL|  NULL|
|         24|    Matthew Stewart|       Rosehaven|        Idaho|British Virgin Is...|       2022-07-07|     true|             2022|   1|         1|         2|    NULL|      NULL|        NULL|  NULL|
|    

In [0]:
#get total order per customer 

customers_orders_count=customers_orders_df.groupBy('customer_id').count().orderBy(col('count').desc())
customers_orders_count.show(5)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|         24|    1|
|       5967|    1|
|       2698|    1|
|         29|    1|
|         39|    1|
+-----------+-----+
only showing top 5 rows


In [0]:
#total spend per customer 
customer_total_spend= customers_orders_df.groupBy('customer_id').agg(sum('total_amount')).orderBy(col('sum(total_amount)').desc())
customer_total_spend.show(10)

+-----------+------------------+
|customer_id| sum(total_amount)|
+-----------+------------------+
|      11251| 4852.791766322864|
|       7630| 4785.446954689925|
|       3047|4666.8361800275925|
|       6520| 4508.368568009902|
|      13161| 4500.269893291697|
|      10224| 4494.740364834989|
|      14568| 4485.984719835927|
|       5967| 4250.017746613385|
|      12921| 4182.487922944138|
|       4011| 4123.033676276145|
+-----------+------------------+
only showing top 10 rows


In [0]:
#AVG SPEND
customer_avg_spend= customers_orders_df.groupBy('customer_id').agg(avg('total_amount')).orderBy(col('avg(total_amount)').desc())
customer_avg_spend.show(10)

+-----------+------------------+
|customer_id| avg(total_amount)|
+-----------+------------------+
|      11251| 4852.791766322864|
|       7630| 4785.446954689925|
|       3047|4666.8361800275925|
|       6520| 4508.368568009902|
|      13161| 4500.269893291697|
|      10224| 4494.740364834989|
|      14568| 4485.984719835927|
|       5967| 4250.017746613385|
|      12921| 4182.487922944138|
|       4011| 4123.033676276145|
+-----------+------------------+
only showing top 10 rows


In [0]:
#ORDER BY STATUS 
order_status_count= customers_orders_df.groupBy('status').count()
order_status_count.show()

+---------+-----+
|   status|count|
+---------+-----+
|     NULL|   50|
|Delivered|   17|
|Cancelled|    8|
|  Shipped|   13|
|  Pending|   12|
+---------+-----+



In [0]:
#order by month 
order_by_month=customers_orders_df.withColumn('orders_month',month(col('order_date')))\
    .groupBy('orders_month').count().orderBy(col('orders_month'))
order_by_month.show()

+------------+-----+
|orders_month|count|
+------------+-----+
|        NULL|   50|
|           8|   50|
+------------+-----+



In [0]:
window_spec = Window.orderBy(col('sum(total_amount)').desc())

ranked_customer = customer_total_spend.withColumn('dense_rank', dense_rank().over(window_spec))
display(ranked_customer.limit(10))



customer_id,sum(total_amount),dense_rank
11251,4852.791766322864,1
7630,4785.446954689925,2
3047,4666.8361800275925,3
6520,4508.368568009902,4
13161,4500.269893291697,5
10224,4494.740364834989,6
14568,4485.984719835927,7
5967,4250.017746613385,8
12921,4182.487922944138,9
4011,4123.033676276145,10


In [0]:
customer_total_spend,customers_orders_count

In [0]:
#finding customer with high order frequency but low total spend 
customer_spend_vs_orders=customers_orders_count.join(customer_total_spend,'customer_id',"outer")\
    .orderBy(col('count').desc(),col('sum(total_amount)'))
    
customer_spend_vs_orders.show(10)

+-----------+-----+-----------------+
|customer_id|count|sum(total_amount)|
+-----------+-----+-----------------+
|         44|    1|             NULL|
|         38|    1|             NULL|
|         29|    1|             NULL|
|         39|    1|             NULL|
|         20|    1|             NULL|
|         24|    1|             NULL|
|         22|    1|             NULL|
|         26|    1|             NULL|
|         21|    1|             NULL|
|         15|    1|             NULL|
+-----------+-----+-----------------+
only showing top 10 rows
