# `Промышленное машинное обучение на Spark`
## `Занятие 08: Spark Structured Streaming`

![sparkStreaming](images/sparkStreaming.png)

О чём можно узнать из этого ноутбука:

* Обработка данных в онлайн-режиме
* Подключение источников
* Spark Streaming: Filters, Join, Window

Произведём все необходимые импорты и создадим Spark-context для дальнейшей работы

In [1]:
import pyspark.sql.functions as F

from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, DoubleType, IntegerType, LongType
from pyspark.sql import SparkSession

conf = (
    SparkConf()
        .set('spark.ui.port', '4050')
        .set('spark.driver.memory', '6g')
        .set('spark.executor.extraJavaOptions', '-Xss512m')
        .set('spark.driver.extraJavaOptions', '-Xss512m')
        .setMaster('local[*]')
        .setAppName("StructuredNetworkWordCount")
)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

23/12/02 07:40:11 WARN Utils: Your hostname, vm-01 resolves to a loopback address: 127.0.1.1; using 10.128.0.16 instead (on interface eth0)
23/12/02 07:40:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/02 07:40:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Далее создадим специальный датафрейм, в который будут приходить данные по сети. Этот датафрейм представляет собой абстракцию таблицы с бесконечным числом строк, в котором все пришедшие данные располагаются в колонке `value`. 

Вызов метода `readStream` указывает на то, что созданный датафрейм будет потокового типа, также при создании необходимо указать откуда будут идти данные. Источниками данных могут быть:
* Открытыте сетевые соединения
* Дириктории в локальной файловой системе или hdfs
* Из брокеров сообщений, например Kafka

In [18]:
words_lines = (spark
        .readStream
        .format('socket') # аргумент socket указывает на то, что данные будут приходить по сети
        .option('host', 'localhost') # указываем на каком хосте располагаются данные
        .option('port', '10000') # указываем на каком порте нужно вычитывать их
        .load()
)

23/12/02 07:48:58 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


Сделаем вывод схемы данных и типа созданного датафрейма

In [24]:
words_lines.printSchema()

f"Streaming DataFame: {words_lines.isStreaming}" 

root
 |-- value: string (nullable = true)



'Streaming DataFame: True'

Далее уже можно приступать непосредственно к описании требуемой логики обработки данных. В ячейке ниже происходит операция подсчёта пришедших слов, для этого разбиваем строки на списки слов и представляем каждое слово в виде отдельной строки в абстрактной таблице.

In [10]:
words = words_lines.select(
        F.explode( # преобразуем пришедшие данные в список и каждый элемент списка преобразуем в строку датафрейма
            F.split(words_lines.value, ' ') # атрибут .value - данные которые приходят на host:port
        ).alias('word')
    )

wordCounts = words.groupBy('word').count()

В предыдущих ячейках были сформировны два из трёх шагов: вычитка и процессеинг. Теперь сформируем последний шаг - запись. В нём указываем куда пишем данные - параметр метода format, и по какой логике формируется вывод - метод outputMode.

In [None]:
query = (wordCounts
        .writeStream
        .outputMode('complete')
        .format('console')
        .start()
)

Подобно тому как исполнение операций в batch-режиме производлось после вызова action операций, в SparkStreaming вычитка новых данных происходит после запуска метода awaitTermination, в котором производится старт фоновой задачи на обновление.

In [None]:
query.awaitTermination()

## Чтение данных из файловой системы

Попробуем считать данные из файловой системы, для этого рекомендуется заранее описать схему данных, а затем указать, какую директорию в файловой системе необходимо мониторить

In [2]:
# описываем схему данных таблиц из директории data
userSchema = StructType(
    [
        StructField("Date", StringType(), True),
        StructField("Open", DoubleType(), True),
        StructField("High", DoubleType(), True),
        StructField("Low", DoubleType(), True),
        StructField("Close", DoubleType(), True),
        StructField("Adjusted Close", DoubleType(), True),
        StructField("Volume", DoubleType(), True),
    ]
)
# Так как информация о названии компании содержится
# в названии файла, то добавим дополнительное поле в 
# датафрейм из названия файла
def get_company():
    filename = F.element_at(
        F.split(F.input_file_name(), "/"), -1
    )
    return F.element_at(F.split(filename, "_"), 1)

initDF = (spark
  .readStream
  .format("csv")
  .option("maxFilesPerTrigger", 2) # максимальное кол-во одновременно считываемых файлов
  .option("header", True)
  .option("path", "sourceDir") # путь к директории, которую необходимо отслеживать
  .schema(userSchema)
  .load()
  .withColumn("Name", get_company()) # добавим к колонкам ещё название компании
)

Задаём логику обработки данных. Производим агргацию данных об акциях компания, где будет указана самая высокая цена акции в некотором году.

In [30]:
stock_df = (
    initDF
    .groupBy(F.col("Name"), F.year(F.col("Date")).alias("Year"))
    .agg(F.max("High").alias("Max"))
)

Задаём логику записи данных в консоль.

In [31]:
query = (
  stock_df
  .writeStream
  .outputMode("update") # по какой логику делаем вывод - только обновления
  .option("truncate", False)
  .format("console") 
  .start()
)

23/12/02 00:17:17 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4b7d1092-3e3d-4470-9388-15f272a746ec. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/02 00:17:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [None]:
query.awaitTermination()

Можно записывать обработанные данные также файл и в файл, но тут есть ограничения на производимые операции. Так как запись в файл поддерживает только тип записи `append` - данные внутри файлов можно добавлять, но не изменять.

In [None]:
stock_df = (
    initDF
    .where(F.col("Close") - F.col("Open") > 0)
)

