# Імпортуємо бібліотеки 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, round

# Завантажуємо та прочитуємо кожен CSV-файл як окремий DataFrame

In [2]:
spark = SparkSession.builder.appName("MyGoitSparkSandbox").getOrCreate()

products_df = spark.read.csv('products.csv', header=True)
purchases_df = spark.read.csv('purchases.csv', header=True)
users_df = spark.read.csv('users.csv', header=True)

# Очищуємо дані, видаляючи будь-які рядки з пропущеними значеннями

In [3]:
products_df = products_df.dropna()
purchases_df = purchases_df.dropna()
users_df = users_df.dropna()

# Визначаємо загальну суму покупок за кожною категорією продуктів

In [None]:
alldata_tab_df = purchases_df.join(products_df, on='product_id')
alldata_tab_df = alldata_tab_df.withColumn('total', round(col('quantity') * col('price'),2))
# alldata_tab_df.show()
category_total_df = alldata_tab_df.groupBy('category').agg(round(sum('total'),2).alias('grand_total'))
category_total_df.show()

+-----------+-----------+
|   category|grand_total|
+-----------+-----------+
|       Home|     1523.5|
|     Sports|     1802.5|
|Electronics|     1174.8|
|   Clothing|      790.3|
|     Beauty|      459.9|
+-----------+-----------+



# Визначаємо суму покупок за кожною категорією продуктів для вікової категорії від 18 до 25 включно

In [15]:
all_users_purchase_tab_df = purchases_df.join(users_df, on='user_id')
all_users_purchase_tab_df = products_df.join(all_users_purchase_tab_df, on='product_id')
all_users_purchase_tab_df = all_users_purchase_tab_df.withColumn('total', round(col('quantity') * col('price'),2))
selected_users_purchase_tab_df = all_users_purchase_tab_df.filter((col('age') >= 18) & (col('age') <= 25))
selected_users_category_total_df = selected_users_purchase_tab_df.groupBy('category').agg(round(sum('total'),2).alias('grand_total'))
selected_users_category_total_df.show()

+-----------+-----------+
|   category|grand_total|
+-----------+-----------+
|       Home|      361.1|
|     Sports|      310.5|
|Electronics|      249.6|
|   Clothing|      245.0|
|     Beauty|       41.4|
+-----------+-----------+



# Визначаємо частку покупок за кожною категорією товарів від сумарних витрат для вікової категорії від 18 до 25 років

In [20]:
selected_users_all_categories_total = selected_users_purchase_tab_df.agg(sum('total')).collect()[0][0]
selected_users_percentage_df = selected_users_category_total_df \
    .withColumn('percentage', round(col('grand_total') * 100 / selected_users_all_categories_total, 1)) \
    .select('category', 'percentage')
selected_users_percentage_df.show()

+-----------+----------+
|   category|percentage|
+-----------+----------+
|       Home|      29.9|
|     Sports|      25.7|
|Electronics|      20.7|
|   Clothing|      20.3|
|     Beauty|       3.4|
+-----------+----------+



# Вибераємо 3 категорії продуктів з найвищим відсотком витрат споживачами віком від 18 до 25 років

In [21]:
selected_users_top_3_df = selected_users_percentage_df.orderBy(col('percentage').desc()).limit(3)
selected_users_top_3_df.show()

+-----------+----------+
|   category|percentage|
+-----------+----------+
|       Home|      29.9|
|     Sports|      25.7|
|Electronics|      20.7|
+-----------+----------+



In [22]:
spark.stop()