# завантаження даних і відкриття

In [1]:
import os

os.environ["PYTHONUNBUFFERED"] = "1"
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/4.0.0/libexec"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["SPARK_HOME"] + "/bin:" + os.environ["PATH"]

In [2]:
os.environ["PATH"]

'/opt/homebrew/opt/openjdk@17/bin:/opt/homebrew/Cellar/apache-spark/4.0.0/libexec/bin:/Users/aleksejkitajskij/Desktop/goit-de-hw-03/venv/bin:/Users/aleksejkitajskij/Desktop/goit_spark/venv/bin:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin'

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, min, max, avg, unix_timestamp, count_if, round, when
from pyspark.sql.types import TimestampType, IntegerType

In [4]:
# Створюємо сесію Spark
spark = SparkSession.builder.appName("MyGoitSparkSandbox").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/16 10:26:25 WARN Utils: Your hostname, Aleksejs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.226 instead (on interface en0)
25/08/16 10:26:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/16 10:26:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 1 Завантаження даних

In [5]:
# Завантажуємо датасет
users = spark.read.csv('data/users.csv', header=True)
purchases = spark.read.csv('data/purchases.csv', header=True)
products = spark.read.csv('data/products.csv', header=True)

In [6]:
users.show(3)
purchases.show(3)
products.show(3)

+-------+------+---+-----------------+
|user_id|  name|age|            email|
+-------+------+---+-----------------+
|      1|User_1| 45|user1@example.com|
|      2|User_2| 48|user2@example.com|
|      3|User_3| 36|user3@example.com|
+-------+------+---+-----------------+
only showing top 3 rows
+-----------+-------+----------+----------+--------+
|purchase_id|user_id|product_id|      date|quantity|
+-----------+-------+----------+----------+--------+
|          1|     52|         9|2022-01-01|       1|
|          2|     93|        37|2022-01-02|       8|
|          3|     15|        33|2022-01-03|       1|
+-----------+-------+----------+----------+--------+
only showing top 3 rows
+----------+------------+-----------+-----+
|product_id|product_name|   category|price|
+----------+------------+-----------+-----+
|         1|   Product_1|     Beauty|  8.3|
|         2|   Product_2|       Home|  8.3|
|         3|   Product_3|Electronics|  9.2|
+----------+------------+-----------+-----+


In [7]:
users.createTempView("users")
purchases.createTempView("purchases")
products.createTempView("products")

# 2 Очистка даних

In [8]:
spark.sql("""
    CREATE OR REPLACE TEMP VIEW users_clean AS
    SELECT CAST(user_id AS STRING) AS user_id,
           name,
           CAST(age AS INT) AS age,
           email
    FROM users
    WHERE user_id IS NOT NULL
      AND name IS NOT NULL
      AND age IS NOT NULL
      AND email IS NOT NULL
""")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW purchases_clean AS
    SELECT CAST(purchase_id AS STRING) AS purchase_id,
           CAST(user_id AS STRING) AS user_id,
           CAST(product_id AS STRING) AS product_id,
           date,
           CAST(quantity AS INT) AS quantity
    FROM purchases
    WHERE purchase_id IS NOT NULL
      AND user_id IS NOT NULL
      AND product_id IS NOT NULL
      AND date IS NOT NULL
      AND quantity IS NOT NULL
""")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW products_clean AS
    SELECT CAST(product_id AS STRING) AS product_id,
           product_name,
           category,
           CAST(price AS DOUBLE) AS price
    FROM products
    WHERE product_id IS NOT NULL
      AND product_name IS NOT NULL
      AND category IS NOT NULL
      AND price IS NOT NULL
""")

DataFrame[]

# 3 Загальна сума покупок за категоріями

In [9]:
spark.sql("""
    SELECT pr.category,
           ROUND(SUM(pc.quantity * pr.price)) AS total_amount
    FROM purchases_clean pc
    JOIN products_clean pr ON pc.product_id = pr.product_id
    JOIN users_clean u ON pc.user_id = u.user_id
    GROUP BY pr.category
    ORDER BY total_amount DESC
""").show()

+-----------+------------+
|   category|total_amount|
+-----------+------------+
|     Sports|      1755.0|
|       Home|      1439.0|
|Electronics|      1142.0|
|   Clothing|       696.0|
|     Beauty|       442.0|
+-----------+------------+



# 4 Сума для віку 18–25

In [10]:
spark.sql("""
    CREATE OR REPLACE TEMP VIEW total_by_cat_18_25 AS
    SELECT pr.category,
           ROUND(SUM(pc.quantity * pr.price)) AS age_18_25_amount
    FROM purchases_clean pc
    JOIN products_clean pr ON pc.product_id = pr.product_id
    JOIN users_clean u ON pc.user_id = u.user_id
    WHERE u.age BETWEEN 18 AND 25
    GROUP BY pr.category
    ORDER BY age_18_25_amount DESC
""")

spark.sql("""
    SELECT *
    FROM total_by_cat_18_25
""").show()

+-----------+----------------+
|   category|age_18_25_amount|
+-----------+----------------+
|       Home|           361.0|
|     Sports|           310.0|
|Electronics|           250.0|
|   Clothing|           245.0|
|     Beauty|            41.0|
+-----------+----------------+



# 5 Частка від сумарних витрат 18–25

In [11]:
spark.sql("""
    CREATE OR REPLACE TEMP VIEW share_18_25 AS
    WITH total_sum AS (
        SELECT SUM(pc.quantity * pr.price) AS total_amount_18_25
        FROM purchases_clean pc
        JOIN products_clean pr ON pc.product_id = pr.product_id
        JOIN users_clean u ON pc.user_id = u.user_id
        WHERE u.age BETWEEN 18 AND 25
    )
    SELECT t.category,
           t.age_18_25_amount,
           ROUND(t.age_18_25_amount / ts.total_amount_18_25 * 100, 2) AS share_percent
    FROM total_by_cat_18_25 t
    CROSS JOIN total_sum ts
    ORDER BY share_percent DESC
""")

spark.sql("""
    SELECT *
    FROM share_18_25
""").show()

+-----------+----------------+-------------+
|   category|age_18_25_amount|share_percent|
+-----------+----------------+-------------+
|       Home|           361.0|        29.89|
|     Sports|           310.0|        25.67|
|Electronics|           250.0|         20.7|
|   Clothing|           245.0|        20.29|
|     Beauty|            41.0|          3.4|
+-----------+----------------+-------------+



# 6 Топ-3 категорії

In [12]:
spark.sql("""
    SELECT *
    FROM share_18_25
    ORDER BY share_percent DESC
    LIMIT 3
""").show()

+-----------+----------------+-------------+
|   category|age_18_25_amount|share_percent|
+-----------+----------------+-------------+
|       Home|           361.0|        29.89|
|     Sports|           310.0|        25.67|
|Electronics|           250.0|         20.7|
+-----------+----------------+-------------+

