In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

**1. Завантаження та читання CSV-файлів як окремий DataFrame.**

In [5]:
# Створення Spark-сесії
spark = SparkSession.builder \
    .appName("Data Processing with Spark") \
    .getOrCreate()

user_data_path = '/content/drive/MyDrive/GoIT/DataEngineering/lesson3/users.csv'
purchases_data_path = '/content/drive/MyDrive/GoIT/DataEngineering/lesson3/purchases.csv'
products_data_path = '/content/drive/MyDrive/GoIT/DataEngineering/lesson3/products.csv'

# Завантаження даних
users_df = spark.read.csv(user_data_path, header=True, inferSchema=True)
purchases_df = spark.read.csv(purchases_data_path, header=True, inferSchema=True)
products_df = spark.read.csv(products_data_path, header=True, inferSchema=True)


In [8]:
users_df.show(10, truncate=False)

+-------+-------+---+------------------+
|user_id|name   |age|email             |
+-------+-------+---+------------------+
|1      |User_1 |45 |user1@example.com |
|2      |User_2 |48 |user2@example.com |
|3      |User_3 |36 |user3@example.com |
|4      |User_4 |46 |user4@example.com |
|5      |User_5 |29 |user5@example.com |
|6      |User_6 |39 |user6@example.com |
|7      |User_7 |24 |user7@example.com |
|8      |User_8 |44 |user8@example.com |
|9      |User_9 |27 |user9@example.com |
|10     |User_10|43 |user10@example.com|
+-------+-------+---+------------------+
only showing top 10 rows



In [9]:
purchases_df.show(10, truncate=False)

+-----------+-------+----------+----------+--------+
|purchase_id|user_id|product_id|date      |quantity|
+-----------+-------+----------+----------+--------+
|1          |52     |9         |2022-01-01|1       |
|2          |93     |37        |2022-01-02|8       |
|3          |15     |33        |2022-01-03|1       |
|4          |72     |42        |2022-01-04|9       |
|5          |61     |44        |2022-01-05|6       |
|6          |21     |24        |2022-01-06|7       |
|7          |83     |15        |2022-01-07|7       |
|8          |87     |32        |2022-01-08|3       |
|9          |75     |32        |2022-01-09|2       |
|10         |75     |24        |2022-01-10|9       |
+-----------+-------+----------+----------+--------+
only showing top 10 rows



In [10]:
products_df.show(10, truncate=False)

+----------+------------+-----------+-----+
|product_id|product_name|category   |price|
+----------+------------+-----------+-----+
|1         |Product_1   |Beauty     |8.3  |
|2         |Product_2   |Home       |8.3  |
|3         |Product_3   |Electronics|9.2  |
|4         |Product_4   |Electronics|2.6  |
|5         |Product_5   |Electronics|9.4  |
|6         |Product_6   |Sports     |8.7  |
|7         |Product_7   |Beauty     |8.2  |
|8         |Product_8   |Sports     |1.0  |
|9         |Product_9   |Beauty     |6.0  |
|10        |Product_10  |Sports     |5.4  |
+----------+------------+-----------+-----+
only showing top 10 rows



**2. Очистка даних**

In [16]:
users_df = users_df.dropna()
purchases_df = purchases_df.dropna()
products_df = products_df.dropna()

In [17]:
null_rows_count = users_df.filter(
    col("user_id").isNull() |
    col("name").isNull() |
    col("age").isNull() |
    col("email").isNull()
).count()

print(f"Кількість рядків із пропущеними значеннями: {null_rows_count}")

Кількість рядків із пропущеними значеннями: 0


In [18]:
# Фільтруємо рядки, де хоча б одне значення є null
null_rows_count_purchases = purchases_df.filter(
    col("purchase_id").isNull() |
    col("user_id").isNull() |
    col("product_id").isNull() |
    col("date").isNull() |
    col("quantity").isNull()
).count()

