Условие: есть набор данных о продажах продуктов с информацией о дате продаж, категории продукта, количестве и выручке от продаж.

Используя Apache Spark, загрузите предоставленный набор данных в DataFrame (пример данных ниже).

("2023-11-20", "Electronics", 100, 12000),
("2023-11-21", "Electronics", 110, 13000),
("2023-11-22", "Electronics", 105, 12500),
("2023-11-20", "Clothing", 300, 15000),
("2023-11-21", "Clothing", 280, 14000),
("2023-11-22", "Clothing", 320, 16000),
("2023-11-20", "Books", 150, 9000),
("2023-11-21", "Books", 200, 12000),
("2023-11-22", "Books", 180, 10000)

Столбцы: "date", "category", "quantity", "revenue".

С использованием оконных функций, рассчитайте среднее выручки от продаж для каждой категории продукта.
Примените операцию pivot для того, чтобы преобразовать полученные данные таким образом, чтобы в качестве строк были категории продуктов, в качестве столбцов были дни, а значениями были средние значения выручки от продаж за соответствующий день

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=1f4fe34f541c7d414b46341629a943a36969daf298750c8ee014cd1ee5d75efb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as fn
from pyspark.sql.types import DoubleType


spark = SparkSession.builder.appName("Sem4").getOrCreate()

data = [("2023-11-20", "Electronics", 100, 12000),
        ("2023-11-21", "Electronics", 110, 13000),
        ("2023-11-22", "Electronics", 105, 12500),
        ("2023-11-20", "Clothing", 300, 15000),
        ("2023-11-21", "Clothing", 280, 14000),
        ("2023-11-22", "Clothing", 320, 16000),
        ("2023-11-20", "Books", 150, 9000),
        ("2023-11-21", "Books", 200, 12000),
        ("2023-11-22", "Books", 180, 10000)]

schema = ["date", "category", "quantity", "revenue"]
df = spark.createDataFrame(data, schema)

df = df.withColumn("date", fn.to_date("date"))

window_spec = Window.partitionBy("category")

avg_revenue = fn.avg("revenue").over(window_spec)

result_df = df.withColumn("avg_revenue", avg_revenue.cast(DoubleType()))

result_df.select(["category", "avg_revenue"]).distinct().show()

+-----------+------------------+
|   category|       avg_revenue|
+-----------+------------------+
|      Books|10333.333333333334|
|   Clothing|           15000.0|
|Electronics|           12500.0|
+-----------+------------------+



In [3]:
pivot_df = df.groupBy("category").pivot("date").agg(fn.avg("revenue")).orderBy("category")

pivot_df.show()

+-----------+----------+----------+----------+
|   category|2023-11-20|2023-11-21|2023-11-22|
+-----------+----------+----------+----------+
|      Books|    9000.0|   12000.0|   10000.0|
|   Clothing|   15000.0|   14000.0|   16000.0|
|Electronics|   12000.0|   13000.0|   12500.0|
+-----------+----------+----------+----------+

