# Домашнее задание 4

*Условие*: используйте источник rate, напишите код, который создаст дополнительный столбец, который будет выводить сумму только нечётных чисел.

In [13]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as _sum

In [17]:
# Создание SparkSession
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .getOrCreate()

# Создание источника rate
rate_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

# Добавление нового столбца с суммой нечётных чисел
result_df = rate_df.selectExpr("timestamp", "value") \
    .withColumn("is_odd", when(col("value") % 2 != 0, col("value")).otherwise(0)) \
    .groupBy("timestamp") \
    .agg(_sum("is_odd").alias("odd_sum"))

# Запуск потока
query = result_df.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("odd_sum_table") \
    .start()

# Ограничиваем время работы в 30 секунд
query.awaitTermination(30)

# Получение результатов из памяти
results_df = spark.sql("SELECT * FROM odd_sum_table")
results_df.show()

# Завершение работы SparkSession
spark.stop()

+--------------------+-------+
|           timestamp|odd_sum|
+--------------------+-------+
|2024-11-06 13:26:...|      5|
|2024-11-06 13:26:...|      0|
|2024-11-06 13:26:...|      0|
|2024-11-06 13:26:...|      0|
|2024-11-06 13:26:...|      3|
|2024-11-06 13:25:...|      1|
|2024-11-06 13:25:...|      0|
|2024-11-06 13:25:...|      0|
|2024-11-06 13:26:...|      7|
|2024-11-06 13:26:...|      0|
|2024-11-06 13:26:...|      9|
|2024-11-06 13:26:...|     13|
|2024-11-06 13:26:...|      0|
|2024-11-06 13:26:...|     11|
|2024-11-06 13:26:...|      0|
|2024-11-06 13:26:...|      0|
|2024-11-06 13:26:...|     15|
+--------------------+-------+

