### Read and process data


In [0]:
spark

In [0]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("spark_project").getOrCreate()



In [0]:
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load("/FileStore/tables/first_100_customers.csv")


In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows


In [0]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



### PROCESSING DATA

In [0]:
from pyspark.sql.functions import *
df=df.withColumn("registration_date",to_timestamp("registration_date"))

In [0]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)



In [0]:
df=df.withColumn("registration_date",to_date("registration_date"))

In [0]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



In [0]:
df.fillna({'city':'Unknown','state':'Unknown'})

DataFrame[customer_id: int, name: string, city: string, state: string, country: string, registration_date: date, is_active: boolean]

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows


In [0]:
df=df.withColumn("registration_year",year(col("registration_date")))

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|             2023|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|             2023|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|             2023|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|             2023|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|             2023|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+
only showing top 5 rows


In [0]:
#unique cities
df.select(col("city")).distinct().show()

+---------+
|     city|
+---------+
|Bangalore|
|  Chennai|
|   Mumbai|
|Ahmedabad|
|  Kolkata|
|     Pune|
|    Delhi|
|Hyderabad|
+---------+



In [0]:
result=df.select(countDistinct("city")).collect()
result[0][0]

8

In [0]:
df.groupBy("city","state").count().orderBy(col("count").desc()).show(5)


+-------+-----------+-----+
|   city|      state|count|
+-------+-----------+-----+
|   Pune|  Telangana|    6|
|Kolkata|      Delhi|    5|
|   Pune| Tamil Nadu|    4|
|Kolkata|    Gujarat|    4|
|Chennai|Maharashtra|    4|
+-------+-----------+-----+
only showing top 5 rows


In [0]:
#In PySpark, a pivot table is used to transform rows into columns, just like in Excel or Pandas.It helps you summarize, group, and reshape your data for analysis 
df.groupBy('state').pivot('is_active').count().show()

+-----------+-----+----+
|      state|false|true|
+-----------+-----+----+
|  Karnataka|   10|   6|
| Tamil Nadu|    4|  11|
|    Gujarat|    7|  11|
|      Delhi|    6|   4|
|  Telangana|    4|  12|
|Maharashtra|    8|   3|
|West Bengal|    7|   6|
+-----------+-----+----+



In [0]:
#window function
from pyspark.sql.window import Window
window_spec=Window.partitionBy('country').orderBy(col('registration_date').desc())

rank_result=df.withColumn('rank',rank().over(window_spec)).withColumn('dense_rank',dense_rank().over(window_spec)).withColumn('row_number',row_number().over(window_spec))

rank_result.select('name','city','state','rank','dense_rank','row_number').show(5)

+-----------+---------+-----------+----+----------+----------+
|       name|     city|      state|rank|dense_rank|row_number|
+-----------+---------+-----------+----+----------+----------+
|Customer_61|Hyderabad|      Delhi|   1|         1|         1|
|Customer_56|    Delhi|  Karnataka|   2|         2|         2|
| Customer_7|Ahmedabad|West Bengal|   3|         3|         3|
|Customer_71|   Mumbai|West Bengal|   4|         4|         4|
|Customer_39|     Pune| Tamil Nadu|   5|         5|         5|
+-----------+---------+-----------+----+----------+----------+
only showing top 5 rows


In [0]:
df_recent_customers=df.filter(col("registration_date")>lit('2023-07-11'))
df_recent_customers.count()

53

In [0]:
#oldest and newest cusotmer per city
result=df.groupBy('city').agg(min('registration_date').alias('oldest'),max('registration_date').alias('newest'))



### JOINING

In [0]:
orders=spark.read.format("csv").option("header","true").option("inferSchema","true").load("/FileStore/tables/orders.csv")

orders.show(5)

+--------+-----------+----------+-----------------+---------+
|order_id|customer_id|order_date|     total_amount|   status|
+--------+-----------+----------+-----------------+---------+
|       0|       3692|2024-09-03|547.7160076008001|  Shipped|
|       1|      11055|2024-08-10|577.8942599188381|  Pending|
|       2|       6963|2024-08-22|484.2085562764487|  Pending|
|       3|      13268|2024-09-01|366.3286882431848|Cancelled|
|       4|       1131|2024-08-09|896.9588380686909|  Pending|
+--------+-----------+----------+-----------------+---------+
only showing top 5 rows


In [0]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- status: string (nullable = true)



In [0]:
customers_orders_df=df.join(orders,'customer_id','inner')

In [0]:
customers_orders_df.count()

91

In [0]:
#Total ORder per Customer
customers_orders_count=customers_orders_df.groupby('customer_id').agg(sum('total_amount'))
customers_orders_count.show(5)

+-----------+-----------------+
|customer_id|sum(total_amount)|
+-----------+-----------------+
|         85|954.1298333153007|
|         53|972.4425845781869|
|         78|263.8754446578965|
|         34|876.3120853159711|
|         28|528.7985442876674|
+-----------+-----------------+
only showing top 5 rows


In [0]:
#order by status
customer_order_status=customers_orders_df.groupby('status').count()
customer_order_status.show()

+---------+-----+
|   status|count|
+---------+-----+
|  Shipped|   18|
|Cancelled|   30|
|Delivered|   19|
|  Pending|   24|
+---------+-----+



In [0]:
customers_orders_df=customers_orders_df.withColumn("order_month",month("order_date"))
customers_orders_df.display()

customer_id,name,city,state,country,registration_date,is_active,registration_year,order_id,order_date,total_amount,status,registration_month,order_month
68,Customer_68,Kolkata,Gujarat,India,2023-07-17,False,2023,10,2024-02-01,250.90701173471632,Pending,7,2
7,Customer_7,Ahmedabad,West Bengal,India,2023-12-28,True,2023,139,2024-07-10,546.6384666319628,Delivered,12,7
18,Customer_18,Pune,Delhi,India,2023-10-04,True,2023,258,2024-04-12,554.3177613815784,Cancelled,10,4
17,Customer_17,Hyderabad,West Bengal,India,2023-08-21,False,2023,295,2024-08-28,420.5654575766698,Cancelled,8,8
38,Customer_38,Bangalore,Telangana,India,2023-09-26,False,2023,415,2024-12-30,90.01453303201006,Cancelled,9,12
47,Customer_47,Kolkata,Telangana,India,2023-11-03,True,2023,418,2024-01-22,972.4383759704511,Pending,11,1
95,Customer_95,Mumbai,Karnataka,India,2023-09-29,True,2023,861,2024-11-03,694.9897335718198,Shipped,9,11
38,Customer_38,Bangalore,Telangana,India,2023-09-26,False,2023,930,2024-07-20,107.49581322437736,Shipped,9,7
26,Customer_26,Delhi,Delhi,India,2023-03-22,True,2023,990,2024-03-09,473.5525685203318,Pending,3,3
40,Customer_40,Pune,West Bengal,India,2023-01-15,False,2023,1273,2024-12-01,49.75964706636384,Cancelled,1,12


In [0]:
#ordersByMonth

orders_by_month=customers_orders_df.groupBy("order_month").count().orderBy(col('order_month').desc())
orders_by_month.show()

+-----------+-----+
|order_month|count|
+-----------+-----+
|         12|   11|
|         11|    7|
|         10|    4|
|          9|    7|
|          8|    7|
|          7|    6|
|          6|    7|
|          5|   11|
|          4|    7|
|          3|    6|
|          2|    9|
|          1|    9|
+-----------+-----+



In [0]:
#Find customers with high order frequency but low total spend
customer_spend_vs_orders=customers_order_count.