In [0]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName('Spark Mini Project')\
    .getOrCreate()

In [0]:
spark

In [0]:
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/dheerajmaddi@gmail.com/customers_1mb.csv")

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    False|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     True|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     True|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    False|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    False|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [0]:
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- is_active: string (nullable = true)



In [0]:
from pyspark.sql.functions import to_date, col, year, month, countDistinct, count

In [0]:
df_updated_schema = df.withColumn('registration_date', to_date('registration_date', 'yyyy-MM-dd')) \
       .withColumn('is_active', col('is_active').cast('boolean'))

In [0]:
df_updated_schema.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



In [0]:
df_updated_schema.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [0]:
df.withColumn('registration_date', to_date('registration_date', 'yyyy-MM-dd')) \
       .withColumn('is_active', col('is_active').cast('boolean')).show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [0]:
df.rdd.getNumPartitions()

1

In [0]:
spark.sparkContext.defaultMinPartitions

2

In [0]:
spark.sparkContext.defaultParallelism

8

In [0]:
df_updated_schema = df_updated_schema.fillna({'city': 'Unknown', 'state': 'Unknown', 'country': 'Unknown'})

In [0]:
df_updated_schema = df_updated_schema.withColumn('registration_year', year('registration_date')) \
                                     .withColumn('registration_month', month('registration_date'))

In [0]:
df_updated_schema.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|registration_month|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|             2023|                 6|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|             2023|                12|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|             2023|                10|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|             2023|                10|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|             2023|                 3|
+-----------+----------+---------+------

In [0]:
unique_cities = df_updated_schema.select('city').distinct().collect()
display(unique_cities)

city
Bangalore
Chennai
Mumbai
Ahmedabad
Kolkata
Pune
Delhi
Hyderabad


In [0]:
unique_cities

[Row(city='Bangalore'),
 Row(city='Chennai'),
 Row(city='Mumbai'),
 Row(city='Ahmedabad'),
 Row(city='Kolkata'),
 Row(city='Pune'),
 Row(city='Delhi'),
 Row(city='Hyderabad')]

In [0]:
unique_cities[0]

Row(city='Bangalore')

In [0]:
unique_cities[0][0]

'Bangalore'

In [0]:
unique_cities_count = df_updated_schema.select(countDistinct('city')).collect()
display(unique_cities_count)

count(DISTINCT city)
8


In [0]:
unique_cities_count

[Row(count(DISTINCT city)=8)]

In [0]:
unique_cities_count[0][0]

8

In [0]:
unique_states_count = df_updated_schema.select(countDistinct('state')).collect()
display(unique_states_count)

count(DISTINCT state)
7


In [0]:
unique_countries_count = df_updated_schema.select(countDistinct('country').alias('Total Countries')).collect()
display(unique_countries_count)

Total Countries
1


In [0]:
df_updated_schema.groupBy('city').count().orderBy(col('count').desc()).show(5)

+---------+-----+
|     city|count|
+---------+-----+
|     Pune| 2243|
|Hyderabad| 2242|
|  Kolkata| 2223|
|Bangalore| 2211|
|    Delhi| 2200|
+---------+-----+
only showing top 5 rows



In [0]:
df_updated_schema.groupBy('state', 'country').count().orderBy('count', ascending=False).show()

+-----------+-------+-----+
|      state|country|count|
+-----------+-------+-----+
|      Delhi|  India| 2578|
|    Gujarat|  India| 2543|
| Tamil Nadu|  India| 2536|
|  Telangana|  India| 2520|
|West Bengal|  India| 2503|
|Maharashtra|  India| 2490|
|  Karnataka|  India| 2483|
+-----------+-------+-----+



In [0]:
# Pivot Table - Count of Active and Inactive Users Per State 

df_updated_schema.groupBy('state').pivot('is_active').count().show()

+-----------+-----+----+
|      state|false|true|
+-----------+-----+----+
|  Karnataka| 1207|1276|
| Tamil Nadu| 1284|1252|
|    Gujarat| 1211|1332|
|      Delhi| 1356|1222|
|  Telangana| 1294|1226|
|Maharashtra| 1260|1230|
|West Bengal| 1306|1197|
+-----------+-----+----+



In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import rank, dense_rank, row_number, lit, min, max

In [0]:
window_spec = Window.partitionBy('state').orderBy(col('registration_date').desc())

df_updated_schema = df_updated_schema.withColumn('rank', rank().over(window_spec))\
                    .withColumn('dense_rank', dense_rank().over(window_spec))\
                        .withColumn('row_number', row_number().over(window_spec))

In [0]:
df_updated_schema.show(10)

+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+----------+
|customer_id|          name|     city|state|country|registration_date|is_active|registration_year|registration_month|rank|dense_rank|row_number|
+-----------+--------------+---------+-----+-------+-----------------+---------+-----------------+------------------+----+----------+----------+
|         61|   Customer_61|Hyderabad|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|         1|
|        501|  Customer_501|   Mumbai|Delhi|  India|       2023-12-31|    false|             2023|                12|   1|         1|         2|
|       2763| Customer_2763|     Pune|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|         1|         3|
|      12858|Customer_12858|Ahmedabad|Delhi|  India|       2023-12-31|     true|             2023|                12|   1|        

In [0]:
df_recent_customers = df_updated_schema.filter(col('registration_date') >= lit('2023-07-01'))
df_recent_customers.orderBy('registration_date').show(5)
df_recent_customers.show(5)

+-----------+--------------+---------+-------+-------+-----------------+---------+-----------------+------------------+----+----------+----------+
|customer_id|          name|     city|  state|country|registration_date|is_active|registration_year|registration_month|rank|dense_rank|row_number|
+-----------+--------------+---------+-------+-------+-----------------+---------+-----------------+------------------+----+----------+----------+
|       5396| Customer_5396|  Kolkata|  Delhi|  India|       2023-07-01|    false|             2023|                 7|1322|       184|      1322|
|       3310| Customer_3310|  Kolkata|Gujarat|  India|       2023-07-01|    false|             2023|                 7|1297|       184|      1298|
|       7628| Customer_7628|Hyderabad|  Delhi|  India|       2023-07-01|     true|             2023|                 7|1322|       184|      1323|
|       8911| Customer_8911|  Kolkata|  Delhi|  India|       2023-07-01|    false|             2023|                 7

In [0]:
df_updated_schema.count()

17653

In [0]:
df_recent_customers.count()

9025

In [0]:
# Oldest and newest customer per city
df_updated_schema.groupBy('city').agg(min('registration_date').alias('oldest'), max('registration_date').alias('newest')).show()

+---------+----------+----------+
|     city|    oldest|    newest|
+---------+----------+----------+
|Bangalore|2023-01-01|2023-12-31|
|  Chennai|2023-01-01|2023-12-31|
|   Mumbai|2023-01-01|2023-12-31|
|Ahmedabad|2023-01-01|2023-12-31|
|  Kolkata|2023-01-01|2023-12-31|
|     Pune|2023-01-01|2023-12-31|
|    Delhi|2023-01-01|2023-12-31|
|Hyderabad|2023-01-01|2023-12-31|
+---------+----------+----------+



In [0]:
df_updated_schema.rdd.getNumPartitions()

1

In [0]:
output_path = '/FileStore/tables/processed_customers'
df_updated_schema.write.mode('overwrite').parquet(output_path)

In [0]:
df_processed_customers = spark.read.parquet('dbfs:/FileStore/tables/processed_customers')

In [0]:
df_processed_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- registration_year: integer (nullable = true)
 |-- registration_month: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- dense_rank: integer (nullable = true)
 |-- row_number: integer (nullable = true)

