In [1]:
! hdfs dfs -rmr /data
! hdfs dfs -put db2.csv /data/db2/file
! hdfs dfs -put db.csv /data/db/file

mkdir: `/data': File exists
Found 1 items
-rw-r--r--   1 root supergroup         45 2022-12-07 12:56 /data/db


In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import from_json, to_json, col, struct
import time
import signal
from typing import Union

from tools_kafka import Kafka
from tools_pyspark_hdfs import Spark_HDFS as HDFS
from tools_pyspark import stop_all_streams, sink, read_stream_kafka, console_stream, console_clear, console_show


spark = SparkSession.builder.appName("my_spark").getOrCreate()

kf = Kafka()
hdfs = HDFS(spark)
checkpoint = 'checkpoints/duplicates_console_chk'
topic_name = 'lesson4' # название топика для кафки
kafka_server = kf.SERVERS
schema = StructType() \
    .add("column_1", StringType()) \
    .add("column_2", IntegerType())

#  удалим старые чекпойнты
hdfs.rm('/user/root')


if topic_name not in kf.ls():
    # создадим топик кафки
    kf.add(topic_name, retention=86400*30)

    # отправим данные из файла в топик кафки
    stream = spark \
        .readStream \
        .format("csv") \
        .option("header", True) \
        .option("maxFilesPerTrigger", 1) \
        .schema(schema) \
        .csv("/data/db") \
        .selectExpr("CAST(null AS STRING) as key", 
                    "CAST(to_json(struct(*)) AS STRING) as value") \
        .writeStream \
        .format("kafka") \
        .outputMode("append") \
        .option("kafka.bootstrap.servers", kf.SERVERS) \
        .option("topic", topic_name) \
        .option("checkpointLocation", "checkpoints/stream_read_write") \
        .start()

# посмотрим содержимое топика кафки
kf.get(topic_name)

22/12/08 10:45:04 WARN Utils: Your hostname, alex-pc resolves to a loopback address: 127.0.1.1; using 192.168.122.1 instead (on interface virbr0)


Ivy Default Cache set to: /root/.ivy2/cache

Не удалось удалить файл: /user/root
{"column_1":"a","column_2":2}
{"column_1":"b","column_2":4}
{"column_1":"c","column_2":8}
{"column_1":"a","column_2":2}
{"column_1":"b","column_2":4}
{"column_1":"c","column_2":8}
{"column_1":"b","column_2":4}


## Как изменяется размер чекпойнта со временем на примере работы функции консоли

In [36]:
# Сделаем поток из топика кафки
stream = read_stream_kafka(spark, kafka_server, topic_name, schema)
stream.printSchema()

# запускаем поток с выводом в консоль
stream = console_stream(stream)

# проверям как изменяется чекпойнт
for i in range(3):
    # смотрим как растёт чекпойнт
    hdfs.du('/user/root')
    # ждём работы вотермарка
    time.sleep(5)

# остановим поток
stop_all_streams(spark)

root
 |-- column_1: string (nullable = true)
 |-- column_2: integer (nullable = true)
 |-- offset: long (nullable = true)

Тип	размер		путь
dir	45.0 B		hdfs://localhost/user/root/console
Тип	размер		путь
dir	5.6 KiB		hdfs://localhost/user/root/console
Тип	размер		путь
dir	6.6 KiB		hdfs://localhost/user/root/console
Stopping stream: <pyspark.sql.streaming.StreamingQuery object at 0x7454b92d4af0>


## Что в консоли:

In [37]:
# проверяем содержимое консоли
console_show(spark)
# удаляем файловую консоль
console_clear(spark)

+--------+--------+------+
|column_1|column_2|offset|
+--------+--------+------+
|c       |8       |5     |
|a       |2       |0     |
|b       |4       |6     |
|b       |4       |4     |
|b       |4       |1     |
|a       |2       |3     |
|c       |8       |2     |
+--------+--------+------+

Файл успешно удалён: console
Не удалось удалить файл: console/checkpoint


## Используем окно, чтобы удалить дубликаты

In [3]:
# Сделаем поток из топика кафки
stream = read_stream_kafka(spark, kafka_server, topic_name, schema)

# Добавим колонку receive_time о времени получения данных, чтобы работал вотермарк.
# Используем метод withWatermark: параметр - назване колонки, второй параметр - 
# гарантированное время жизни информации о сообщении в чекпойнте.
# Одновременно с этим будем удалять дубликаты в колонках
# в drop_duplicates надо указывать колонку и время марки, иначе в чекпойнт будут попадать все данные.
stream = stream.withColumn("receive_time", F.current_timestamp()) \
                .withColumn("window_time", F.window(F.col("receive_time"), "2 minutes")) \
                .withWatermark("window_time", "2 minutes") \
                .drop_duplicates(['column_1', 'window_time'])

stream.printSchema()

# запускаем поток с выводом в консоль
stream = console_stream(stream)

for i in range(5):
    # ждём
    time.sleep(5)
    # проверяем содержимое консоли
    console_show(spark)
    
# останавливаем потоки
stop_all_streams(spark)
# удаляем файловую консоль
console_clear(spark)

root
 |-- column_1: string (nullable = true)
 |-- column_2: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- receive_time: timestamp (nullable = false)
 |-- window_time: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)

+--------+--------+------+------------------------+--------------------------------------------------------------------+
|column_1|column_2|offset|receive_time            |window_time                                                         |
+--------+--------+------+------------------------+--------------------------------------------------------------------+
|a       |2       |0     |2022-12-07T16:01:30.970Z|{start -> 2022-12-07T16:00:00.000Z, end -> 2022-12-07T16:02:00.000Z}|
+--------+--------+------+------------------------+--------------------------------------------------------------------+

+--------+--------+------+------------------------+----------------------------------------

## Сделаем группировку по sliding_time - не понятно, почему оно не работет!?

In [4]:
# Читаем поток из топика кафки
stream = read_stream_kafka(spark, kafka_server, topic_name, schema, maxOffsetsPerTrigger=10)

# сгруппируем по sliding_time
stream = stream.select("column_1", 'column_2') \
                .withColumn("receive_time", F.current_timestamp()) \
                .withColumn("sliding_time", F.window(F.col("receive_time"), "1 minute", "30 seconds")) \
                .withWatermark("sliding_time", "2 minutes") \
                .groupBy("sliding_time").count()

stream.printSchema()

# запускаем поток с выводом в консоль
stream = console_stream(stream, processingTime='0 seconds', mode='append')

# ждём
time.sleep(20)
# проверяем содержимое консоли
console_show(spark)

# удаляем файловую консоль
console_clear(spark)
# останавливаем потоки
stop_all_streams(spark)

root
 |-- sliding_time: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)

+-----------------------------------+
|Нет данных для построения DataFrame|
+-----------------------------------+
|Было обработано файлов: 0          |
+-----------------------------------+

Файл успешно удалён: console
Stopping stream: <pyspark.sql.streaming.StreamingQuery object at 0x7979b84aecb0>


## Join датафрейма к стриму

In [3]:
# Создаём статичный фрейм, который будем джойнить к стриму
static_df_schema = StructType() \
    .add("column_1", StringType()) \
    .add("joined_value", StringType())
    
static_df_data = (
    ("a", 7),
    ("b", 9),
    ("c", 3),
)

static_df = spark.createDataFrame(static_df_data, static_df_schema)

# Читаем поток из топика кафки
stream = read_stream_kafka(spark, kafka_server, topic_name, schema, maxOffsetsPerTrigger=10)

# делаем джойн внутри окна
stream = stream.withColumn("receive_time", F.current_timestamp()) \
                .withColumn("window_time", F.window(F.col("receive_time"), "2 minutes")) \
                .withWatermark("window_time", "2 minutes") \
                .drop_duplicates(['column_1', 'window_time']) \
                .join(static_df, "column_1", "left")

stream.printSchema()

# запускаем поток с выводом в консоль
stream = console_stream(stream, processingTime='0 seconds', mode='append')

# ждём
time.sleep(20)
# проверяем содержимое консоли
console_show(spark)
    
# останавливаем потоки
stop_all_streams(spark)
# удаляем файловую консоль
console_clear(spark)

root
 |-- column_1: string (nullable = true)
 |-- column_2: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- receive_time: timestamp (nullable = false)
 |-- window_time: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- joined_value: string (nullable = true)

+--------+--------+------+------------------------+--------------------------------------------------------------------+------------+
|column_1|column_2|offset|receive_time            |window_time                                                         |joined_value|
+--------+--------+------+------------------------+--------------------------------------------------------------------+------------+
|c       |8       |2     |2022-12-08T10:10:39.096Z|{start -> 2022-12-08T10:10:00.000Z, end -> 2022-12-08T10:12:00.000Z}|3           |
|b       |4       |1     |2022-12-08T10:10:39.096Z|{start -> 2022-12-08T10:10:00.000Z, end -> 2022-12-08T10:12:00.000

## Join стрима к стриму

In [15]:
# сделаем два одинаковых по данным стрима: один из csv файла, 
# воторой из топика кафки (который был наполнен из того же csv Файла)

schema_join_file = StructType() \
    .add("column_1", StringType()) \
    .add("joined_value", IntegerType())

stream_from_file = spark \
    .readStream \
    .format("csv") \
    .option("header", True) \
    .option("maxFilesPerTrigger", 1) \
    .schema(schema_join_file) \
    .csv("/data/db2") \
    .withColumn("receive_time_file", F.current_timestamp()) \
    .withColumn("window_time",F.window(F.col("receive_time_file"), "2 minutes")) \
    .withWatermark("window_time", "2 minutes")

stream_from_kafka = read_stream_kafka(spark, kafka_server, topic_name, schema, maxOffsetsPerTrigger=10)

# делаем джойн внутри окна
stream_from_kafka = stream_from_kafka.withColumn("receive_time", F.current_timestamp()) \
                .withColumn("window_time", F.window(F.col("receive_time"), "2 minutes")) \
                .withWatermark("window_time", "2 minutes") \
                .drop_duplicates(['column_1', 'window_time']) \
                .join(stream_from_file, ["column_1", "window_time"] , "inner") \
                .select("column_1", "column_2",  "joined_value", "receive_time", "window_time")

print('Схема данных в стриме из файла:')
stream_from_file.printSchema()
print('Схема данных в стриме из топика kafka:')
stream_from_kafka.printSchema()

# запускаем поток с выводом в консоль
stream_from_kafka = console_stream(stream_from_kafka, processingTime='0 seconds', mode='append')

# ждём
time.sleep(20)
# проверяем содержимое консоли
console_show(spark)

# останавливаем потоки
stop_all_streams(spark)
# удаляем файловую консоль
console_clear(spark)

Схема данных в стриме из файла:
root
 |-- column_1: string (nullable = true)
 |-- joined_value: integer (nullable = true)
 |-- receive_time_file: timestamp (nullable = false)
 |-- window_time: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)

Схема данных в стриме из топика kafka:
root
 |-- column_1: string (nullable = true)
 |-- column_2: integer (nullable = true)
 |-- joined_value: integer (nullable = true)
 |-- receive_time: timestamp (nullable = false)
 |-- window_time: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)

+--------+--------+------------+------------------------+--------------------------------------------------------------------+
|column_1|column_2|joined_value|receive_time            |window_time                                                         |
+--------+--------+------------+------------------------+------------------------------