Read and Process Data

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-active session if there is one,
# otherwise it builds a new one under this application name.
spark = (
    SparkSession.builder
    .appName("CustomerDataProcessing")
    .getOrCreate()
)
spark

<pyspark.sql.connect.session.SparkSession at 0xfffe18d71b90>

In [0]:
# Load the raw customer CSV: the first row holds the column names, and Spark
# scans the data to infer each column's type.
df = spark.read.csv(
    "/Volumes/workspace/default/customers/customers.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Quick look at the first few loaded rows.
df.show(n=5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows


In [0]:
# Show the column types produced by inferSchema.
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



In [0]:
from pyspark.sql.functions import *

In [0]:
# Pin the column types explicitly instead of trusting inferSchema:
# registration_date is parsed as yyyy-MM-dd, and is_active is cast to boolean.
df = df.withColumns({
    'registration_date': to_date(col('registration_date'), 'yyyy-MM-dd'),
    'is_active': col('is_active').cast('boolean'),
})

In [0]:
# Check the result of the type normalisation: sample rows first, then the schema.
df.show(n=5)
df.printSchema()

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows
root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nu

In [0]:
# Replace missing location fields with the placeholder 'Unknown'.
df = df.na.fill('Unknown', subset=['city', 'state', 'country'])

In [0]:
# Preview after filling missing location values.
df.show(n=5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows


In [0]:
# Split registration_date into separate year and month columns for grouping.
reg_date = col('registration_date')
df = df.withColumns({
    'registration_year': year(reg_date),
    'registration_month': month(reg_date),
})

In [0]:
# Preview including the new registration_year / registration_month columns.
df.show(n=5)

+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|registration_month|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|             2023|                 6|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|             2023|                12|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|             2023|                10|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|             2023|                10|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|             2023|                 3|
+-----------+----------+---------+------

In [0]:
# Number of distinct cities, states and countries.
# All three are computed in ONE aggregation (a single Spark job) rather than
# three separate select(...).collect() round-trips.
# Note: the earlier fillna turned NULL locations into 'Unknown', so that
# placeholder is counted as a value here if any rows were missing data.
distinct_counts = df.agg(
    countDistinct('city').alias('cities'),
    countDistinct('state').alias('states'),
    countDistinct('country').alias('countries'),
).first()

print('cities:', distinct_counts['cities'])
print('states:', distinct_counts['states'])
print('countries:', distinct_counts['countries'])

[Row(count(DISTINCT city)=8)]
Row(count(DISTINCT city)=8)
cities: 8
states: 7
countries: 1


In [0]:
# Customer count per city, largest first.
df.groupBy('city').count().orderBy('count', ascending=False).show()

+---------+-----+
|     city|count|
+---------+-----+
|     Pune| 2243|
|Hyderabad| 2242|
|  Kolkata| 2223|
|Bangalore| 2211|
|    Delhi| 2200|
|Ahmedabad| 2198|
|  Chennai| 2194|
|   Mumbai| 2142|
+---------+-----+



In [0]:
# Customer count per (state, country) pair, largest first.
df.groupBy('state', 'country').count().orderBy('count', ascending=False).show()


+-----------+-------+-----+
|      state|country|count|
+-----------+-------+-----+
|      Delhi|  India| 2578|
|    Gujarat|  India| 2543|
| Tamil Nadu|  India| 2536|
|  Telangana|  India| 2520|
|West Bengal|  India| 2503|
|Maharashtra|  India| 2490|
|  Karnataka|  India| 2483|
+-----------+-------+-----+



In [0]:
# Pivot table: count of active and inactive customers per state,
# with one output column per distinct is_active value.
state_activity = df.groupBy('state').pivot('is_active').count()
state_activity.show()

+-----------+-----+----+
|      state|false|true|
+-----------+-----+----+
|      Delhi| 1356|1222|
|Maharashtra| 1260|1230|
|West Bengal| 1306|1197|
|  Telangana| 1294|1226|
|    Gujarat| 1211|1332|
|  Karnataka| 1207|1276|
| Tamil Nadu| 1284|1252|
+-----------+-----+----+



In [0]:
from pyspark.sql import Window

In [0]:
# Rank customers within each state by registration date, newest first.
# rank / dense_rank deliberately give tied dates the same rank.
window_spec = Window.partitionBy('state').orderBy(col('registration_date').desc())

# row_number must assign a distinct number to every row. When ordering by
# registration_date alone, rows that share a date (very common here) are
# numbered in an arbitrary order that can change between runs. Adding
# customer_id as a tiebreaker makes the numbering reproducible
# (assumes customer_id is unique per row - TODO confirm against the source data).
row_number_spec = Window.partitionBy('state').orderBy(
    col('registration_date').desc(),
    col('customer_id').asc(),
)

df = (
    df.withColumn('rank', rank().over(window_spec))
      .withColumn('dense_rank', dense_rank().over(window_spec))
      .withColumn('row_number', row_number().over(row_number_spec))
)

In [0]:
# Preview with the rank / dense_rank / row_number columns added.
df.show(n=5)

+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+----------+
|customer_id|          name|     city|state|country|registration_date|is_active|registration_year|registration_month|rank|dense_rank|row_number|
+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+----------+
|         61|   Customer_61|Hyderabad|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|         1|
|        501|  Customer_501|   Mumbai|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|         2|
|       2763| Customer_2763|     Pune|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|         1|         3|
|      12858|Customer_12858|Ahmedabad|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|        

In [0]:
# Compare the three ranking functions side by side on a narrow projection.
ranking_cols = ['name', 'city', 'state', 'rank', 'dense_rank', 'row_number']
df.select(*ranking_cols).show(5)

+--------------+---------+-----+----+----------+----------+
|          name|     city|state|rank|dense_rank|row_number|
+--------------+---------+-----+----+----------+----------+
|   Customer_61|Hyderabad|Delhi|   1|         1|         1|
|  Customer_501|   Mumbai|Delhi|   1|         1|         2|
| Customer_2763|     Pune|Delhi|   1|         1|         3|
|Customer_12858|Ahmedabad|Delhi|   1|         1|         4|
|Customer_13570|Bangalore|Delhi|   1|         1|         5|
+--------------+---------+-----+----+----------+----------+
only showing top 5 rows


In [0]:
# Customers who registered on or after the cutoff date.
# A typed date literal is used instead of the string '2023-07-01'. A malformed
# cutoff then fails immediately in Python rather than relying on Spark's implicit
# string->date cast, which (outside ANSI mode) can yield NULL and silently
# filter out every row.
import datetime

RECENT_CUTOFF = datetime.date(2023, 7, 1)

df_recent_customers = df.filter(col('registration_date') >= lit(RECENT_CUTOFF))
df_recent_customers.count()

9025

In [0]:
# Earliest and latest registration date per city.
# The aggregates are namespaced explicitly (F.min / F.max). The bare min/max
# only resolve to Spark functions because `from pyspark.sql.functions import *`
# shadows Python's builtins; without that cell, builtin min('registration_date')
# would return a string and .alias(...) would fail.
from pyspark.sql import functions as F

df.groupBy('city').agg(
    F.min('registration_date').alias('oldest'),
    F.max('registration_date').alias('newest'),
).show()

+---------+----------+----------+
|     city|    oldest|    newest|
+---------+----------+----------+
|  Kolkata|2023-01-01|2023-12-31|
|Bangalore|2023-01-01|2023-12-31|
|  Chennai|2023-01-01|2023-12-31|
|    Delhi|2023-01-01|2023-12-31|
|Ahmedabad|2023-01-01|2023-12-31|
|   Mumbai|2023-01-01|2023-12-31|
|     Pune|2023-01-01|2023-12-31|
|Hyderabad|2023-01-01|2023-12-31|
+---------+----------+----------+



In [0]:
# Persist the processed customer data as Parquet.
# The target must NOT be '/Volumes/workspace/default/customers'. That folder holds
# the source customers.csv, and mode('overwrite') clears the target directory
# before writing. That would delete the raw input, which the lazily evaluated
# `df` still reads while the job runs. Write to a dedicated sub-directory instead.
source_path = '/Volumes/workspace/default/customers/customers.csv'
output_path = '/Volumes/workspace/default/customers/processed/customers_parquet'

# Guard against pointing the overwrite back at a folder that contains the input.
assert not source_path.startswith(output_path.rstrip('/') + '/'), (
    'output_path must not contain the source CSV; overwrite would delete it'
)

df.write.mode('overwrite').parquet(output_path)