In [74]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('data read & process').getOrCreate()

In [75]:
spark

In [76]:
path = '/data/customer_csv_for_project.csv'
df = spark.read.csv(path, header=True, inferSchema=True)

In [5]:
df.show()

+----------+-----------+---------+-----------+-------+----------+--------+
|CustomerID|       Name|     City|      State|Country|SignupDate|IsActive|
+----------+-----------+---------+-----------+-------+----------+--------+
|         1| Customer_1|  Chennai|Maharashtra|  India|2021-01-01|    true|
|         2| Customer_2|  Chennai|  Karnataka|  India|2021-01-02|    true|
|         3| Customer_3|    Delhi| Tamil Nadu|  India|2021-01-03|    true|
|         4| Customer_4|    Delhi|West Bengal|  India|2021-01-04|    true|
|         5| Customer_5|   Mumbai| Tamil Nadu|  India|2021-01-05|   false|
|         6| Customer_6|  Kolkata|    Gujarat|  India|2021-01-06|   false|
|         7| Customer_7|  Kolkata|West Bengal|  India|2021-01-07|   false|
|         8| Customer_8|Bangalore| Tamil Nadu|  India|2021-01-08|   false|
|         9| Customer_9|  Kolkata|    Gujarat|  India|2021-01-09|   false|
|        10|Customer_10|   Mumbai|Maharashtra|  India|2021-01-10|   false|
|        11|Customer_11| 

In [6]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- SignupDate: date (nullable = true)
 |-- IsActive: boolean (nullable = true)



In [7]:
from pyspark.sql.functions import * 

df = df.withColumn('SignupDate', to_date(col('SignupDate'), 'yyyy-MM-dd'))\
       .withColumn('IsActive', col('IsActive').cast('boolean'))

In [8]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- SignupDate: date (nullable = true)
 |-- IsActive: boolean (nullable = true)



In [9]:
df = df.fillna({'City' : 'Unknown', 'state' : 'Unknown', 'Country' : 'Unknown'}) # replace empty/null value with 'Unknown' in city, state, country columns

In [10]:
#add two new columns year and month from signUpDate
df = df.withColumn('year', year(col('SignupDate')))\
       .withColumn('month', month(col('SignupDate')))

In [11]:
df.show()

+----------+-----------+---------+-----------+-------+----------+--------+----+-----+
|CustomerID|       Name|     City|      State|Country|SignupDate|IsActive|year|month|
+----------+-----------+---------+-----------+-------+----------+--------+----+-----+
|         1| Customer_1|  Chennai|Maharashtra|  India|2021-01-01|    true|2021|    1|
|         2| Customer_2|  Chennai|  Karnataka|  India|2021-01-02|    true|2021|    1|
|         3| Customer_3|    Delhi| Tamil Nadu|  India|2021-01-03|    true|2021|    1|
|         4| Customer_4|    Delhi|West Bengal|  India|2021-01-04|    true|2021|    1|
|         5| Customer_5|   Mumbai| Tamil Nadu|  India|2021-01-05|   false|2021|    1|
|         6| Customer_6|  Kolkata|    Gujarat|  India|2021-01-06|   false|2021|    1|
|         7| Customer_7|  Kolkata|West Bengal|  India|2021-01-07|   false|2021|    1|
|         8| Customer_8|Bangalore| Tamil Nadu|  India|2021-01-08|   false|2021|    1|
|         9| Customer_9|  Kolkata|    Gujarat|  India|

In [12]:
#get count of unique city, state, country
unique_cities = df.select('city').distinct().count()

unique_states = df.select('state').distinct().count()

unique_countries = df.select('country').distinct().count()

print(f'unique_cities = {unique_cities}, unique_states = {unique_states} and unique_countries = {unique_countries}')

                                                                                

unique_cities = 5, unique_states = 5 and unique_countries = 1


In [13]:
#get which city has max no of customers
df.groupBy('city', 'state').count().orderBy(desc('count')).show()

+---------+-----------+-----+
|     city|      state|count|
+---------+-----------+-----+
|  Chennai| Tamil Nadu|  774|
|  Kolkata|  Karnataka|  758|
|Bangalore|  Karnataka|  757|
|   Mumbai|Maharashtra|  750|
|    Delhi|  Karnataka|  741|
|  Chennai|Maharashtra|  739|
|  Kolkata| Tamil Nadu|  734|
|  Kolkata|West Bengal|  730|
|Bangalore|West Bengal|  715|
|  Chennai|  Karnataka|  714|
|Bangalore| Tamil Nadu|  710|
|    Delhi|    Gujarat|  708|
|   Mumbai|  Karnataka|  702|
|  Chennai|    Gujarat|  699|
|    Delhi|West Bengal|  697|
|Bangalore|    Gujarat|  688|
|  Chennai|West Bengal|  682|
|   Mumbai| Tamil Nadu|  673|
|    Delhi|Maharashtra|  658|
|  Kolkata|    Gujarat|  652|
+---------+-----------+-----+
only showing top 20 rows



