In [1]:
# Імпорт бібліотек
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, round
import matplotlib.pyplot as plt

# Ініціалізація сесії Spark
spark = SparkSession.builder.appName("ProductPurchaseAnalysis").getOrCreate()

# Перевірка ініціалізації сесії
if spark:
    print("Сесія Spark успішно створена.")
    
    # Виведення адреси веб-інтерфейсу Spark
    web_ui_url = spark.sparkContext.uiWebUrl
    if web_ui_url:
        print(f"Spark UI доступний за адресою: {web_ui_url}")
    else:
        print("Адреса Spark UI не доступна.")
else:
    print("Не вдалося створити сесію Spark.")

# 1. Завантаження CSV-файлів
users = spark.read.csv("K:/PowerBi/Go IT Magistr/12_Data Engineering/hw_1/users.csv", header=True, inferSchema=True)
purchases = spark.read.csv("K:/PowerBi/Go IT Magistr/12_Data Engineering/hw_1/purchases.csv", header=True, inferSchema=True)
products = spark.read.csv("K:/PowerBi/Go IT Magistr/12_Data Engineering/hw_1/products.csv", header=True, inferSchema=True)

# Виведення даних з кожного файлу
print("Users DataFrame:")
users.show(5)
print("Purchases DataFrame:")
purchases.show(5)
print("Products DataFrame:")
products.show(5)

# 2. Очищення даних від пропущених значень
users = users.dropna()
purchases = purchases.dropna()
products = products.dropna()

# Виведення очищених даних
print("Users DataFrame після очищення:")
users.show(5)
print("Purchases DataFrame після очищення:")
purchases.show(5)
print("Products DataFrame після очищення:")
products.show(5)

# 3. Об’єднання даних для аналізу
data = purchases.join(users, "user_id").join(products, "product_id")
data = data.withColumn("total_cost", col("quantity") * col("price"))

# 4. Загальна сума покупок за категоріями
total_by_category = data.groupBy("category").agg(spark_sum("total_cost").alias("total_spent"))
print("Загальна сума покупок за категоріями:")
total_by_category.show()

# 5. Сума покупок для вікової категорії від 18 до 25 років
age_filtered_data = data.filter((col("age") >= 18) & (col("age") <= 25))
total_by_category_18_25 = age_filtered_data.groupBy("category").agg(spark_sum("total_cost").alias("total_spent_18_25"))
print("Сума покупок за категоріями для вікової категорії 18-25:")
total_by_category_18_25.show()

# 6. Частка покупок за кожною категорією для вікової категорії 18-25 років
total_spent_18_25 = total_by_category_18_25.agg(spark_sum("total_spent_18_25").alias("overall_spent_18_25")).collect()[0]["overall_spent_18_25"]
percentages_18_25 = total_by_category_18_25.withColumn("percentage", round((col("total_spent_18_25") / total_spent_18_25) * 100, 2))
print("Частка покупок за кожною категорією для вікової категорії 18-25:")
percentages_18_25.show()

# 7. Топ 3 категорії продуктів з найвищими витратами для вікової групи 18-25 років
top_3_categories = percentages_18_25.orderBy(col("percentage").desc()).limit(3)
print("Топ 3 категорії продуктів з найвищими витратами споживачами віком 18-25 років:")
top_3_categories.show()

# Збереження скріншотів DataFrame як зображень
def save_df_as_image(df, file_name):
    df_pandas = df.toPandas()
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.axis('tight')
    ax.axis('off')
    table = ax.table(cellText=df_pandas.values, colLabels=df_pandas.columns, loc='center')
    # Вказуємо шлях до папки для збереження
    plt.savefig(f"K:/PowerBi/Go IT Magistr/12_Data Engineering/hw_1/{file_name}")
    plt.close()

# Збереження скріншотів даних
save_df_as_image(users.limit(5), "users_snapshot.png")
save_df_as_image(purchases.limit(5), "purchases_snapshot.png")
save_df_as_image(products.limit(5), "products_snapshot.png")
save_df_as_image(total_by_category, "total_by_category_snapshot.png")
save_df_as_image(total_by_category_18_25, "total_by_category_18_25_snapshot.png")
save_df_as_image(top_3_categories, "top_3_categories_snapshot.png")

# Завершення сесії Spark
spark.stop()


Сесія Spark успішно створена.
Spark UI доступний за адресою: http://host.docker.internal:4040
Users DataFrame:
+-------+------+---+-----------------+
|user_id|  name|age|            email|
+-------+------+---+-----------------+
|      1|User_1| 45|user1@example.com|
|      2|User_2| 48|user2@example.com|
|      3|User_3| 36|user3@example.com|
|      4|User_4| 46|user4@example.com|
|      5|User_5| 29|user5@example.com|
+-------+------+---+-----------------+
only showing top 5 rows

Purchases DataFrame:
+-----------+-------+----------+----------+--------+
|purchase_id|user_id|product_id|      date|quantity|
+-----------+-------+----------+----------+--------+
|          1|     52|         9|2022-01-01|       1|
|          2|     93|        37|2022-01-02|       8|
|          3|     15|        33|2022-01-03|       1|
|          4|     72|        42|2022-01-04|       9|
|          5|     61|        44|2022-01-05|       6|
+-----------+-------+----------+----------+--------+
only showing to