<h2>Lab4: Об’єднання потоків в Spark Streaming, завдання 2</h2>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType, TimestampType   
import os

os.environ['SPARK_HOME'] = "/home/zaranik/.sdkman/candidates/spark/current"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python3'

Створення Spark сессії

In [None]:
spark = SparkSession.builder.appName("RunningCountExample").getOrCreate()
spark

Опис схеми даних

In [None]:
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("entity", StringType(), True),
    StructField("code", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("human_food", DoubleType(), True),
    StructField("animal_feed", DoubleType(), True),
    StructField("processed", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

Читання даних із JSON-потоків

In [None]:
df_from_stream_1 = spark.readStream.schema(schema).json("./data_1/")
df_from_stream_1 = df_from_stream_1.withColumn("timestamp", col("timestamp").cast(TimestampType()))

У вихідній структурі даних стовпець timestamp має тип StringType, і щоб Spark міг коректно інтерпретувати значення як тимчасові мітки (timestamp), він наводиться типу TimestampType.

Це необхідно для виконання операцій, пов'язаних з часом, наприклад, щоб встановити водяний знак (watermark) і уможливити join (об'єднання) з використанням інтервалу часу.

In [None]:
df_from_stream_2 = spark.readStream.schema(schema).json("./data_2/")
df_from_stream_2 = df_from_stream_2.withColumn("timestamp", col("timestamp").cast(TimestampType()))

df_from_stream_1 = df_from_stream_1.withWatermark("timestamp", "2 minutes")
df_from_stream_2 = df_from_stream_2.withWatermark("timestamp", "2 minutes")

Об’єднання двох DataFrame створених на основі потокових даних:

In [None]:
result_df = df_from_stream_1.alias("df_from_stream_1") \
    .join(df_from_stream_2.alias("df_from_stream_2"),
          (col("df_from_stream_1.id") == col("df_from_stream_2.id")) &
          (col("df_from_stream_1.timestamp").between(
              col("df_from_stream_2.timestamp") - expr("INTERVAL 1 MINUTE"),
              col("df_from_stream_2.timestamp") + expr("INTERVAL 1 MINUTE")
          ))
    )


Запис у файл

In [None]:
query = result_df.writeStream.outputMode("append") \
    .format("json") \
    .option("path", "./output_task2") \
    .option("checkpointLocation",  "./output_task2/checkpoints/") \
    .start()



Чекаємо на завершення процесу

In [None]:
query.awaitTermination()

Закриваємо сессію

In [None]:
spark.stop()