In [2]:
from pyspark.sql import SparkSession

# Obtain the SparkSession used by the rest of the notebook; getOrCreate()
# returns the already-running session when one exists.
# NOTE(review): the app name is misspelled ("Processiong"); left unchanged here.
spark = (
    SparkSession.builder
    .appName("Customer Data Processiong")
    .getOrCreate()
)

25/12/31 18:51:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
# Display the active SparkSession object as the cell output.
spark

In [4]:
# List the contents of the input data directory in the GCS bucket.
!hadoop fs -ls gs://anshk-data/Data/

2025-12-31 18:52:11,800 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-12-31 18:52:11,906 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-12-31 18:52:11,907 INFO impl.MetricsSystemImpl: google-hadoop-file-system metrics system started
Found 4 items
drwx------   - root root          0 1970-01-01 00:00 gs://anshk-data/Data/10MB
drwx------   - root root          0 1970-01-01 00:00 gs://anshk-data/Data/150MB
drwx------   - root root          0 1970-01-01 00:00 gs://anshk-data/Data/1MB
drwx------   - root root          0 1970-01-01 00:00 gs://anshk-data/Data/300MB


In [21]:
# Load the 1MB customers CSV, treating the first line as the header.
# No schema is supplied or inferred, so every column is read as a string.
df = (
    spark.read
    .option('header', 'true')
    .csv('gs://anshk-data/Data/1MB/customers.csv')
)

In [22]:
# Preview the first 5 rows of the raw customers data.
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    False|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     True|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     True|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    False|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    False|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [23]:
# Print column names and types of the freshly loaded DataFrame.
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- is_active: string (nullable = true)



In [24]:
from pyspark.sql.functions import *

In [25]:
# Give the string columns proper types:
#   registration_date -> date (parsed with the yyyy-MM-dd pattern)
#   is_active         -> boolean
df = (
    df
    .withColumn('registration_date', to_date(col('registration_date'), 'yyyy-MM-dd'))
    .withColumn('is_active', col('is_active').cast('boolean'))
)

In [26]:
# Confirm that registration_date and is_active now carry the converted types.
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



In [27]:
# Preview the first 5 rows after the type conversion.
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [28]:
# Replace missing location values with the placeholder 'Unknown'.
df = df.na.fill(
    {location_col: 'Unknown' for location_col in ('city', 'state', 'country')}
)

In [29]:
# Derive the year and month of registration as separate columns.
df = (
    df
    .withColumn('registration_year', year('registration_date'))
    .withColumn('registration_month', month('registration_date'))
)

In [30]:
# Preview the first 5 rows with the derived year/month columns.
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|registration_month|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|             2023|                 6|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|             2023|                12|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|             2023|                10|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|             2023|                10|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|             2023|                 3|
+-----------+----------+---------+------

In [32]:
# Count the distinct cities; collect() returns one Row whose first field is the count.
unique_cities = df.agg(count_distinct(col('city'))).collect()
unique_cities[0][0]

8

In [33]:
# Count the distinct states; collect() returns one Row whose first field is the count.
unique_states = df.agg(count_distinct(col('state'))).collect()
unique_states[0][0]

7

In [34]:
# Count the distinct countries; collect() returns one Row whose first field is the count.
unique_country = df.agg(count_distinct(col('country'))).collect()
unique_country[0][0]

1

In [35]:
# Number of customers per city, largest first.
(
    df.groupBy('city')
    .count()
    .orderBy('count', ascending=False)
    .show()
)

+---------+-----+
|     city|count|
+---------+-----+
|     Pune| 2243|
|Hyderabad| 2242|
|  Kolkata| 2223|
|Bangalore| 2211|
|    Delhi| 2200|
|Ahmedabad| 2198|
|  Chennai| 2194|
|   Mumbai| 2142|
+---------+-----+



In [36]:
# Per-state customer counts, with one column for each value of is_active.
# No explicit pivot values are passed, so the columns come from the data.
(
    df.groupBy('state')
    .pivot('is_active')
    .count()
    .show()
)

