In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, min, max, avg, unix_timestamp, count_if, round, when, collect_list, array_union, array_distinct, sum, sumDistinct, desc
from pyspark.sql.types import TimestampType, IntegerType, DoubleType

In [3]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("MyGoitSparkSandbox")
    .getOrCreate()
)

25/05/11 00:32:17 WARN Utils: Your hostname, MD-UASWD-0430 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/11 00:32:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/11 00:32:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Етап 1, завантаження датасетів

In [4]:
# Load the three source CSV files, using the first row of each as column names.
# No schema is supplied or inferred, so every column is read as a string.
csv_paths = ('./users.csv', './products.csv', './purchases.csv')
df_users, df_products, df_purchases = [
    spark.read.csv(csv_path, header=True) for csv_path in csv_paths
]

In [5]:
# Preview the first 10 user rows.
df_users.show(n=10)

+-------+-------+---+------------------+
|user_id|   name|age|             email|
+-------+-------+---+------------------+
|      1| User_1| 45| user1@example.com|
|      2| User_2| 48| user2@example.com|
|      3| User_3| 36| user3@example.com|
|      4| User_4| 46| user4@example.com|
|      5| User_5| 29| user5@example.com|
|      6| User_6| 39| user6@example.com|
|      7| User_7| 24| user7@example.com|
|      8| User_8| 44| user8@example.com|
|      9| User_9| 27| user9@example.com|
|     10|User_10| 43|user10@example.com|
+-------+-------+---+------------------+
only showing top 10 rows



In [6]:
# Preview the first 10 product rows.
df_products.show(n=10)

+----------+------------+-----------+-----+
|product_id|product_name|   category|price|
+----------+------------+-----------+-----+
|         1|   Product_1|     Beauty|  8.3|
|         2|   Product_2|       Home|  8.3|
|         3|   Product_3|Electronics|  9.2|
|         4|   Product_4|Electronics|  2.6|
|         5|   Product_5|Electronics|  9.4|
|         6|   Product_6|     Sports|  8.7|
|         7|   Product_7|     Beauty|  8.2|
|         8|   Product_8|     Sports|  1.0|
|         9|   Product_9|     Beauty|  6.0|
|        10|  Product_10|     Sports|  5.4|
+----------+------------+-----------+-----+
only showing top 10 rows



In [7]:
# Preview the first 10 purchase rows.
df_purchases.show(n=10)

+-----------+-------+----------+----------+--------+
|purchase_id|user_id|product_id|      date|quantity|
+-----------+-------+----------+----------+--------+
|          1|     52|         9|2022-01-01|       1|
|          2|     93|        37|2022-01-02|       8|
|          3|     15|        33|2022-01-03|       1|
|          4|     72|        42|2022-01-04|       9|
|          5|     61|        44|2022-01-05|       6|
|          6|     21|        24|2022-01-06|       7|
|          7|     83|        15|2022-01-07|       7|
|          8|     87|        32|2022-01-08|       3|
|          9|     75|        32|2022-01-09|       2|
|         10|     75|        24|2022-01-10|       9|
+-----------+-------+----------+----------+--------+
only showing top 10 rows



# Етап 2, очищення датасетів

In [8]:
# Discard every row that contains a null in any column.
# The names are rebound in place because the later cells read these same variables.
df_users = df_users.na.drop()
df_products = df_products.na.drop()
df_purchases = df_purchases.na.drop()

In [9]:
# Preview the first 10 rows of each DataFrame after the null rows were dropped.
for cleaned_frame in (df_users, df_products, df_purchases):
    cleaned_frame.show(10)

+-------+-------+---+------------------+
|user_id|   name|age|             email|
+-------+-------+---+------------------+
|      1| User_1| 45| user1@example.com|
|      2| User_2| 48| user2@example.com|
|      3| User_3| 36| user3@example.com|
|      4| User_4| 46| user4@example.com|
|      5| User_5| 29| user5@example.com|
|      6| User_6| 39| user6@example.com|
|      7| User_7| 24| user7@example.com|
|      8| User_8| 44| user8@example.com|
|      9| User_9| 27| user9@example.com|
|     10|User_10| 43|user10@example.com|
+-------+-------+---+------------------+
only showing top 10 rows