In [14]:
df.groupBy('state', 'IsActive').count().orderBy('state').show()

# Pivot table - Count of active and inactive users per state
df.groupBy('state').pivot('isActive').count().show()#.withColumnRenamed('true', 'active').withColumnRenamed('false', 'inactive').show()

+-----------+--------+-----+
|      state|IsActive|count|
+-----------+--------+-----+
|    Gujarat|    true| 1744|
|    Gujarat|   false| 1653|
|  Karnataka|    true| 1787|
|  Karnataka|   false| 1885|
|Maharashtra|    true| 1715|
|Maharashtra|   false| 1716|
| Tamil Nadu|    true| 1670|
| Tamil Nadu|   false| 1806|
|West Bengal|   false| 1737|
|West Bengal|    true| 1687|
+-----------+--------+-----+

+-----------+-----+----+
|      state|false|true|
+-----------+-----+----+
|    Gujarat| 1653|1744|
|  Karnataka| 1885|1787|
|Maharashtra| 1716|1715|
| Tamil Nadu| 1806|1670|
|West Bengal| 1737|1687|
+-----------+-----+----+



In [20]:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy('state').orderBy(col('SignupDate').desc())
df = df.withColumn('rank', rank().over(windowSpec)) \
       .withColumn('dense_rank', dense_rank().over(windowSpec)) \
       .withColumn('row_number', row_number().over(windowSpec))

In [21]:
df.show()

[Stage 42:>                                                         (0 + 1) / 1]

+----------+-------------+---------+-------+-------+----------+--------+----+-----+----+----------+----------+
|CustomerID|         Name|     City|  State|Country|SignupDate|IsActive|year|month|rank|dense_rank|row_number|
+----------+-------------+---------+-------+-------+----------+--------+----+-----+----+----------+----------+
|      4996|Customer_4996|  Kolkata|Gujarat|  India|2034-09-05|   false|2034|    9|   1|         1|         1|
|      9996|Customer_4996|  Kolkata|Gujarat|  India|2034-09-05|   false|2034|    9|   1|         1|         2|
|     14996|Customer_4996|  Kolkata|Gujarat|  India|2034-09-05|   false|2034|    9|   1|         1|         3|
|      4995|Customer_4995|  Kolkata|Gujarat|  India|2034-09-04|    true|2034|    9|   4|         2|         4|
|      9995|Customer_4995|  Kolkata|Gujarat|  India|2034-09-04|    true|2034|    9|   4|         2|         5|
|     14995|Customer_4995|  Kolkata|Gujarat|  India|2034-09-04|    true|2034|    9|   4|         2|         6|
|

                                                                                

| Function       | What it means                                                                                                                      |
| -------------- | ---------------------------------------------------------------------------------------------------------------------------------- |
| `rank()`       | Assigns a rank. If two users have the **same registration\_date**, they get the same rank, but the next one skips (e.g., 1, 1, 3). |
| `dense_rank()` | Same as `rank`, **but no gaps** (e.g., 1, 1, 2).                                                                                   |
| `row_number()` | Assigns a unique row number, no matter what. Even if two rows have the same date, it gives 1, 2, 3...                              |


In [22]:
df.select('state', 'signupdate', 'rank', 'dense_rank', 'row_number').show()

+-------+----------+----+----------+----------+
|  state|signupdate|rank|dense_rank|row_number|
+-------+----------+----+----------+----------+
|Gujarat|2034-09-05|   1|         1|         1|
|Gujarat|2034-09-05|   1|         1|         2|
|Gujarat|2034-09-05|   1|         1|         3|
|Gujarat|2034-09-04|   4|         2|         4|
|Gujarat|2034-09-04|   4|         2|         5|
|Gujarat|2034-09-04|   4|         2|         6|
|Gujarat|2034-08-25|   7|         3|         7|
|Gujarat|2034-08-25|   7|         3|         8|
|Gujarat|2034-08-25|   7|         3|         9|
|Gujarat|2034-08-24|  10|         4|        10|
|Gujarat|2034-08-24|  10|         4|        11|
|Gujarat|2034-08-24|  10|         4|        12|
|Gujarat|2034-08-17|  13|         5|        13|
|Gujarat|2034-08-17|  13|         5|        14|
|Gujarat|2034-08-17|  13|         5|        15|
|Gujarat|2034-08-14|  16|         6|        16|
|Gujarat|2034-08-14|  16|         6|        17|
|Gujarat|2034-08-14|  16|         6|    

