## Spark Apache

### Инструменты работы и визуализации ч.2

Условие: есть набор данных о продажах продуктов с информацией о дате продаж, категории продукта, количестве и выручке от продаж.

Используя Apache Spark, загрузите предоставленный набор данных в DataFrame (пример данных ниже).

("2023-11-20", "Electronics", 100, 12000),
("2023-11-21", "Electronics", 110, 13000),
("2023-11-22", "Electronics", 105, 12500),
("2023-11-20", "Clothing", 300, 15000),
("2023-11-21", "Clothing", 280, 14000),
("2023-11-22", "Clothing", 320, 16000),
("2023-11-20", "Books", 150, 9000),
("2023-11-21", "Books", 200, 12000),
("2023-11-22", "Books", 180, 10000)

Столбцы: "date", "category", "quantity", "revenue".

С использованием оконных функций, рассчитайте среднее выручки от продаж для каждой категории продукта.
Примените операцию pivot для того, чтобы преобразовать полученные данные таким образом, чтобы в качестве строк были категории продуктов, в качестве столбцов были дни, а значениями были средние значения выручки от продаж за соответствующий день

In [3]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=62b7d7ca0576a2e6881064fec37a25253ee09dcfa0b5b6691a2129efa553a304
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
import pyspark.sql.functions as F

In [26]:
spark = SparkSession.builder.appName("WindowFunctionExample").getOrCreate()

# Создаем DF
data = [("2023-11-20", "Electronics", 100, 12000), ("2023-11-21", "Electronics", 110, 13000), ("2023-11-22", "Electronics", 105, 12500),
  ("2023-11-20", "Clothing", 300, 15000), ("2023-11-21", "Clothing", 280, 14000), ("2023-11-22", "Clothing", 320, 16000),
  ("2023-11-20", "Books", 150, 9000), ("2023-11-21", "Books", 200, 12000), ("2023-11-22", "Books", 180, 10000)]

df = spark.createDataFrame(data, ["date", "category", "quantity", "revenue"])

df.show()


+----------+-----------+--------+-------+
|      date|   category|quantity|revenue|
+----------+-----------+--------+-------+
|2023-11-20|Electronics|     100|  12000|
|2023-11-21|Electronics|     110|  13000|
|2023-11-22|Electronics|     105|  12500|
|2023-11-20|   Clothing|     300|  15000|
|2023-11-21|   Clothing|     280|  14000|
|2023-11-22|   Clothing|     320|  16000|
|2023-11-20|      Books|     150|   9000|
|2023-11-21|      Books|     200|  12000|
|2023-11-22|      Books|     180|  10000|
+----------+-----------+--------+-------+



In [31]:
# Оконная функция для нахождения среднего значения выручки по категориям
avg_revenue_window = Window.partitionBy("category", "date").orderBy("date")
# Вычисление среднего значения выручки по категории
avg_revenue = F.avg('revenue').over(avg_revenue_window)
# Применение оконной функции к DataFrame
result_df = df.withColumn("avg_revenue", avg_revenue)
result_df.show()

+----------+-----------+--------+-------+-----------+
|      date|   category|quantity|revenue|avg_revenue|
+----------+-----------+--------+-------+-----------+
|2023-11-20|      Books|     150|   9000|     9000.0|
|2023-11-21|      Books|     200|  12000|    12000.0|
|2023-11-22|      Books|     180|  10000|    10000.0|
|2023-11-20|   Clothing|     300|  15000|    15000.0|
|2023-11-21|   Clothing|     280|  14000|    14000.0|
|2023-11-22|   Clothing|     320|  16000|    16000.0|
|2023-11-20|Electronics|     100|  12000|    12000.0|
|2023-11-21|Electronics|     110|  13000|    13000.0|
|2023-11-22|Electronics|     105|  12500|    12500.0|
+----------+-----------+--------+-------+-----------+



In [32]:
# Выполняем операцию pivot, чтобы получить таблицу с категориями в строках и днями в столбцах
df_pivot = result_df .groupBy("category").pivot("date").agg(avg("avg_revenue"))
df_pivot.show()

+-----------+----------+----------+----------+
|   category|2023-11-20|2023-11-21|2023-11-22|
+-----------+----------+----------+----------+
|Electronics|   12000.0|   13000.0|   12500.0|
|   Clothing|   15000.0|   14000.0|   16000.0|
|      Books|    9000.0|   12000.0|   10000.0|
+-----------+----------+----------+----------+