+----------+------------+-----------+-----+
|product_id|product_name|   category|price|
+----------+------------+-----------+-----+
|         1|   Product_1|     Beauty|  8.3|
|         2|   Product_2|       Home|  8.3|
|         3|   Product_3|Electronics|  9.2|
|         4|   Product_4|Electronics|  2.6|
|         5|   Product_5|Electronics|  9.4|
|         6|   Product_6|     Sports|  8.7|
|   

In [11]:
# Print the column names and types of each DataFrame.
for inspected_frame in (df_users, df_products, df_purchases):
    inspected_frame.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- email: string (nullable = true)

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: string (nullable = true)

root
 |-- purchase_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- quantity: string (nullable = true)



# Етап 3, сума покупок за категоріями

In [18]:
# Stage 3: number of purchases and total amount spent per product category.
df = (
    df_purchases
    # Joining on the column name keeps a single product_id column in the result.
    .join(df_products, on="product_id", how="inner")
    .select("user_id", "category", "price", "quantity")
    # price and quantity were loaded from CSV as strings, so cast before multiplying.
    .withColumn(
        "subtotal",
        col("price").cast(DoubleType()) * col("quantity").cast(DoubleType()),
    )
    .groupBy("category")
    .agg(
        count("*").alias("total_"),
        sum("subtotal").alias("total_sum_"),
    )
)

# show() only prints and returns None, so it is called separately to keep
# the aggregated DataFrame (rather than None) bound to `df`.
df.show()

+-----------+------+------------------+
|   category|total_|        total_sum_|
+-----------+------+------------------+
|       Home|    44|1523.4999999999998|
|     Sports|    63|1802.4999999999998|
|Electronics|    42|1174.7999999999997|
|   Clothing|    26|             790.3|
|     Beauty|    12| 459.8999999999999|
+-----------+------+------------------+



# Етап 4, сума покупок за категоріями для групи 18-25 років

In [19]:
# Stage 4: number of purchases and total amount spent per product category,
# restricted to users aged 18 to 25 (inclusive).
df = (
    df_purchases
    # Joining on column names keeps a single copy of each key column.
    .join(df_products, on="product_id", how="inner")
    .join(df_users, on="user_id", how="inner")
    .select("user_id", "age", "category", "price", "quantity")
    # price and quantity were loaded from CSV as strings, so cast before multiplying.
    .withColumn(
        "subtotal",
        col("price").cast(DoubleType()) * col("quantity").cast(DoubleType()),
    )
    # NOTE(review): age is also a string column here; this numeric comparison
    # relies on Spark casting it implicitly.
    .where(col("age").between(18, 25))
    .groupBy("category")
    .agg(
        count("*").alias("total_"),
        sum("subtotal").alias("total_sum_"),
    )
)

# show() only prints and returns None, so it is called separately to keep
# the aggregated DataFrame (rather than None) bound to `df`.
df.show()

+-----------+------+------------------+
|   category|total_|        total_sum_|
+-----------+------+------------------+
|       Home|     7|             361.1|
|     Sports|    12|310.49999999999994|
|Electronics|     5|             249.6|
|   Clothing|     6|             245.0|
|     Beauty|     2|41.400000000000006|
+-----------+------+------------------+



# Етап 5, частка покупок за кожною категорією товарів від сумарних витрат для вікової категорії від 18 до 25 років.

In [21]:
# Total amount spent by users aged 18 to 25 across all categories.
# Despite the `df_` prefix this is a single scalar value, not a DataFrame;
# it serves as the denominator when computing each category's share.
df_total = (
    df_purchases
    .join(df_products, on="product_id", how="inner")
    .join(df_users, on="user_id", how="inner")
    .select("user_id", "age", "category", "price", "quantity")
    # price and quantity were loaded from CSV as strings, so cast before multiplying.
    .withColumn(
        "subtotal",
        col("price").cast(DoubleType()) * col("quantity").cast(DoubleType()),
    )
    .filter(col("age").between(18, 25))
    .agg(sum("subtotal").alias("total_sum_"))
    # An aggregate without grouping yields one row; take its only column.
    .first()[0]
)

