In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

# 1 Инициализация Spark
# Здесь важно выбрать правильную версию "org.apache.hadoop:hadoop-aws:3.4.1" исходя из версии которую вернет "version of pyspark and minio.ipynb"
spark = SparkSession \
    .builder \
    .appName("KafkaDebeziumStream") \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1",
            "org.apache.hadoop:hadoop-aws:3.4.1",
            "com.amazonaws:aws-java-sdk-bundle:1.12.262"
        ])
    ) \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# 2 Чтение потока из Kafka
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29093") \
    .option("subscribe", "data.cdc.dbname_order_events") \
    .option("startingOffsets", "earliest") \
    .load()

# 3 Преобразуем key и value из байт в строки
lines = lines.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")

# 4 Определяем схемы JSON
key_schema = StructType([
    StructField("id", LongType())
])

data_table_schema = StructType([
    StructField("id", LongType()),
    StructField("order_id", LongType()),
    StructField("status", StringType()),
    StructField("ts", LongType())
])

main_schema = StructType([
    StructField("before", data_table_schema),
    StructField("after", data_table_schema),
    StructField("op", StringType()),
    StructField("ts_ms", LongType()),
    StructField("ts_ns", LongType())
])

# 5 Парсим key и value
parsed = lines.select(
    from_json(col("key"), key_schema).alias("key_data"),
    from_json(col("value"), main_schema).alias("value_data")
)

# 6 Извлекаем нужные поля
result = parsed.select(
    col("key_data.id").alias("key_id"),
    col("value_data.op").alias("operation"),
    col("value_data.after.id").alias("id"),
    col("value_data.after.order_id").alias("order_id"),
    col("value_data.after.status").alias("status"),
    col("value_data.after.ts").alias("ts_data"),
    col("value_data.ts_ns").alias("ts_event_ns")
)

# 7 Фильтруем только операции insert/update
filtered = result.filter(col("operation").isin("c", "u", "d"))

# 8 Запись в MinIO как в S3 (в формате Parquet)
query = filtered.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "s3a://data/orders_stream/") \
    .option("checkpointLocation", "s3a://data/checkpoints/orders_stream/") \
    .start()

query.awaitTermination()