+-----------+-----+----+
|      state|false|true|
+-----------+-----+----+
|    Gujarat| 1211|1332|
|      Delhi| 1356|1222|
|  Karnataka| 1207|1276|
|  Telangana| 1294|1226|
|Maharashtra| 1260|1230|
| Tamil Nadu| 1284|1252|
|West Bengal| 1306|1197|
+-----------+-----+----+



In [38]:
from pyspark.sql.window import Window

# Rank customers within each state, most recent registration first.
window_spec = Window.partitionBy('state').orderBy(col('registration_date').desc())

# Many customers share a registration date, so ordering by date alone lets
# row_number() number the tied rows arbitrarily, and the result can differ
# between runs. A separate window with customer_id as tie-breaker makes row_num
# reproducible. rank/dense_rank keep the date-only ordering so that customers
# registered on the same day still share a rank.
# customer_id is a string column at this point, so cast it for numeric ordering.
# NOTE(review): assumes customer_id is numeric and unique per customer — confirm.
row_num_spec = Window.partitionBy('state').orderBy(
    col('registration_date').desc(),
    col('customer_id').cast('int'),
)

df = (
    df
    .withColumn('rank', rank().over(window_spec))
    .withColumn('dense_rank', dense_rank().over(window_spec))
    .withColumn('row_num', row_number().over(row_num_spec))
)

In [39]:
# Display the DataFrame with the rank, dense_rank and row_num columns added.
df.show()

[Stage 62:>                                                         (0 + 1) / 1]

+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+-------+
|customer_id|          name|     city|state|country|registration_date|is_active|registration_year|registration_month|rank|dense_rank|row_num|
+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+-------+
|         61|   Customer_61|Hyderabad|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|      1|
|        501|  Customer_501|   Mumbai|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|      2|
|       2763| Customer_2763|     Pune|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|         1|      3|
|      12858|Customer_12858|Ahmedabad|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|         1|      4|
|     

                                                                                

In [40]:
# Keep customers registered on or after 2023-07-01, then count them.
df_recent_cust = df.where(col('registration_date') >= lit('2023-07-01'))
df_recent_cust.count()

9025

In [42]:
# Earliest and latest registration date per city.
# min/max here are the Spark SQL functions brought in by the wildcard import,
# not the Python builtins.
(
    df.groupBy('city')
    .agg(
        min('registration_date').alias('oldest'),
        max('registration_date').alias('newest'),
    )
    .show()
)

+---------+----------+----------+
|     city|    oldest|    newest|
+---------+----------+----------+
|    Delhi|2023-01-01|2023-12-31|
|  Kolkata|2023-01-01|2023-12-31|
|Hyderabad|2023-01-01|2023-12-31|
|Bangalore|2023-01-01|2023-12-31|
|Ahmedabad|2023-01-01|2023-12-31|
|  Chennai|2023-01-01|2023-12-31|
|   Mumbai|2023-01-01|2023-12-31|
|     Pune|2023-01-01|2023-12-31|
+---------+----------+----------+



In [43]:
# Persist the processed customers as Parquet, replacing any existing output at this path.
output_path = "gs://anshk-data/Data/tables/processed_cust"
df.write.parquet(output_path, mode='overwrite')

                                                                                

In [50]:
# List the files written to the processed-customers output directory.
!hadoop fs -ls gs://anshk-data/Data/tables/processed_cust

2025-12-31 19:59:19,603 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-12-31 19:59:19,718 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-12-31 19:59:19,718 INFO impl.MetricsSystemImpl: google-hadoop-file-system metrics system started
Found 2 items
-rwx------   3 root root          0 2025-12-31 19:56 gs://anshk-data/Data/tables/processed_cust/_SUCCESS
-rwx------   3 root root     266407 2025-12-31 19:56 gs://anshk-data/Data/tables/processed_cust/part-00000-dbcd5074-70c6-4361-a467-42de58963469-c000.snappy.parquet


In [51]:
# Read the Parquet output back in and display it to check the write.
df1 = spark.read.format('parquet').load("gs://anshk-data/Data/tables/processed_cust")
df1.show()

