In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, window, when, col
from pyspark.sql.types import TimestampType

# Создание сессии Spark
spark = SparkSession.builder.appName("HomeWork_04").getOrCreate()

# Чтение данных из потокового источника 'rate'
df = spark.readStream.format("rate") \
    .option("rowsPerSecond", 1) \
    .option("numPartitions", 4) \
    .option("maxRecordsPerTrigger", 50) \
    .load()


# Преобразование типа данных столбца timestamp к типу TimestampType, если это необходимо
if df.schema["timestamp"].dataType != TimestampType():
    df = df.withColumn("timestamp", df["timestamp"].cast(TimestampType()))

# Определение водяного знака (watermark)
windowed_df = df.withWatermark("timestamp", "10 seconds")

# Определение интервала оконной функции
window_spec = window(windowed_df.timestamp, "10 seconds")

# Расчет суммы нечетных значений в окне
sum_df = windowed_df.groupBy(window_spec).agg(sum(when(col("value") % 2 != 0, col("value"))).alias("sum_odd"))

# Начать потоковый запрос
query = sum_df.writeStream.outputMode("append").format("memory").queryName("CountElements").start()

# Дождитесь завершения
query.awaitTermination(500)
query.stop()


In [7]:
spark.sql("SELECT * FROM CountElements").show(200)

+--------------------+-------+
|              window|sum_odd|
+--------------------+-------+
|{2024-05-02 22:27...|    515|
|{2024-05-02 22:27...|    565|
|{2024-05-02 22:26...|    265|
|{2024-05-02 22:26...|    315|
|{2024-05-02 22:25...|    115|
|{2024-05-02 22:25...|     65|
|{2024-05-02 22:26...|    215|
|{2024-05-02 22:25...|     16|
|{2024-05-02 22:26...|    415|
|{2024-05-02 22:26...|    365|
|{2024-05-02 22:27...|    615|
|{2024-05-02 22:26...|    465|
|{2024-05-02 22:25...|    165|
+--------------------+-------+

