# Изучение Spark Structure Streaming
Условие: используйте источник rate, напишите код, который создаст дополнительный столбец, который будет выводить сумму только нечётных чисел.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=92e7d04cc7586e4e9ed88d10c9c2c2994baaa4605d8ef9ca9ee17c04422ed89a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [28]:
# Импорт модулей
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import time

In [29]:
# Проверка на существование запущенной spark-сессии
# необязательно при использовании синтаксиса - 'getOrCreate()' при создании spark-сессии

if 'spark' in locals():
  spark.stop()

In [30]:
# Создание сессии Spark
spark = SparkSession.builder \
    .appName("SumOddNumbers") \
    .getOrCreate()

In [31]:
# Чтение данных из источника rate
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

In [32]:
# Определение функции для вычисления суммы нечётных чисел
def sum_odd_numbers(df):
    return df.withColumn('sum_of_odds', when(col('value') % 2 != 0, col('value')).otherwise(0))

In [33]:
# Применение функции к DataFrame
df_with_sum = sum_odd_numbers(df)

In [34]:
# Вывод результата в консоль
query = df_with_sum.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [27]:
# # Время ожидания в миллисекундах (например, 10 секунд)
# timeout = 10000

# try:
#     # Ожидание завершения потока в течение timeout миллисекунд
#     query.awaitTermination(timeout)
# except KeyboardInterrupt:
#     # Принудительная остановка в случае прерывания (например, через Ctrl+C)
#     print("Terminating the query due to interrupt.")
# finally:
#     # Остановка запроса и завершение сессии Spark
#     query.stop()
#     spark.stop()

# # Этот вариант НЕ РАБОТАЕТ в Google Colab

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Terminating the query due to interrupt.


In [35]:
# Вариант 2
# Использование цикла для периодической проверки состояния стрима и завершения при необходимости

# Время ожидания в секундах (например, 10 секунд)
timeout = 10
start_time = time.time()

try:
    while query.isActive:
        current_time = time.time()
        elapsed_time = current_time - start_time
        if elapsed_time > timeout:
            print("Timeout reached. Terminating the query.")
            query.stop()
            break
        time.sleep(1)
except KeyboardInterrupt:
    print("Terminating the query due to interrupt.")
finally:
    query.stop()
    spark.stop()

Timeout reached. Terminating the query.