+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+-------+
|customer_id|          name|     city|state|country|registration_date|is_active|registration_year|registration_month|rank|dense_rank|row_num|
+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+-------+
|         61|   Customer_61|Hyderabad|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|      1|
|        501|  Customer_501|   Mumbai|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|      2|
|       2763| Customer_2763|     Pune|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|         1|      3|
|      12858|Customer_12858|Ahmedabad|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|         1|      4|
|     

In [52]:
# Print the schema of the DataFrame read back from Parquet.
df1.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- registration_year: integer (nullable = true)
 |-- registration_month: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- dense_rank: integer (nullable = true)
 |-- row_num: integer (nullable = true)



#Joining Orders and Analyzing Customers and Orders

In [55]:
# Load the 1MB orders CSV with a header row, letting Spark infer the column types.
order_df = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('gs://anshk-data/Data/1MB/orders.csv')
)

                                                                                

In [56]:
# Preview the first 5 orders.
order_df.show(5)

+--------+-----------+----------+-----------------+---------+
|order_id|customer_id|order_date|     total_amount|   status|
+--------+-----------+----------+-----------------+---------+
|       0|       3692|2024-09-03|547.7160076008001|  Shipped|
|       1|      11055|2024-08-10|577.8942599188381|  Pending|
|       2|       6963|2024-08-22|484.2085562764487|  Pending|
|       3|      13268|2024-09-01|366.3286882431848|Cancelled|
|       4|       1131|2024-08-09|896.9588380686909|  Pending|
+--------+-----------+----------+-----------------+---------+
only showing top 5 rows



In [57]:
# Print the inferred column types of the orders DataFrame.
order_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- status: string (nullable = true)



In [63]:
# Sum of total_amount for each order status.
# sum here is the Spark SQL function from the wildcard import, not the builtin.
(
    order_df.groupBy('status')
    .agg(sum('total_amount').alias('Total_by_status'))
    .show()
)

+---------+------------------+
|   status|   Total_by_status|
+---------+------------------+
|Cancelled|2237958.0091099176|
|Delivered|2210383.5320680086|
|  Shipped|  2188888.99656208|
|  Pending| 2246861.628608617|
+---------+------------------+



In [66]:
# Largest single order amount for each customer.
# max here is the Spark SQL function from the wildcard import, not the builtin.
(
    order_df.groupBy('customer_id')
    .agg(max('total_amount').alias('Max_Billed'))
    .show()
)

+-----------+------------------+
|customer_id|        Max_Billed|
+-----------+------------------+
|       6357| 465.2976193252627|
|        496| 984.3695769319683|
|       7880|  904.659475141499|
|       4101|236.90680998816944|
|       8638|132.29910069841043|
|       1088| 950.3799854265135|
|      13289|  607.272646997907|
|       1342| 552.3527136029156|
|       1591| 488.4185135792018|
|      11317| 470.6559812194453|
|      16503|   944.58612042068|
|      12046| 910.0022337820485|
|        833| 348.1907005708173|
|        243| 515.3402408617783|
|       8928| 358.5235208931255|
|       3226| 762.4830099330148|
|        540| 657.2803234806577|
|       1395| 551.1891792344727|
|       3698| 736.2554247448688|
|       4190|  729.465679717145|
+-----------+------------------+
only showing top 20 rows



In [70]:
# Number of orders placed by each customer (counts non-null order_id values).
(
    order_df.groupBy("customer_id")
    .agg(count(col("order_id")).alias("Total_Orders"))
    .show()
)


+-----------+------------+
|customer_id|Total_Orders|
+-----------+------------+
|       6357|           2|
|        496|           3|
|       7880|           2|
|       4101|           1|
|       8638|           1|
|       1088|           2|
|      13289|           1|
|       1342|           2|
|       1591|           2|
|      11317|           1|
|      16503|           2|
|      12046|           1|
|        833|           1|
|        243|           1|
|       8928|           1|
|       3226|           2|
|        540|           1|
|       1395|           2|
|       3698|           2|
|       4190|           1|
+-----------+------------+
only showing top 20 rows