23/12/02 06:56:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/02 06:56:35 WARN StreamingQueryManager: Stopping existing streaming query [id=35be1d0b-031e-451b-bd48-0d58b19476c4, runId=382e6bed-abf5-4cf3-9dc1-0ae6b7216774], as a new run is being started.


In [None]:
stock_df\
  .writeStream\
  .outputMode("append")\
  .format("csv")\
  .option("path", "output")\
  .option("header", True)\
  .option("checkpointLocation", "checkpoints/")\
  .start()\
  .awaitTermination()


### Join & Windows

Потоковые данные можно джойнить между собой или же между привычными  batch-датафреймами.

In [3]:
companyFullNames = spark.createDataFrame(
    [
        ("AAPL", "Apple"),
        ("AMZN", "Amazon"),
        ("GOOGL","Google"),
        ("MSFT", "Microsoft"),
        ("CSCO", "CISCO")
    ], ["Name", "FullName"]
)

companyFullNames.show(2)

                                                                                                                                                                                     

+----+--------+
|Name|FullName|
+----+--------+
|AAPL|   Apple|
|AMZN|  Amazon|
+----+--------+
only showing top 2 rows



In [25]:
stock_df = (
    initDF
    .groupBy("Name", F.year("Date").alias("Year"))
    .agg(F.max("High").alias("Max"))
)

stock_df = stock_df.join(companyFullNames, on="Name", how="inner")

In [26]:
stock_df\
.writeStream\
.outputMode("complete")\
.trigger(processingTime='1 minute') \
.option("truncate", False)\
.option("numRows", 10)\
.format("console")\
.start()\
.awaitTermination()

23/12/02 08:24:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-5d4b566b-e312-429a-a4e3-2002f1f38d20. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/02 08:24:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                                                                                                                     

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----+------+--------+
|Name|Year|Max   |FullName|
+----+----+------+--------+
|AAPL|2007|28.99 |Apple   |
|AAPL|2010|46.67 |Apple   |
|AAPL|2013|82.16 |Apple   |
|AAPL|2015|134.54|Apple   |
|AAPL|2016|118.69|Apple   |
|AAPL|2011|60.96 |Apple   |
|AAPL|2009|30.56 |Apple   |
|AAPL|2012|100.72|Apple   |
|AAPL|2008|28.61 |Apple   |
|AAPL|2006|13.31 |Apple   |
+----+----+------+--------+
only showing top 10 rows



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/evgeniy/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/evgeniy/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

Теперь попробуем создать оконную функцию, в которой будем обрабатывать данные, которые находятся в одном временном окне.

In [None]:
lines = (
    spark
    .readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', '10000')
    .option('includeTimestamp', 'true')
    .load()
)

words = lines.select(
    F.explode(F.split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

windowDuration = '1 minutes'
slideDuration = '30 seconds'

windowedCounts = words.groupBy(
    F.window(words.timestamp, windowDuration, slideDuration),
    words.word
).count().orderBy('window')

# Start running the query that prints the windowed word counts to the console
query = (windowedCounts
    .writeStream
    .outputMode('complete')
    .format('console')
    .option('truncate', 'false')
    .start()
)
         
query.awaitTermination()

23/12/02 08:30:13 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
23/12/02 08:30:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-8aa4f919-d12b-44a1-9993-0d6135795916. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/02 08:30:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                                                                                                                     

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+



                                                                                                                                                                                     

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|world|1    |
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|hello|1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|hello|1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|world|1    |
+------------------------------------------+-----+-----+



                                                                                                                                                                                     

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|world|1    |
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|hello|1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|hello|2    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|boys |1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|world|1    |
|{2023-12-02 08:30:30, 2023-12-02 08:31:30}|hello|1    |
|{2023-12-02 08:30:30, 2023-12-02 08:31:30}|boys |1    |
+------------------------------------------+-----+-----+



                                                                                                                                                                                     

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+---------+-----+
|window                                    |word     |count|
+------------------------------------------+---------+-----+
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|world    |1    |
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|hello    |1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|hello    |2    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|boys     |1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|world    |1    |
|{2023-12-02 08:30:30, 2023-12-02 08:31:30}|hello    |1    |
|{2023-12-02 08:30:30, 2023-12-02 08:31:30}|boys     |1    |
|{2023-12-02 08:31:00, 2023-12-02 08:32:00}|words    |1    |
|{2023-12-02 08:31:00, 2023-12-02 08:32:00}|something|1    |
|{2023-12-02 08:31:00, 2023-12-02 08:32:00}|         |1    |
|{2023-12-02 08:31:30, 2023-12-02 08:32:30}|         |1    |
|{2023-12-02 08:31:30, 2023-12-02 08:32:30}|somet

                                                                                                                                                                                     

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+---------+-----+
|window                                    |word     |count|
+------------------------------------------+---------+-----+
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|world    |1    |
|{2023-12-02 08:29:30, 2023-12-02 08:30:30}|hello    |1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|hello    |2    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|boys     |1    |
|{2023-12-02 08:30:00, 2023-12-02 08:31:00}|world    |1    |
|{2023-12-02 08:30:30, 2023-12-02 08:31:30}|hello    |1    |
|{2023-12-02 08:30:30, 2023-12-02 08:31:30}|boys     |1    |
|{2023-12-02 08:31:00, 2023-12-02 08:32:00}|words    |1    |
|{2023-12-02 08:31:00, 2023-12-02 08:32:00}|something|1    |
|{2023-12-02 08:31:00, 2023-12-02 08:32:00}|         |1    |
|{2023-12-02 08:31:30, 2023-12-02 08:32:30}|         |1    |
|{2023-12-02 08:31:30, 2023-12-02 08:32:30}|somet