In [1]:
from pyspark.sql import SparkSession
# NOTE: `round` below is pyspark's column function and shadows Python's built-in round().
from pyspark.sql.functions import sum as spark_sum, col, round
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

# Create the Spark session for this notebook, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("Data Processing")
    .getOrCreate()
)

# Read the three input CSVs (header row present, column types inferred),
# one DataFrame per file, in the order purchases -> users -> products
purchases_df, users_df, products_df = (
    spark.read.csv(csv_path, header=True, inferSchema=True)
    for csv_path in ('./purchases.csv', './users.csv', './products.csv')
)

# Step 2: Keep only complete rows -- a row is discarded if any of its columns is null
purchases_df = purchases_df.na.drop(how='any')
users_df = users_df.na.drop(how='any')
products_df = products_df.na.drop(how='any')

# Steps 3 and 4 apply the same "amount per row, then total per category" logic,
# so it lives in two small helpers instead of being copy-pasted.
# NOTE: `round` in this notebook is pyspark.sql.functions.round, not the built-in.

# Inclusive bounds of the age group analysed in step 4
AGE_GROUP_MIN, AGE_GROUP_MAX = 18, 25


def _with_amount(df):
    """Return `df` with an 'amount' column: quantity * price, rounded to 2 decimal places.

    `df` must expose 'quantity' and 'price' columns.
    """
    return df.withColumn('amount', round(col('quantity') * col('price'), 2))


def _total_by_category(df, total_col):
    """Sum 'amount' per 'category', rounded to 2 decimal places, in a column named `total_col`."""
    return df.groupBy('category').agg(round(spark_sum('amount'), 2).alias(total_col))


# Step 3: Calculate the total purchase amount per product category with rounded amount
merged_df = _with_amount(purchases_df.join(products_df, 'product_id'))
total_purchase_per_category = _total_by_category(merged_df, 'total_amount')

# Step 4: Calculate the total purchase amount per product category for age group 18-25 with rounded amount
# between() is inclusive on both ends, i.e. AGE_GROUP_MIN <= age <= AGE_GROUP_MAX
filtered_users_df = users_df.filter(col('age').between(AGE_GROUP_MIN, AGE_GROUP_MAX))
merged_age_df = _with_amount(
    purchases_df.join(filtered_users_df, 'user_id').join(products_df, 'product_id')
)
total_purchase_18_25 = _total_by_category(merged_age_df, 'total_amount_18_25')

# Step 5: Calculate the percentage share of purchases for each category within age group 18-25
# An empty partitionBy() treats the whole DataFrame as a single window, so the
# windowed sum below is the grand total over every row of total_purchase_18_25.
windowSpec = Window.partitionBy()
grand_total_18_25 = spark_sum('total_amount_18_25').over(windowSpec)
percentage_share = total_purchase_18_25.withColumn(
    'percentage', round(col('total_amount_18_25') / grand_total_18_25 * 100, 2)
)

# Step 6: Select the top 3 categories by percentage of purchases for age group 18-25
# 'category' is a secondary sort key so that rows tied on 'percentage' are
# always ranked in the same order, making the top-3 pick reproducible.
top_3_categories = (
    percentage_share
    .orderBy(col('percentage').desc(), col('category').asc())
    .limit(3)
)

# Print each result table in turn: overall totals, 18-25 totals,
# percentage shares, then the top 3 categories
for result_df in (
    total_purchase_per_category,
    total_purchase_18_25,
    percentage_share,
    top_3_categories,
):
    result_df.show()



+-----------+------------+
|   category|total_amount|
+-----------+------------+
|       Home|      1523.5|
|     Sports|      1802.5|
|Electronics|      1174.8|
|   Clothing|       790.3|
|     Beauty|       459.9|
+-----------+------------+

+-----------+------------------+
|   category|total_amount_18_25|
+-----------+------------------+
|       Home|             361.1|
|     Sports|             310.5|
|Electronics|             249.6|
|   Clothing|             245.0|
|     Beauty|              41.4|
+-----------+------------------+

+-----------+------------------+----------+
|   category|total_amount_18_25|percentage|
+-----------+------------------+----------+
|       Home|             361.1|      29.9|
|     Sports|             310.5|     25.71|
|Electronics|             249.6|     20.67|
|   Clothing|             245.0|     20.29|
|     Beauty|              41.4|      3.43|
+-----------+------------------+----------+

+-----------+------------------+----------+
|   category|tota

In [None]:
# Shut down the Spark session now that all results have been displayed
spark.stop()