### Установка пакетов

In [1]:
!pip install pyspark



### Импорт библиотек

In [2]:
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, current_timestamp, from_unixtime, window, avg, max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import random
import time
import urllib.request

### Создание каталога data для хранения данных

In [3]:
if not os.path.exists('data/input'):
  os.makedirs('data/input')

if not os.path.exists('data/output'):
  os.makedirs('data/output')

### Создание экземпляра SparkSession

In [4]:
spark = SparkSession\
  .builder \
  .appName('Homework_5_lab3') \
  .config('spark.sql.streaming.schemaInference', 'true') \
  .getOrCreate()

## Применение Sliding Window

### Задание 1. Подготовить подходящие данные или воспользоваться продемонстрированным на паре генератором

In [5]:
for chunk in range(random.randint(5, 20)):
  with open(f'data/input/{chunk}.json', 'w') as file:
    for _ in range(random.randint(10, 20)):
      file.write(json.dumps({
        'CreatedAt': int(time.time()) + 4 * chunk,
        'Price': random.randint(1000, 10_000) / 100,
        'Quantity': random.randint(100, 2000),
        'Type': random.choice([ 'type1', 'type2', 'type3' ])
      }))

      file.write('\n')

In [6]:
schema = StructType([
  StructField('CreatedAt', StringType()),
  StructField('Price', DoubleType()),
  StructField('Quantity', IntegerType()),
  StructField('Type', StringType())
])

### Задание 2. Открыть на основе исходных json файлов потоковый датафрейм

In [7]:
df = spark.readStream \
  .format('json') \
  .option('path', 'data/input') \
  .option('maxFilesPerTrigger', 1) \
  .schema(schema) \
  .load()

In [8]:
df.printSchema()

root
 |-- CreatedAt: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Type: string (nullable = true)



In [9]:
df_prepared = df.withColumn("CreatedAt", to_timestamp(from_unixtime(col("CreatedAt"))))

In [10]:
df_prepared.printSchema()

root
 |-- CreatedAt: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Type: string (nullable = true)



### Задание 3. Написать произвольный запрос агрегации, используя функцию tumbling window

In [11]:
df_aggregated = df_prepared \
  .groupBy(window(col('CreatedAt'), '10 seconds', '5 second')) \
  .agg(
    avg('Price').alias('price_avg'),
    max('Quantity').alias('quantity_max')
  )

In [12]:
df_result = df_aggregated.select("window.start", "window.end", "price_avg", "quantity_max")

In [13]:
df_result.printSchema()

root
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- price_avg: double (nullable = true)
 |-- quantity_max: integer (nullable = true)



In [14]:
stream = df_result.writeStream \
  .format('memory') \
  .outputMode('update') \
  .option('queryName', 'query') \
  .option('checkpointLocation', 'data/check-dir') \
  .trigger(processingTime='5 seconds') \
  .start()

In [15]:
spark.sql("SELECT * FROM query").show()

+-------------------+-------------------+------------------+------------+
|              start|                end|         price_avg|quantity_max|
+-------------------+-------------------+------------------+------------+
|2024-12-27 13:17:40|2024-12-27 13:17:50|49.672999999999995|        1972|
|2024-12-27 13:17:40|2024-12-27 13:17:50| 51.38633333333333|        1972|
+-------------------+-------------------+------------------+------------+



### Задание 4. Сохранить получаемые каждые X секунд исходные файлы в json

In [16]:
currentBatch = 0

while not stream.awaitTermination(30):
  lastProgress = stream.lastProgress
  batchId = lastProgress['batchId']

  if lastProgress['numInputRows'] == 0:
    spark.sql("SELECT * FROM query").write.json(f"data/output/{batchId}.json")
    stream.stop()

In [None]:
#spark.stop()