In [None]:
import os
os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"


from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType


DB_URL = "jdbc:postgresql://postgres:5432/postgres"
DB_USER = "myuser"
DB_PASS = "myuserpass"
DB_TABLE = "kafka_data_05"
BAD_ROWS_PATH = "/tmp/bad_rows_05"
CHECKPOINT_PATH = "/tmp/checkpoints/kafka-to-pgsql_05"


# SparkSession z obsługą Kafka + JSON
spark = (
    SparkSession.builder
    .appName("StreamStreamJoin")
    .master("local[*]")
    .getOrCreate()
)

# Schematy danych
orders_schema = (
    StructType()
    .add("order_id", IntegerType())
    .add("user_id", IntegerType())
    .add("amount", IntegerType())
    .add("timestamp", TimestampType())
)

users_schema = (
    StructType()
    .add("user_id", IntegerType())
    .add("user_name", StringType())
    .add("timestamp", TimestampType())
)

# Strumień zamówień
orders = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "spark-lab5-topic-orders")
    .option("startingOffsets", "latest")
    .load()
)

orders_df = (
    orders.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), orders_schema).alias("order"))
    .select("order.*")
    .withWatermark("timestamp", "45 seconds")
    .alias("orders_df")
)

# Strumień użytkowników
users = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka_streaming_lab:9092")
    .option("subscribe", "spark-lab5-topic-users")
    .option("startingOffsets", "latest")
    .load()
)

users_df = (
    users.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), users_schema).alias("user"))
    .select("user.*")
    .withWatermark("timestamp", "60 seconds")
    .alias("users_df")
)

# Strumieniowe łączenie po user_id
joined = orders_df.join(
    users_df,
    expr("""
        orders_df.user_id = users_df.user_id AND
        orders_df.timestamp BETWEEN users_df.timestamp - interval 15 seconds AND users_df.timestamp + interval 15 seconds
    """)
)

joined = joined.select(
    col("order_id"),
    col("orders_df.user_id").alias("user_id"),
    col("amount"),
    col("orders_df.timestamp").alias("order_timestamp"),
    col("user_name"),
    col("users_df.timestamp").alias("user_timestamp")
)

# Zapisywanie do PostgreSQL
def write_to_postgres(batch_df, batch_id):
    (
        batch_df.write
        .format("jdbc")
        .option("url", DB_URL)
        .option("dbtable", DB_TABLE)
        .option("user", DB_USER)
        .option("password", DB_PASS)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )

# Zapis jako foreachBatch
query = (
    joined.writeStream
    .foreachBatch(write_to_postgres)
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_PATH)
    .start()
)

# Zapis do Kafka
# (
#     kafka_ready.write
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092")
#     .option("topic", "test-topic")
#     .save()
# )

query.awaitTermination()