In [22]:
# Stage 5: each category's share (in percent) of the total amount spent
# by users aged 18 to 25 (inclusive).
df = (
    df_purchases
    # Joining on column names keeps a single copy of each key column.
    .join(df_products, on="product_id", how="inner")
    .join(df_users, on="user_id", how="inner")
    .select("user_id", "age", "category", "price", "quantity")
    # price and quantity were loaded from CSV as strings, so cast before multiplying.
    .withColumn(
        "subtotal",
        col("price").cast(DoubleType()) * col("quantity").cast(DoubleType()),
    )
    # NOTE(review): age is also a string column here; this numeric comparison
    # relies on Spark casting it implicitly.
    .where(col("age").between(18, 25))
    .groupBy("category")
    .agg(
        count("*").alias("total_"),
        sum("subtotal").alias("total_sum_"),
    )
    # df_total is the scalar total for the same age group, computed in an earlier cell.
    .withColumn("percent", round((col("total_sum_") / df_total) * 100, 2))
)

# show() only prints and returns None, so it is called separately to keep
# the resulting DataFrame (rather than None) bound to `df`.
df.show()

+-----------+------+------------------+-------+
|   category|total_|        total_sum_|percent|
+-----------+------+------------------+-------+
|       Home|     7|             361.1|   29.9|
|     Sports|    12|310.49999999999994|  25.71|
|Electronics|     5|             249.6|  20.67|
|   Clothing|     6|             245.0|  20.29|
|     Beauty|     2|41.400000000000006|   3.43|
+-----------+------+------------------+-------+



# Етап 6, топ 3 категорії продуктів з найвищим відсотком витрат споживачами віком від 18 до 25 років

In [None]:
# Total amount spent by users aged 18 to 25 across all categories (a scalar,
# not a DataFrame), used as the denominator for the percentage below.
# NOTE(review): this repeats the df_total computation from the stage 5 cell;
# on a top-to-bottom run it recomputes the same value.
df_total = (
    df_purchases
    .join(df_products, on="product_id", how="inner")
    .join(df_users, on="user_id", how="inner")
    .select("user_id", "age", "category", "price", "quantity")
    # price and quantity were loaded from CSV as strings, so cast before multiplying.
    .withColumn(
        "subtotal",
        col("price").cast(DoubleType()) * col("quantity").cast(DoubleType()),
    )
    .filter(col("age").between(18, 25))
    .agg(sum("subtotal").alias("total_sum_"))
    # An aggregate without grouping yields one row; take its only column.
    .first()[0]
)

In [24]:
# Stage 6: the three product categories with the highest share of spending
# among users aged 18 to 25 (inclusive).
df = (
    df_purchases
    # Joining on column names keeps a single copy of each key column.
    .join(df_products, on="product_id", how="inner")
    .join(df_users, on="user_id", how="inner")
    .select("user_id", "age", "category", "price", "quantity")
    # price and quantity were loaded from CSV as strings, so cast before multiplying.
    .withColumn(
        "subtotal",
        col("price").cast(DoubleType()) * col("quantity").cast(DoubleType()),
    )
    # NOTE(review): age is also a string column here; this numeric comparison
    # relies on Spark casting it implicitly.
    .where(col("age").between(18, 25))
    .groupBy("category")
    .agg(
        count("*").alias("total_"),
        sum("subtotal").alias("total_sum_"),
    )
    # df_total is the scalar total for the same age group, computed in an earlier cell.
    .withColumn("percent", round((col("total_sum_") / df_total) * 100, 2))
    # Highest share first, then keep only the top three categories.
    .orderBy(desc("percent"))
    .limit(3)
)

# show() only prints and returns None, so it is called separately to keep
# the resulting DataFrame (rather than None) bound to `df`.
df.show()

+-----------+------+------------------+-------+
|   category|total_|        total_sum_|percent|
+-----------+------+------------------+-------+
|       Home|     7|             361.1|   29.9|
|     Sports|    12|310.49999999999994|  25.71|
|Electronics|     5|             249.6|  20.67|
+-----------+------+------------------+-------+



In [25]:
# Stop the Spark session
spark.stop()
