In [0]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("CustomerDataProcessing").getOrCreate()

spark

In [0]:
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/hrithikwayal14@gmail.com/customers.csv")

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    False|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     True|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     True|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    False|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    False|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [0]:
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- is_active: string (nullable = true)



In [0]:
from pyspark.sql.functions import *
df=df.withColumn('registration_date',to_date(col('registration_date'),'yyyy-mm-dd')) \
    .withColumn('is_active',col('is_active').cast('boolean'))

In [0]:
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



In [0]:
df=df.fillna({'city':'Unknown','state':'Unknown','country':'Unknown'})

In [0]:
df=df.withColumn('registration_year',year(col('registration_date'))) \
    .withColumn('registration_month',month(col('registration_date')))

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|registration_month|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-01-29|    false|             2023|                 1|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-01-07|     true|             2023|                 1|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-01-27|     true|             2023|                 1|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-01-17|    false|             2023|                 1|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-01-14|    false|             2023|                 1|
+-----------+----------+---------+------

In [0]:
unique_cities=df1.select(countDistinct('city')).collect()
unique_cities[0][0]

unique_states=df1.select(countDistinct('state')).collect()
unique_states[0][0]

unique_countries=df1.select(countDistinct('country')).collect()
unique_countries[0][0]

print(unique_cities)

 

[Row(count(DISTINCT city)=8)]


In [0]:
df.groupBy('city').count().orderBy(col('count').desc()).show()

+---------+-----+
|     city|count|
+---------+-----+
|     Pune| 2243|
|Hyderabad| 2242|
|  Kolkata| 2223|
|Bangalore| 2211|
|    Delhi| 2200|
|Ahmedabad| 2198|
|  Chennai| 2194|
|   Mumbai| 2142|
+---------+-----+



In [0]:
df.groupBy('state','country').count().orderBy(col('count').desc()).show()

+-----------+-------+-----+
|      state|country|count|
+-----------+-------+-----+
|      Delhi|  India| 2578|
|    Gujarat|  India| 2543|
| Tamil Nadu|  India| 2536|
|  Telangana|  India| 2520|
|West Bengal|  India| 2503|
|Maharashtra|  India| 2490|
|  Karnataka|  India| 2483|
+-----------+-------+-----+



In [0]:
#Pivot table

df.groupBy('state').pivot('is_active').count().show()

+-----------+-----+----+
|      state|false|true|
+-----------+-----+----+
|  Karnataka| 1207|1276|
| Tamil Nadu| 1284|1252|
|    Gujarat| 1211|1332|
|      Delhi| 1356|1222|
|  Telangana| 1294|1226|
|Maharashtra| 1260|1230|
|West Bengal| 1306|1197|
+-----------+-----+----+



In [0]:
# Count of active and inactive users per state
#
rdf=df.withColumn('is_active',when(col('is_active'),'active').otherwise('inactive'))
rdf.groupBy('state').pivot('is_active').count().show()

+-----------+------+--------+
|      state|active|inactive|
+-----------+------+--------+
|  Karnataka|  1276|    1207|
| Tamil Nadu|  1252|    1284|
|    Gujarat|  1332|    1211|
|      Delhi|  1222|    1356|
|Maharashtra|  1230|    1260|
|West Bengal|  1197|    1306|
|  Telangana|  1226|    1294|
+-----------+------+--------+



In [0]:
df.select(col('is_active')==True).count()

Out[21]: 17653

In [0]:
from pyspark.sql.window import Window

In [0]:
window_spec=Window.partitionBy('state').orderBy(col('registration_date').desc())

df=df.withColumn('Rank',rank().over(window_spec)) \
    .withColumn('Dense_Rank',dense_rank().over(window_spec)) \
    .withColumn('Row_Number',row_number().over(window_spec))

In [0]:
df.select('name','city','rank','dense_rank').show(5)

+------------+---------+----+----------+
|        name|     city|rank|dense_rank|
+------------+---------+----+----------+
| Customer_61|Hyderabad|   1|         1|
|Customer_194|Ahmedabad|   1|         1|
|Customer_202|     Pune|   1|         1|
|Customer_501|   Mumbai|   1|         1|
|Customer_846|Bangalore|   1|         1|
+------------+---------+----+----------+
only showing top 5 rows



