**1. Імпорт бібілотек**

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, lit, round

**2. Завантаження та читання кожного CSV-файлу як окремого DataFrame**

In [18]:
# Створення SparkSession
spark = SparkSession.builder \
    .appName("Homework 1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()

# Шляхи до CSV-файлів
users_file = '/mnt/f/Educations/goit-de-hw-03/users.csv'
purchases_file = '/mnt/f/Educations/goit-de-hw-03/purchases.csv'
products_file = '/mnt/f/Educations/goit-de-hw-03/products.csv'

# Завантаження даних у DataFrame
users_df = spark.read.csv(users_file, header=True, inferSchema=True)
purchases_df = spark.read.csv(purchases_file, header=True, inferSchema=True)
products_df = spark.read.csv(products_file, header=True, inferSchema=True)

# Відображення перших кількох рядків кожного DataFrame
print("Users DataFrame:")
users_df.show(5)

print("Purchases DataFrame:")
purchases_df.show(5)

print("Products DataFrame:")
products_df.show(5)

Users DataFrame:
+-------+------+---+-----------------+
|user_id|  name|age|            email|
+-------+------+---+-----------------+
|      1|User_1| 45|user1@example.com|
|      2|User_2| 48|user2@example.com|
|      3|User_3| 36|user3@example.com|
|      4|User_4| 46|user4@example.com|
|      5|User_5| 29|user5@example.com|
+-------+------+---+-----------------+
only showing top 5 rows

Purchases DataFrame:
+-----------+-------+----------+----------+--------+
|purchase_id|user_id|product_id|      date|quantity|
+-----------+-------+----------+----------+--------+
|          1|     52|         9|2022-01-01|       1|
|          2|     93|        37|2022-01-02|       8|
|          3|     15|        33|2022-01-03|       1|
|          4|     72|        42|2022-01-04|       9|
|          5|     61|        44|2022-01-05|       6|
+-----------+-------+----------+----------+--------+
only showing top 5 rows

Products DataFrame:
+----------+------------+-----------+-----+
|product_id|product_

**2. Очистка даних**

In [19]:
# Видалення рядків із пропущеними значеннями
users_df_cleaned = users_df.dropna()
purchases_df_cleaned = purchases_df.dropna()
products_df_cleaned = products_df.dropna()

# Виведення кількості рядків до і після очищення
print("Users DataFrame - кількість рядків до очищення:", users_df.count())
print("Users DataFrame - кількість рядків після очищення:", users_df_cleaned.count())

print("Purchases DataFrame - кількість рядків до очищення:", purchases_df.count())
print("Purchases DataFrame - кількість рядків після очищення:", purchases_df_cleaned.count())

print("Products DataFrame - кількість рядків до очищення:", products_df.count())
print("Products DataFrame - кількість рядків після очищення:", products_df_cleaned.count())

# Перевірка результатів
print("Users DataFrame (після очищення):")
users_df_cleaned.show(5)

print("Purchases DataFrame (після очищення):")
purchases_df_cleaned.show(5)

print("Products DataFrame (після очищення):")
products_df_cleaned.show(5)


Users DataFrame - кількість рядків до очищення: 100
Users DataFrame - кількість рядків після очищення: 95
Purchases DataFrame - кількість рядків до очищення: 200
Purchases DataFrame - кількість рядків після очищення: 195
Products DataFrame - кількість рядків до очищення: 50
Products DataFrame - кількість рядків після очищення: 47
Users DataFrame (після очищення):
+-------+------+---+-----------------+
|user_id|  name|age|            email|
+-------+------+---+-----------------+
|      1|User_1| 45|user1@example.com|
|      2|User_2| 48|user2@example.com|
|      3|User_3| 36|user3@example.com|
|      4|User_4| 46|user4@example.com|
|      5|User_5| 29|user5@example.com|
+-------+------+---+-----------------+
only showing top 5 rows

Purchases DataFrame (після очищення):
+-----------+-------+----------+----------+--------+
|purchase_id|user_id|product_id|      date|quantity|
+-----------+-------+----------+----------+--------+
|          1|     52|         9|2022-01-01|       1|
|       

**3. Визначення загальної суми покупок за кожною категорією продуктів.**

In [20]:
# Об'єднання покупки з продуктами
purchases_with_products = purchases_df_cleaned.join(
    products_df_cleaned, 
    purchases_df_cleaned["product_id"] == products_df_cleaned["product_id"], 
    "inner"
)

# Додавання стовпця для загальної вартості покупки
purchases_with_products = purchases_with_products.withColumn(
    "total_cost", 
    col("quantity") * col("price")
)

# Обчислення загальної суми покупок за категоріями
total_sum_by_category = purchases_with_products.groupBy("category").agg(
    round(sum("total_cost"), 2).alias("total_purchase_sum")
)

# Результат
total_sum_by_category.show()

+-----------+------------------+
|   category|total_purchase_sum|
+-----------+------------------+
|       Home|            1523.5|
|     Sports|            1802.5|
|Electronics|            1174.8|
|   Clothing|             790.3|
|     Beauty|             459.9|
+-----------+------------------+



**4. Визначення суми покупок за кожною категорією продуктів для вікової категорії від 18 до 25 включно**

In [21]:
# Об'єднання покупок із продуктами
purchases_with_products = purchases_df_cleaned.join(
    products_df_cleaned, "product_id", "inner"
)

# Об'єднання з даними користувачів
purchases_with_users = purchases_with_products.join(
    users_df_cleaned, "user_id", "inner"
)

# Фільтрація за віком від 18 до 25 років
filtered_data = purchases_with_users.filter(
    (col("age") >= 18) & (col("age") <= 25)
)

# Додавання стовпця для загальної вартості покупки
filtered_data = filtered_data.withColumn(
    "total_cost", 
    col("quantity") * col("price")
)

# Обчислення суми за категоріями
total_sum_by_category = filtered_data.groupBy("category").agg(
    round(sum("total_cost"), 2).alias("total_purchase_sum")
)

# Результат
total_sum_by_category.show()

+-----------+------------------+
|   category|total_purchase_sum|
+-----------+------------------+
|       Home|             361.1|
|     Sports|             310.5|
|Electronics|             249.6|
|   Clothing|             245.0|
|     Beauty|              41.4|
+-----------+------------------+



**5. Визначенн частки покупок за кожною категорією товарів від сумарних витрат для вікової категорії від 18 до 25 років**

In [22]:
# Загальна сума витрат для вікової категорії
total_spending = total_sum_by_category.select(
    sum("total_purchase_sum").alias("total_sum")
).collect()[0]["total_sum"]

# Обчислення частки покупок за кожною категорією
category_shares = total_sum_by_category.withColumn(
    "purchase_share",
    round((col("total_purchase_sum") / lit(total_spending)) * 100, 2)
)

# Результат
category_shares.show()

+-----------+------------------+--------------+
|   category|total_purchase_sum|purchase_share|
+-----------+------------------+--------------+
|       Home|             361.1|          29.9|
|     Sports|             310.5|         25.71|
|Electronics|             249.6|         20.67|
|   Clothing|             245.0|         20.29|
|     Beauty|              41.4|          3.43|
+-----------+------------------+--------------+



**6. Вибір 3-х категорій продуктів з найвищим відсотком витрат споживачами віком від 18 до 25 років**

In [23]:
# Округлення відсотків
category_shares = category_shares.withColumn(
    "purchase_share", round(col("purchase_share"), 2)
)

# Сортування за відсотками у порядку спадання
top_3_categories = category_shares.orderBy(
    col("purchase_share").desc()
).limit(3)

# Результат
top_3_categories.show()

+-----------+------------------+--------------+
|   category|total_purchase_sum|purchase_share|
+-----------+------------------+--------------+
|       Home|             361.1|          29.9|
|     Sports|             310.5|         25.71|
|Electronics|             249.6|         20.67|
+-----------+------------------+--------------+



**7. Закриття Spark-сесії**

In [24]:
# Закриття Spark-сесії
spark.stop()