In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when
import time

try:
    # Создаем сессию Spark
    spark = SparkSession.builder.appName("OddNumbersSum").getOrCreate()

    # Читаем поток данных из источника rate
    df = spark.readStream.format("rate").load()

    # Создаем новый столбец с нечетными числами
    df_with_odd_sum = df.withColumn("odd_value",
                                  when(col("value") % 2 == 1, col("value"))
                                  .otherwise(0))

    # Суммируем нечетные числа
    df_sum_odds = df_with_odd_sum.agg(sum("odd_value").alias("sum_of_odd_numbers"))

    # Создаем буфер для хранения последних результатов
    latest_results = []

    # Функция для сбора результатов
    def process_batch(df, batch_id):
        # Конвертируем DataFrame в список словарей
        results = [row.asDict() for row in df.collect()]
        # Сохраняем последний результат
        if results:
            latest_results.clear()
            latest_results.extend(results)
        print(f"Batch {batch_id}: {results}")

    # Запускаем поток, используя метод foreachBatch
    query = df_sum_odds.writeStream\
        .foreachBatch(process_batch)\
        .outputMode("update")\
        .start()

    # Ждем 10 секунд и собираем результаты
    start_time = time.time()
    while time.time() - start_time < 10:
        if latest_results:
            print(f"Текущая сумма нечетных чисел: {latest_results}")
        time.sleep(1)

    # Останавливаем запрос
    query.stop()

    print("Итоговый результат:")
    print(latest_results)
    print("Успешно выполнено!")

except Exception as e:
    print(f"Произошла ошибка: {e}")

Batch 0: [{'sum_of_odd_numbers': None}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': None}]
Batch 1: [{'sum_of_odd_numbers': 0}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': 0}]
Batch 2: [{'sum_of_odd_numbers': 1}]
Batch 3: [{'sum_of_odd_numbers': 1}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': 1}]
Batch 4: [{'sum_of_odd_numbers': 4}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': 4}]
Batch 5: [{'sum_of_odd_numbers': 4}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': 4}]
Batch 6: [{'sum_of_odd_numbers': 9}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': 9}]
Batch 7: [{'sum_of_odd_numbers': 9}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': 9}]
Batch 8: [{'sum_of_odd_numbers': 16}]
Текущая сумма нечетных чисел: [{'sum_of_odd_numbers': 16}]
Batch 9: [{'sum_of_odd_numbers': 16}]
Итоговый результат:
[{'sum_of_odd_numbers': 16}]
Успешно выполнено!