In [0]:
df_recent_cus=df.filter(col('registration_date') >= lit('2023-01-30'))
df_recent_cus.count()

Out[28]: 921

In [0]:
# Get the oldest and newest customer per city

df.groupBy('city').agg(min('registration_date').alias('Oldest'),max('registration_date').alias('Newest')).show()

+---------+----------+----------+
|     city|    Oldest|    Newest|
+---------+----------+----------+
|Bangalore|2023-01-01|2023-01-31|
|  Chennai|2023-01-01|2023-01-31|
|   Mumbai|2023-01-01|2023-01-31|
|Ahmedabad|2023-01-01|2023-01-31|
|  Kolkata|2023-01-01|2023-01-31|
|     Pune|2023-01-01|2023-01-31|
|    Delhi|2023-01-01|2023-01-31|
|Hyderabad|2023-01-01|2023-01-31|
+---------+----------+----------+



In [0]:
%fs ls /FileStore/tables/processed_customer

In [0]:
path='/FileStore/tables/processed_customer'
df.write.mode('overwrite').parquet(path)

#Joining and Analyzing Customers and Orders

In [0]:
df1 = spark.read.format("csv").option("header", "true").option('inferSchema','true').load("dbfs:/FileStore/shared_uploads/hrithikwayal14@gmail.com/orders.csv")
df1.show(5)

+--------+-----------+----------+-----------------+---------+
|order_id|customer_id|order_date|     total_amount|   status|
+--------+-----------+----------+-----------------+---------+
|       0|       3692|2024-09-03|547.7160076008001|  Shipped|
|       1|      11055|2024-08-10|577.8942599188381|  Pending|
|       2|       6963|2024-08-22|484.2085562764487|  Pending|
|       3|      13268|2024-09-01|366.3286882431848|Cancelled|
|       4|       1131|2024-08-09|896.9588380686909|  Pending|
+--------+-----------+----------+-----------------+---------+
only showing top 5 rows



In [0]:
df1.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- status: string (nullable = true)



In [0]:
#
df1.groupBy('status').agg(sum('total_amount').alias('Amount')).show()

+---------+------------------+
|   status|            Amount|
+---------+------------------+
|  Shipped|  2188888.99656208|
|Cancelled|2237958.0091099176|
|Delivered|2210383.5320680086|
|  Pending| 2246861.628608617|
+---------+------------------+



In [0]:
cus_ord_df=df.join(df1,"customer_id",'inner')

In [0]:
cus_ord_df=cus_ord_df.drop('Rank','Dense_Rank','Row_Number')

In [0]:
cus_ord_df.display(5)

customer_id,name,city,state,country,registration_date,is_active,registration_year,registration_month,order_id,order_date,total_amount,status
2,Customer_2,Hyderabad,Gujarat,India,2023-01-27,True,2023,1,10691,2024-06-16,215.941740861192,Pending
2,Customer_2,Hyderabad,Gujarat,India,2023-01-27,True,2023,1,2859,2024-10-12,345.0157171933204,Cancelled
3,Customer_3,Bangalore,Karnataka,India,2023-01-17,False,2023,1,8728,2024-07-19,939.6745764247852,Cancelled
4,Customer_4,Ahmedabad,Karnataka,India,2023-01-14,False,2023,1,17286,2024-03-23,19.27962920593053,Cancelled
4,Customer_4,Ahmedabad,Karnataka,India,2023-01-14,False,2023,1,2228,2024-11-23,512.2135003327533,Delivered
6,Customer_6,Pune,Delhi,India,2023-01-29,False,2023,1,15577,2024-01-23,890.4308117188951,Pending
6,Customer_6,Pune,Delhi,India,2023-01-29,False,2023,1,4985,2024-02-08,588.0820373117768,Cancelled
7,Customer_7,Ahmedabad,West Bengal,India,2023-01-28,True,2023,1,139,2024-07-10,546.6384666319628,Delivered
8,Customer_8,Pune,Karnataka,India,2023-01-22,True,2023,1,9347,2024-07-04,655.1633814544429,Delivered
10,Customer_10,Pune,Gujarat,India,2023-01-05,True,2023,1,14646,2024-08-27,26.360585366312066,Cancelled