print(f"Кількість рядків із пропущеними значеннями у purchases_df: {null_rows_count_purchases}")


Кількість рядків із пропущеними значеннями у purchases_df: 0


In [19]:
# Фільтруємо рядки, де хоча б одне значення є null
null_rows_count_products = products_df.filter(
    col("product_id").isNull() |
    col("product_name").isNull() |
    col("category").isNull() |
    col("price").isNull()
).count()

print(f"Кількість рядків із пропущеними значеннями у products_df: {null_rows_count_products}")


Кількість рядків із пропущеними значеннями у products_df: 0


**3. Визначення загальної суми покупок за кожною категорією продуктів**

In [21]:
# Об'єднання purchases_df і products_df
purchases_with_products_df = purchases_df.join(products_df, "product_id")

# Обчислення суми покупок за категорією
category_total_df = purchases_with_products_df \
    .withColumn("total_cost", col("quantity") * col("price")) \
    .groupBy("category") \
    .agg(sum("total_cost").alias("total_sales"))

category_total_df.show()


+-----------+------------------+
|   category|       total_sales|
+-----------+------------------+
|       Home|1523.4999999999998|
|     Sports|1802.4999999999998|
|Electronics|1174.7999999999997|
|   Clothing|             790.3|
|     Beauty| 459.8999999999999|
+-----------+------------------+



**4. Визначення суми покупок за кожною категорією продуктів для вікової категорії від 18 до 25 включно**

In [22]:
# Фільтрація користувачів за віком
filtered_users_df = users_df.filter((col("age") >= 18) & (col("age") <= 25))

# Об'єднання з даними покупок
purchases_with_users_df = purchases_with_products_df \
    .join(filtered_users_df, "user_id")

# Сума покупок за категорією
age_filtered_category_total_df = purchases_with_users_df \
    .withColumn("total_cost", col("quantity") * col("price")) \
    .groupBy("category") \
    .agg(sum("total_cost").alias("total_sales_18_25"))

age_filtered_category_total_df.show()


+-----------+------------------+
|   category| total_sales_18_25|
+-----------+------------------+
|       Home|             361.1|
|     Sports|310.49999999999994|
|Electronics|             249.6|
|   Clothing|             245.0|
|     Beauty|41.400000000000006|
+-----------+------------------+



**5. Визначення частки покупок за кожною категорією товарів від сумарних витрат для вікової категорії від 18 до 25 років.**

In [23]:
# Загальна сума покупок для вікової категорії 18-25
total_sales_18_25 = age_filtered_category_total_df.agg(sum("total_sales_18_25")).collect()[0][0]

# Обчислення частки витрат
category_percentage_df = age_filtered_category_total_df \
    .withColumn("percentage", (col("total_sales_18_25") / total_sales_18_25) * 100)

category_percentage_df.show()


+-----------+------------------+------------------+
|   category| total_sales_18_25|        percentage|
+-----------+------------------+------------------+
|       Home|             361.1| 29.90228552500829|
|     Sports|310.49999999999994|25.712156343159986|
|Electronics|             249.6|20.669095727061944|
|   Clothing|             245.0| 20.28817489234846|
|     Beauty|41.400000000000006|3.4282875124213326|
+-----------+------------------+------------------+



**6. Вибер 3 категорій продуктів з найвищим відсотком витрат споживачами віком від 18 до 25 років.**

In [28]:
# Вибір топ-3 категорій за відсотком витрат
top_3_categories_df = category_percentage_df \
    .orderBy(col("percentage").desc()) \
    .limit(3)

top_3_categories_df.show()


+-----------+------------------+------------------+
|   category| total_sales_18_25|        percentage|
+-----------+------------------+------------------+
|       Home|             361.1| 29.90228552500829|
|     Sports|310.49999999999994|25.712156343159986|
|Electronics|             249.6|20.669095727061944|
+-----------+------------------+------------------+

