In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, from_json, window, count,trunc,to_timestamp,date_trunc
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer
import pandas as pd
import os
import json
# from dotenv import load_dotenv

In [5]:
# Build the SparkSession used by every later cell. It configures:
#   * S3A access to the object store at http://s3:9000 (path-style, no SSL),
#   * a Hadoop-type Iceberg catalog named "iceberg" rooted at s3a://lmwn/iceberg,
#   * the Kafka source package for Structured Streaming.
# The S3 credentials are read from the environment (S3_ACCESS_KEY /
# S3_SECRET_KEY) so real secrets do not have to be committed with the
# notebook; the fallbacks are the values that were previously hardcoded, so
# behaviour is unchanged when the variables are not set.
spark = (
    SparkSession.builder
    .appName("test-lmwn")
    # s3
    .config("spark.hadoop.fs.s3a.endpoint", "http://s3:9000")
    .config("spark.hadoop.fs.s3a.access.key", os.environ.get("S3_ACCESS_KEY", "minioadmin"))
    .config("spark.hadoop.fs.s3a.secret.key", os.environ.get("S3_SECRET_KEY", "miniopassword"))
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # iceberg
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hadoop")
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://lmwn/iceberg")
    # kafka
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)


In [6]:
# PostgreSQL connection settings shared by the JDBC reads and writes below.
jdbcUrl = "jdbc:postgresql://test-postgresql-1:5432/postgres"
connectionProperties = dict(
    user="postgres",
    password="",
    driver="org.postgresql.Driver",  # JDBC driver class to load
)
# Earlier experiments, kept for reference:
# df = spark.read.jdbc(jdbcUrl, "public.products", properties=connectionProperties)
# products.write.mode("overwrite").jdbc(jdbcUrl, "public.products", properties=connectionProperties)

In [8]:
# Schema of one order record: every field is a nullable string except
# `quantity`, which is a nullable integer.
order_schema = (
    StructType()
    .add("order_id", StringType(), nullable=True)
    .add("order_date", StringType(), nullable=True)
    .add("user_id", StringType(), nullable=True)
    .add("product_id", StringType(), nullable=True)
    .add("quantity", IntegerType(), nullable=True)
    .add("status", StringType(), nullable=True)
)
print(order_schema)

StructType([StructField('order_id', StringType(), True), StructField('order_date', StringType(), True), StructField('user_id', StringType(), True), StructField('product_id', StringType(), True), StructField('quantity', IntegerType(), True), StructField('status', StringType(), True)])


In [5]:
# --- Batch sources -----------------------------------------------------------
# CSV files in object storage; the first row of each file is the header.
orders = (
    spark.read
    .option('header', 'true')
    .csv('s3a://lmwn/data/orders.csv')
    .alias('o')
)
products = (
    spark.read
    .option('header', 'true')
    .csv('s3a://lmwn/data/products.csv')
    .alias('p')
)

# The same two tables as stored in PostgreSQL (read over JDBC).
orders_pg = spark.read.jdbc(url=jdbcUrl, table="public.orders", properties=connectionProperties)
products_pg = spark.read.jdbc(url=jdbcUrl, table="public.products", properties=connectionProperties)

# ...and as tables in the "iceberg" catalog.
orders_iceberg = spark.table('iceberg.orders')
products_iceberg = spark.table('iceberg.products')

# --- Streaming source --------------------------------------------------------
# Kafka topic "orders", consumed from the earliest available offset.
orders_stream = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "orders",
        "startingOffsets": "earliest",
    })
    .load()
)
# Left-join orders ('o') to products ('p') on product_id, so orders without a
# matching product are kept, then project the columns written out downstream.
# product_id is taken from the orders side to avoid an ambiguous reference.
_order_product_key = col('o.product_id') == col('p.product_id')
order_product = orders.join(products, _order_product_key, 'left').select(
    'order_id',
    'order_date',
    'user_id',
    'o.product_id',
    'quantity',
    'status',
    'product_name',
    'price',
    'category',
)

In [7]:
# Decode each streamed record: cast the raw `value` to a string and parse it
# as JSON using order_schema, keeping the record's `timestamp` alongside it.
_decoded_orders = orders_stream.select(
    col("timestamp"),
    from_json(col("value").cast("string"), order_schema).alias("value"),
)

# Flatten to the columns used downstream; `window_start` is the timestamp
# truncated to the minute.
order_stream = _decoded_orders.select(
    col('timestamp'),
    date_trunc("minute", col("timestamp")).alias("window_start"),
    col("value.product_id").alias("product_id"),
    col("value.order_id").alias("order_id"),
)

In [8]:
# Count orders per product in 1-minute windows over `timestamp`, with a
# 1-minute watermark on the same column, and expose the window bounds as
# plain columns.
_one_minute_window = window(col('timestamp'), "1 minutes")
final_df = (
    order_stream
    .withWatermark('timestamp', "1 minutes")
    .groupBy(col("product_id"), _one_minute_window)
    .agg(count(col("order_id")).alias("order_count"))
    .select(
        col("product_id"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("order_count"),
    )
)

In [None]:
# Write the joined batch result to PostgreSQL, overwriting public.order_product.
order_product.write.jdbc(
    url=jdbcUrl,
    table="public.order_product",
    mode="overwrite",
    properties=connectionProperties,
)

In [None]:
# Append the joined batch result to the Iceberg table iceberg.order_product.
# Alternatives on the same writer: .create() or .overwritePartitions().
_order_product_writer = order_product.writeTo("iceberg.order_product")
_order_product_writer.append()

In [11]:
# Start a streaming query that appends the parsed orders to the Iceberg table
# iceberg.stream.orders_stream. The expression is left as the cell's last
# value so the returned StreamingQuery handle is displayed.
(
    order_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://lmwn/checkpoints/orders")
    .toTable("iceberg.stream.orders_stream")
)


<pyspark.sql.streaming.query.StreamingQuery at 0x7fdc82fed0d0>

In [12]:
# Start a streaming query that appends the per-product, per-minute order
# counts to the Iceberg table iceberg.stream.orders_log.
#
# This query needs its own checkpoint directory: the checkpoint stores the
# query's offsets and aggregation state, and "s3a://lmwn/checkpoints/orders"
# is already used by the query writing iceberg.stream.orders_stream. Sharing
# one location between two queries corrupts or confuses their recovery state.
(
    final_df.writeStream
    .outputMode("append")
    .format("iceberg")
    .option("checkpointLocation", "s3a://lmwn/checkpoints/orders_log")
    .toTable("iceberg.stream.orders_log")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7fdc82fee010>

In [None]:
# NOTE(review): empty %sql magic with no query after it, and no SQL magic
# extension is loaded in the visible cells. TODO confirm whether this cell is
# still needed or should be removed.
%sql