In [0]:
dbutils.fs.ls('/FileStore/tables/')

Out[2]: [FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales.csv', name='BigMart_Sales.csv', size=869537, modificationTime=1749578148000),
 FileInfo(path='dbfs:/FileStore/tables/country_region.csv', name='country_region.csv', size=230, modificationTime=1753727513000),
 FileInfo(path='dbfs:/FileStore/tables/drivers.json', name='drivers.json', size=180812, modificationTime=1749579944000),
 FileInfo(path='dbfs:/FileStore/tables/ecommerce_transactions.csv', name='ecommerce_transactions.csv', size=794539, modificationTime=1753727513000),
 FileInfo(path='dbfs:/FileStore/tables/salaries.csv', name='salaries.csv', size=7486247, modificationTime=1752179357000)]

In [0]:
df=spark.read.option('header', 'true').option('inferSchema', 'true').csv('/FileStore/tables/country_region.csv')
df1=spark.read.option('header', 'true').option('inferSchema', 'true').csv('/FileStore/tables/ecommerce_transactions.csv')

In [0]:
df.display()


country,region
United States,North America
Canada,North America
United Kingdom,Europe
Germany,Europe
France,Europe
Australia,Asia‑Pacific
India,Asia‑Pacific
Japan,Asia‑Pacific
South Korea,Asia‑Pacific
Brazil,South America


In [0]:
df1.display()

order_id,customer_id,order_date,country,sku,category,quantity,unit_price,discount,payment_method
100000,7964,2024-08-26T10:31:59.000+0000,France,E00001,Electronics,8,563.95,0.03,Bank Transfer
100001,7621,2024-10-04T17:13:10.000+0000,Japan,T00001,Toys,5,70.31,0.11,Credit Card
100002,9339,2024-10-12T15:27:09.000+0000,France,C00001,Clothing,8,94.25,0.07,Credit Card
100003,8990,2024-01-24T20:35:56.000+0000,South Korea,S00001,Sports,3,304.44,0.11,Bank Transfer
100004,9735,2024-06-11T21:26:05.000+0000,India,E00002,Electronics,8,171.82,0.14,PayPal
100005,5918,2024-02-26T17:50:27.000+0000,India,E00003,Electronics,9,774.66,0.1,Bank Transfer
100006,4333,2024-01-28T11:16:09.000+0000,Japan,T00002,Toys,2,23.84,0.14,PayPal
100007,6079,2024-09-01T22:34:02.000+0000,France,C00002,Clothing,2,143.05,0.09,Gift Card
100008,8489,2024-06-07T19:18:51.000+0000,South Korea,B00001,Beauty,4,37.45,0.2,Bank Transfer
100009,8493,2024-03-13T19:19:03.000+0000,United States,E00004,Electronics,8,538.58,0.21,Gift Card


### removing duplicates and null values

In [0]:
df1_clean = df1.dropDuplicates().dropna(subset=["customer_id"]).filter("quantity > 0")


In [0]:
print(type(df1_clean))  # Should be <class 'pyspark.sql.dataframe.DataFrame'>


<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
from pyspark.sql.functions import col, to_timestamp, year, month, dayofmonth

df1_enriched = df1_clean.withColumn("order_value", col("quantity") * col("unit_price")) \
                        .withColumn("timestamp", to_timestamp("order_date")) \
                        .withColumn("year", year("timestamp")) \
                        .withColumn("month", month("timestamp")) \
                        .withColumn("day", dayofmonth("timestamp"))


In [0]:
print(type(df1_enriched))

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
display(df1_enriched)

order_id,customer_id,order_date,country,sku,category,quantity,unit_price,discount,payment_method,order_value,timestamp,year,month,day
100400,2067,2024-05-25T22:29:21.000+0000,United Kingdom,H00047,Home,9,59.86,0.1,Credit Card,538.74,2024-05-25T22:29:21.000+0000,2024,5,25
100410,5697,2024-02-14T21:50:06.000+0000,Brazil,H00050,Home,8,22.32,0.13,Gift Card,178.56,2024-02-14T21:50:06.000+0000,2024,2,14
100569,2120,2024-06-04T05:36:57.000+0000,Brazil,T00080,Toys,5,11.23,0.22,Credit Card,56.150000000000006,2024-06-04T05:36:57.000+0000,2024,6,4
100665,7069,2024-11-17T12:51:19.000+0000,United States,H00089,Home,2,272.96,0.27,Credit Card,545.92,2024-11-17T12:51:19.000+0000,2024,11,17
101109,5820,2024-07-20T17:54:40.000+0000,Canada,E00167,Electronics,5,671.43,0.13,Gift Card,3357.15,2024-07-20T17:54:40.000+0000,2024,7,20
101118,7598,2024-10-06T19:18:46.000+0000,United Kingdom,B00159,Beauty,7,45.58,0.08,Gift Card,319.06,2024-10-06T19:18:46.000+0000,2024,10,6
101330,2552,2024-10-31T20:51:53.000+0000,United States,B00193,Books,9,30.22,0.1,Credit Card,271.98,2024-10-31T20:51:53.000+0000,2024,10,31
101389,1799,2024-01-12T17:21:07.000+0000,Brazil,H00218,Home,1,212.91,0.21,PayPal,212.91,2024-01-12T17:21:07.000+0000,2024,1,12
101428,3351,2024-10-17T19:00:34.000+0000,Germany,E00210,Electronics,2,208.94,0.25,Credit Card,417.88,2024-10-17T19:00:34.000+0000,2024,10,17
102024,2635,2024-03-26T23:23:50.000+0000,Germany,S00299,Sports,8,372.31,0.26,Bank Transfer,2978.48,2024-03-26T23:23:50.000+0000,2024,3,26


