# **Spark Apache (Семинары)**
## **Урок 3. Инструменты работы и визуализации ч.2**
### Задание:
1. Условие: есть набор данных о продажах продуктов с информацией о дате продаж, категории продукта, количестве и выручке от продаж.

Используя Apache Spark, загрузите предоставленный набор данных в DataFrame (пример данных ниже).

("2023-11-20", "Electronics", 100, 12000),

("2023-11-21", "Electronics", 110, 13000),

("2023-11-22", "Electronics", 105, 12500),

("2023-11-20", "Clothing", 300, 15000),

("2023-11-21", "Clothing", 280, 14000),

("2023-11-22", "Clothing", 320, 16000),

("2023-11-20", "Books", 150, 9000),

("2023-11-21", "Books", 200, 12000),

("2023-11-22", "Books", 180, 10000)

Столбцы: "date", "category", "quantity", "revenue".

С использованием оконных функций, рассчитайте среднее выручки от продаж для каждой категории продукта.
Примените операцию pivot для того, чтобы преобразовать полученные данные таким образом, чтобы в качестве строк были категории продуктов, в качестве столбцов были дни, а значениями были средние значения выручки от продаж за соответствующий день

In [95]:
! pip install pyspark >> /dev/null

In [96]:
# Установка требуемых модулей
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [97]:
# Данные для задания
data = [ ("2023-11-20", "Electronics", 100, 12000),
("2023-11-21", "Electronics", 110, 13000),
("2023-11-22", "Electronics", 105, 12500),
("2023-11-20", "Clothing", 300, 15000),
("2023-11-21", "Clothing", 280, 14000),
("2023-11-22", "Clothing", 320, 16000),
("2023-11-20", "Books", 150, 9000),
("2023-11-21", "Books", 200, 12000),
("2023-11-22", "Books", 180, 10000)
]

In [98]:
# Установка сессии spark
spark = SparkSession.builder.appName('PivotWindowTest').getOrCreate()

# Создаем dataframe
df_spark = spark.createDataFrame(data=data, schema = ["date", "category", "quantity", "revenue"])

# Конвертируем строку в тип ДАТА
df_spark = df_spark.withColumn('date', F.to_date('date' ))


df_spark.show()

+----------+-----------+--------+-------+
|      date|   category|quantity|revenue|
+----------+-----------+--------+-------+
|2023-11-20|Electronics|     100|  12000|
|2023-11-21|Electronics|     110|  13000|
|2023-11-22|Electronics|     105|  12500|
|2023-11-20|   Clothing|     300|  15000|
|2023-11-21|   Clothing|     280|  14000|
|2023-11-22|   Clothing|     320|  16000|
|2023-11-20|      Books|     150|   9000|
|2023-11-21|      Books|     200|  12000|
|2023-11-22|      Books|     180|  10000|
+----------+-----------+--------+-------+



In [99]:
#  Определяем окно для расчетов
my_window = Window.partitionBy('category').orderBy('date')

# Расчитываем среднюю выручку от продажи одной единицы товара за один день в каждой категории товара
df_shrink = df_spark.withColumn("avg_revenue", F.round(F.avg(df_spark['revenue']/df_spark['quantity']).over(my_window), 2))

df_shrink.show()

+----------+-----------+--------+-------+-----------+
|      date|   category|quantity|revenue|avg_revenue|
+----------+-----------+--------+-------+-----------+
|2023-11-20|      Books|     150|   9000|       60.0|
|2023-11-21|      Books|     200|  12000|       60.0|
|2023-11-22|      Books|     180|  10000|      58.52|
|2023-11-20|   Clothing|     300|  15000|       50.0|
|2023-11-21|   Clothing|     280|  14000|       50.0|
|2023-11-22|   Clothing|     320|  16000|       50.0|
|2023-11-20|Electronics|     100|  12000|      120.0|
|2023-11-21|Electronics|     110|  13000|     119.09|
|2023-11-22|Electronics|     105|  12500|     119.08|
+----------+-----------+--------+-------+-----------+



In [100]:
# Расчитываем сводную таблицу по полученным данным.
# В качестве строк - категории товаров, в качестве столбцов - даты продажи,
#    в качестве значений - средняя выручка от продажи
#    соответсвующего товара в соответствующий день

df_shrink = df_shrink.groupBy('category').pivot('date').agg({'avg_revenue':'avg'})

df_shrink.show()

+-----------+----------+----------+----------+
|   category|2023-11-20|2023-11-21|2023-11-22|
+-----------+----------+----------+----------+
|      Books|      60.0|      60.0|     58.52|
|   Clothing|      50.0|      50.0|      50.0|
|Electronics|     120.0|    119.09|    119.08|
+-----------+----------+----------+----------+



In [101]:
# Сделаем все то же самое, но одной командой

df_shrink = df_spark.withColumn("avg_revenue", F.round(F.avg(df_spark['revenue']/df_spark['quantity']) \
                                .over(my_window), 2)) \
                    .groupBy('category') \
                    .pivot('date') \
                    .agg({'avg_revenue':'avg'})
df_shrink.show()

+-----------+----------+----------+----------+
|   category|2023-11-20|2023-11-21|2023-11-22|
+-----------+----------+----------+----------+
|      Books|      60.0|      60.0|     58.52|
|   Clothing|      50.0|      50.0|      50.0|
|Electronics|     120.0|    119.09|    119.08|
+-----------+----------+----------+----------+



In [102]:
spark.stop()