In [0]:
#Total order per customer
cus_ord_count=cus_ord_df.groupBy('customer_id').count().orderBy(col('count').desc())
cus_ord_count.show(5)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|      11776|    7|
|       4294|    6|
|       3336|    6|
|       3243|    6|
|      13034|    6|
+-----------+-----+
only showing top 5 rows



In [0]:
#Total Order spent
cus_total_spend=cus_ord_df.groupBy('customer_id').agg(sum('total_amount').alias('Amount_Spent')).orderBy(col('Amount_Spent').desc())
cus_total_spend.show(5)

+-----------+------------------+
|customer_id|      Amount_Spent|
+-----------+------------------+
|       3336| 4362.550733141537|
|       3884|  4187.99763145619|
|      16020|3967.2692112582276|
|      14372| 3961.787139557334|
|      14933|3828.5841072418348|
+-----------+------------------+
only showing top 5 rows



In [0]:
#Avg Spend
cus_ord_df.groupBy('customer_id').agg(avg('total_amount').alias('Avg_Spent')).orderBy(col('Avg_Spent').desc()).show(5)

+-----------+-----------------+
|customer_id|        Avg_Spent|
+-----------+-----------------+
|      11854| 999.864258397557|
|         46| 999.592553819927|
|      17590|999.5726342625253|
|      11587|999.5595016039513|
|       6816|999.4348902885968|
+-----------+-----------------+
only showing top 5 rows



In [0]:
#Failed orders count

cus_ord_count_failed=cus_ord_df.groupBy('status').count()
cus_ord_count_failed.show()

+---------+-----+
|   status|count|
+---------+-----+
|  Shipped| 4386|
|Cancelled| 4469|
|Delivered| 4341|
|  Pending| 4457|
+---------+-----+



In [0]:

cus_ord_df.groupBy('status').agg(sum('total_amount')).show()


+---------+------------------+
|   status| sum(total_amount)|
+---------+------------------+
|  Shipped|  2188888.99656208|
|Cancelled|2237958.0091099176|
|Delivered|2210383.5320680086|
|  Pending| 2246861.628608617|
+---------+------------------+



In [0]:
#Order by month

cus_ord_df.withColumn('Order_month',month(col('order_date'))) \
    .groupBy('Order_month').count().orderBy(col('Order_month')).show()

+-----------+-----+
|Order_month|count|
+-----------+-----+
|          1| 1499|
|          2| 1368|
|          3| 1539|
|          4| 1457|
|          5| 1518|
|          6| 1455|
|          7| 1517|
|          8| 1472|
|          9| 1446|
|         10| 1513|
|         11| 1426|
|         12| 1443|
+-----------+-----+



In [0]:
window_spec1=Window.orderBy(col('Amount_Spent').desc())
cus_rank=cus_total_spend.withColumn('Dense_Rank',dense_rank().over(window_spec1))
cus_rank.show(5)

+-----------+------------------+----------+
|customer_id|      Amount_Spent|Dense_Rank|
+-----------+------------------+----------+
|       3336| 4362.550733141537|         1|
|       3884|  4187.99763145619|         2|
|      16020|3967.2692112582276|         3|
|      14372| 3961.787139557334|         4|
|      14933|3828.5841072418348|         5|
+-----------+------------------+----------+
only showing top 5 rows



In [0]:
# Customer with High order frequency and low total spend
cus_total_spend.printSchema() , cus_ord_count.printSchema()




root
 |-- customer_id: string (nullable = true)
 |-- Amount_Spent: double (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- count: long (nullable = false)

Out[59]: (None, None)

In [0]:
cus_spend_vs_order=cus_ord_count.join(cus_total_spend,'customer_id','inner')\
    .orderBy(col('count').desc(),col('Amount_Spent'))

cus_spend_vs_order.show(5)



+-----------+-----+------------------+
|customer_id|count|      Amount_Spent|
+-----------+-----+------------------+
|      11776|    7|  3438.36692751212|
|       5160|    6| 1656.737343311546|
|       4294|    6| 1821.603928366352|
|       3243|    6|2860.1827303387754|
|      14838|    6| 2894.355602564058|
+-----------+-----+------------------+
only showing top 5 rows



In [0]:
output_path='/FileStore/tables/final_customers_orders'
cus_ord_df.write.mode('overwrite').parquet(output_path)

In [0]:
%fs rm -r /Filestore/