In [0]:
df_country_sales = df1_enriched.groupBy("country") \
                               .sum("order_value") \
                               .withColumnRenamed("sum(order_value)", "total_sales")
df_country_sales.show()


+--------------+-----------------+
|       country|      total_sales|
+--------------+-----------------+
|       Germany|780688.7099999993|
|        France|761008.1699999999|
| United States|769317.7899999997|
|         India|        781853.56|
|   South Korea|727383.3199999997|
|        Canada|761669.5700000004|
|        Brazil|742757.8399999997|
|         Japan| 749818.019999999|
|     Australia|746225.5499999996|
|United Kingdom| 809667.900000001|
+--------------+-----------------+



### top customer per country

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, sum as _sum

df_customer_sales = df1_enriched.groupBy("country", "customer_id") \
                                .agg(_sum("order_value").alias("total_spent"))

window_spec = Window.partitionBy("country").orderBy(col("total_spent").desc())

df_top_customer = df_customer_sales.withColumn("rank", rank().over(window_spec)) \
                                   .filter("rank = 1")
df_top_customer.show()


+--------------+-----------+------------------+----+
|       country|customer_id|       total_spent|rank|
+--------------+-----------+------------------+----+
|     Australia|       4935|           6414.82|   1|
|        Brazil|       9362|           7790.43|   1|
|        Canada|       2449|            7155.0|   1|
|        France|       6898|           7465.92|   1|
|       Germany|       7442|7315.5599999999995|   1|
|         India|       5918|           6971.94|   1|
|         Japan|       1351| 6720.210000000001|   1|
|   South Korea|       3262|           7199.55|   1|
|United Kingdom|       1529|           7012.17|   1|
| United States|       6675|          14769.66|   1|
+--------------+-----------+------------------+----+



### joined country region by sales of region

In [0]:
df_joined = df_country_sales.join(df, on="country", how="left")

df_region_sales = df_joined.groupBy("region") \
                           .sum("total_sales") \
                           .withColumnRenamed("sum(total_sales)", "region_sales")
df_region_sales.show()


+-------------+------------------+
|       region|      region_sales|
+-------------+------------------+
| Asia‑Pacific|3005280.4499999983|
|       Europe|2351364.7800000003|
|North America|        1530987.36|
|South America| 742757.8399999997|
+-------------+------------------+



### Monthly category pivot from ecommerce

In [0]:
df_monthly_category = df1_enriched.groupBy("month", "category") \
                                 .sum("order_value") \
                                 .withColumnRenamed("sum(order_value)", "monthly_sales")

df_pivot = df_monthly_category.groupBy("month").pivot("category").sum("monthly_sales")
df_pivot.display()


month,Beauty,Books,Clothing,Electronics,Home,Sports,Toys
12,34796.55,20699.02,68842.51999999997,250522.24,111541.43,114066.5,38558.46000000001
1,40178.51,18610.24999999999,61601.33,247822.49000000008,92580.05,136203.34999999992,46996.90000000004
6,30255.4,15733.850000000002,58377.19,258993.45999999996,81995.67,132812.77000000002,45847.04999999999
3,42021.95,20242.49,67911.83000000002,253859.5899999999,100413.82000000004,131340.1,52850.83000000001
5,38771.580000000016,19485.329999999994,63048.74999999999,273380.46,95268.15999999992,117601.92999999998,39134.12
9,44774.50999999998,18655.31,56257.08999999999,218244.48000000007,95938.42999999998,111247.26999999996,40093.4
4,34124.429999999986,17118.859999999997,55812.69,260356.79999999996,83010.54,119099.72999999997,41967.09
8,41736.4,20316.769999999982,78629.36999999998,281092.03,80167.68999999997,122236.08999999995,41056.34999999999
7,35991.490000000005,17184.17000000001,67164.97999999998,265243.23000000004,105589.05,103999.37,44140.19000000002
10,40892.12999999998,19798.17000000001,68531.21999999999,270149.11000000004,89646.30999999998,146181.42999999996,47798.94000000002


### Price band counts

In [0]:
from pyspark.sql.functions import when

df_bands = df1_enriched.withColumn("price_band", 
            when(col("order_value") < 50, "Low")
            .when((col("order_value") >= 50) & (col("order_value") < 200), "Medium")
            .otherwise("High"))

df_band_counts = df_bands.groupBy("price_band").count()
df_band_counts.show()


+----------+-----+
|price_band|count|
+----------+-----+
|      High| 6966|
|       Low|  692|
|    Medium| 2342|
+----------+-----+



In [0]:
df_partitions = df1_enriched.rdd.getNumPartitions()
print(f"Number of partitions: {df_partitions}")

df1_enriched = df1_enriched.repartition(8)

df_partitioned_count = df1_enriched.rdd.getNumPartitions()
print(f"Repartitioned to: {df_partitioned_count}")




Number of partitions: 8
Repartitioned to: 8