In [26]:
df_recent_cust = df.filter(col('SignupDate') >= lit('2023-07-01'))
    
# lit() stands for literal — it tells Spark: “Hey, this '2023-07-01' is a constant value. Treat it like a fixed date, not a column.”
# Without lit(), Spark might think '2023-07-01' is a column name, which would mess things up 😅

df_recent_cust.count()

13756

In [30]:
#oldest and newest customer per city
df.groupBy('city').agg(min('SignupDate').alias('oldest'), max('SignupDate').alias('newest')).show()

+---------+----------+----------+
|     city|    oldest|    newest|
+---------+----------+----------+
|    Delhi|2021-01-03|2034-09-03|
|  Kolkata|2021-01-06|2034-09-05|
|Bangalore|2021-01-08|2034-09-09|
|  Chennai|2021-01-01|2034-09-08|
|   Mumbai|2021-01-05|2034-09-07|
+---------+----------+----------+



In [39]:
!hdfs dfs -ls /data/spark_mini_project_op

Found 2 items
-rw-r--r--   2 root hadoop          0 2025-06-16 03:17 /data/spark_mini_project_op/_SUCCESS
-rw-r--r--   2 root hadoop     258624 2025-06-16 03:17 /data/spark_mini_project_op/part-00000-3ef5183a-3526-48a9-83d5-e79f0a4ebf9e-c000.snappy.parquet


In [106]:
op_path = '/data/spark_mini_project_op'
df.write.mode('overwrite').parquet(op_path)

## Working with Orders table Data

In [40]:
order_table_path = '/data/orders.csv'
df_orders = spark.read.csv(order_table_path, header = True, inferSchema = True)

In [41]:
df_orders.show(5)

+--------+-----------+----------+-----------------+---------+
|order_id|customer_id|order_date|     total_amount|   status|
+--------+-----------+----------+-----------------+---------+
|       0|       3692|2024-09-03|547.7160076008001|  Shipped|
|       1|      11055|2024-08-10|577.8942599188381|  Pending|
|       2|       6963|2024-08-22|484.2085562764487|  Pending|
|       3|      13268|2024-09-01|366.3286882431848|Cancelled|
|       4|       1131|2024-08-09|896.9588380686909|  Pending|
+--------+-----------+----------+-----------------+---------+
only showing top 5 rows



In [42]:
df_orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- status: string (nullable = true)



In [45]:
#inner join between customer table and order table
customer_order_df = df.join(df_orders, df.CustomerID == df_orders.customer_id, 'inner')
customer_order_df.show()

+----------+-------------+---------+-------+-------+----------+--------+----+-----+----+----------+----------+--------+-----------+----------+------------------+---------+
|CustomerID|         Name|     City|  State|Country|SignupDate|IsActive|year|month|rank|dense_rank|row_number|order_id|customer_id|order_date|      total_amount|   status|
+----------+-------------+---------+-------+-------+----------+--------+----+-----+----+----------+----------+--------+-----------+----------+------------------+---------+
|      4996|Customer_4996|  Kolkata|Gujarat|  India|2034-09-05|   false|2034|    9|   1|         1|         1|    5542|       4996|2024-02-06| 465.8029849773852|  Shipped|
|     14996|Customer_4996|  Kolkata|Gujarat|  India|2034-09-05|   false|2034|    9|   1|         1|         3|    8166|      14996|2024-01-19| 852.6104662494444|  Pending|
|     14996|Customer_4996|  Kolkata|Gujarat|  India|2034-09-05|   false|2034|    9|   1|         1|         3|    5148|      14996|2024-04-0

In [95]:
#Total orders per customer
cust_order_count = customer_order_df.groupBy('customer_id').count().withColumnRenamed('count', 'no_of_orders').orderBy(col('count').desc())
cust_order_count.show()

+-----------+------------+
|customer_id|no_of_orders|
+-----------+------------+
|      11776|           7|
|       4294|           6|
|       3884|           6|
|       7566|           6|
|       3336|           6|
|       5160|           6|
|      13034|           6|
|      14838|           6|
|       3243|           6|
|      11537|           5|
|       8384|           5|
|       6463|           5|
|       9415|           5|
|       9073|           5|
|      11002|           5|
|      14933|           5|
|       8612|           5|
|       1823|           5|
|      13827|           5|
|      12725|           5|
+-----------+------------+
only showing top 20 rows



In [64]:
#Total spends per customer
cust_spend_df = customer_order_df.groupBy('customer_id').agg(sum('total_amount').alias('total_spent')).orderBy(col('total_spent').desc())
cust_spend_df.show()

