# Spark Apache (семинары)

## Урок 4. Изучение Spark Structured Streaming

### Задача:

- Условие: используя источник rate, напишите код, который создаёт дополнительный столбец с суммой только нечётных чисел.

In [1]:
import init_spark_env

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as fsum
import time

In [23]:
# Create the SparkSession used by the rest of the notebook
# (getOrCreate returns an already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("StructuredStreamingOddNumbersSum")
    .getOrCreate()
)

In [24]:
# Bare expression as the last statement of the cell: the notebook echoes
# the SparkSession object so its details can be inspected.
spark

In [25]:
# Build a streaming DataFrame on top of the built-in "rate" source,
# configured to generate one row per second.
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", "1")
    .load()
)

In [26]:
# Keep only the rows whose "value" is odd.
odd_values_stream = rate_stream.where((col("value") % 2) != 0)

In [27]:
# Sum the odd values over the whole stream. There are no grouping keys,
# so the result is a single running total exposed as the "odd_sum" column.
odd_values_sum = odd_values_stream.agg(fsum("value").alias("odd_sum"))

In [28]:
# Start the streaming query: every micro-batch is printed to the console
# using the "update" output mode.
query = (
    odd_values_sum.writeStream
    .outputMode("update")
    .format("console")
    .start()
)

# Let the stream run for a bounded amount of time (e.g. 30 seconds).
time_to_run = 30  # seconds

try:
    # Block for at most `time_to_run` seconds. Unlike a manual sleep/poll
    # loop, this returns as soon as the query terminates on its own and
    # re-raises the query's exception if it failed.
    query.awaitTermination(time_to_run)
finally:
    # Always stop the query, even on failure or a keyboard interrupt,
    # so no streaming job keeps running in the background.
    query.stop()
    print("Stream is stopped.")

24/10/05 12:49:43 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7ba2d4bc-fa98-426b-86a4-88bf6bc74877. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/10/05 12:49:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+
|odd_sum|
+-------+
|   NULL|
+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+
|odd_sum|
+-------+
|   NULL|
+-------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+
|odd_sum|
+-------+
|      1|
+-------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------+
|odd_sum|
+-------+
|      1|
+-------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------+
|odd_sum|
+-------+
|      4|
+-------+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------+
|odd_sum|
+-------+
|      4|
+-------+

-------------------------------------------
Batch: 6
-------------------------------------------
+-------+
|odd_

In [29]:
# Shut down the SparkSession once the streaming query has been stopped.
spark.stop()

24/10/05 12:50:44 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:632)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:610)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:453)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.