+-----------+------------------+
|customer_id|       total_spent|
+-----------+------------------+
|       3336| 4362.550733141537|
|       3884|  4187.99763145619|
|      16020|3967.2692112582276|
|      14372| 3961.787139557334|
|      14933|3828.5841072418348|
|       7566| 3647.119115720654|
|      10559|3548.8378633460234|
|      11776|  3438.36692751212|
|      11449| 3396.060974816134|
|       5425| 3389.162933156913|
|      15014| 3365.078728621099|
|      13940|3354.7104846187476|
|       9282| 3335.140545881391|
|       9073|3327.7198997109954|
|       9415| 3314.702664499254|
|      17069| 3248.721429958719|
|       6588|3231.5186848832673|
|      10997|  3208.89243531518|
|       1636| 3195.551527032096|
|      13034|3195.0224622202268|
+-----------+------------------+
only showing top 20 rows



In [66]:
#order by status
cust_status_df = customer_order_df.groupBy('status').count()
cust_status_df.show()

+---------+-----+
|   status|count|
+---------+-----+
|Cancelled| 4427|
|Delivered| 4289|
|  Shipped| 4339|
|  Pending| 4395|
+---------+-----+



In [70]:
#order by month
orders_by_month_df = customer_order_df.withColumn('order_month', month('order_date')).groupBy('order_month').count().orderBy('order_month')
orders_by_month_df.show()

+-----------+-----+
|order_month|count|
+-----------+-----+
|          1| 1489|
|          2| 1352|
|          3| 1518|
|          4| 1441|
|          5| 1497|
|          6| 1435|
|          7| 1500|
|          8| 1449|
|          9| 1434|
|         10| 1497|
|         11| 1415|
|         12| 1423|
+-----------+-----+



In [91]:
window_spec = Window.orderBy(col('total_spent').desc())
ranked_cust = cust_spend_df.withColumn('dense_rank', dense_rank().over(window_spec))
ranked_cust.show()

25/06/16 11:49:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 11:49:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 11:49:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 11:49:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 11:49:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----------+------------------+----------+
|customer_id|       total_spent|dense_rank|
+-----------+------------------+----------+
|       3336| 4362.550733141537|         1|
|       3884|  4187.99763145619|         2|
|      16020|3967.2692112582276|         3|
|      14372| 3961.787139557334|         4|
|      14933|3828.5841072418348|         5|
|       7566| 3647.119115720654|         6|
|      10559|3548.8378633460234|         7|
|      11776|  3438.36692751212|         8|
|      11449| 3396.060974816134|         9|
|       5425| 3389.162933156913|        10|
|      15014| 3365.078728621099|        11|
|      13940|3354.7104846187476|        12|
|       9282| 3335.140545881391|        13|
|       9073|3327.7198997109954|        14|
|       9415| 3314.702664499254|        15|
|      17069| 3248.721429958719|        16|
|       6588|3231.5186848832673|        17|
|      10997|  3208.89243531518|        18|
|       1636| 3195.551527032096|        19|
|      13034|3195.0224622202268|

25/06/16 11:49:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 11:49:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 11:49:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [88]:
customer_order_df.rdd.getNumPartitions()

1

In [97]:
cust_spend_df, cust_order_count

(DataFrame[customer_id: int, total_spent: double],
 DataFrame[customer_id: int, no_of_orders: bigint])

In [102]:
#Finding customers with High Order Frequency but Low Total Spend

customer_spend_vs_order = cust_order_count.join(cust_spend_df, 'customer_id', 'inner').orderBy(col('no_of_orders').desc())
customer_spend_vs_order.show(10)

+-----------+------------+------------------+
|customer_id|no_of_orders|       total_spent|
+-----------+------------+------------------+
|      11776|           7|  3438.36692751212|
|       4294|           6| 1821.603928366352|
|       3884|           6|  4187.99763145619|
|       7566|           6| 3647.119115720654|
|       3336|           6| 4362.550733141537|
|       5160|           6| 1656.737343311546|
|      13034|           6|3195.0224622202268|
|      14838|           6| 2894.355602564058|
|       3243|           6|2860.1827303387754|
|      11537|           5|3155.4999177196364|
+-----------+------------+------------------+
only showing top 10 rows



In [103]:
op_path_cust_spend = '/data/spark_mini_project_op'
df.write.mode('overwrite').parquet(op_path_cust_spend)

In [107]:
!hdfs dfs -ls /data/spark_mini_project_op

Found 2 items
-rw-r--r--   2 root hadoop          0 2025-06-16 12:09 /data/spark_mini_project_op/_SUCCESS
-rw-r--r--   2 root hadoop     175164 2025-06-16 12:09 /data/spark_mini_project_op/part-00000-49674702-6761-4c6b-b5cd-f7d0de88f94c-c000.snappy.parquet
