In [None]:
import os
from urllib.parse import quote_plus

from pyspark.sql import SparkSession

# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json

# Spark session & context
#
# Mongo connection settings are read from the environment so credentials need not be
# committed with the notebook. The defaults reproduce the original docker-compose setup.
# NOTE(review): the default user/password look like local dev values — confirm.
MONGO_USER = os.environ.get("MONGO_USER", "duycao")
MONGO_PASSWORD = os.environ.get("MONGO_PASSWORD", "123")
MONGO_HOST = os.environ.get("MONGO_HOST", "mongo:27017")
MONGO_NAMESPACE = os.environ.get("MONGO_NAMESPACE", "streaming_project.orders")

# The same URI serves reads and writes, so build it once. User and password are
# URL-encoded so values containing ':', '@' or '/' do not corrupt the URI.
MONGO_URI = (
    f"mongodb://{quote_plus(MONGO_USER)}:{quote_plus(MONGO_PASSWORD)}"
    f"@{MONGO_HOST}/{MONGO_NAMESPACE}?authSource=admin"
)

# Kafka, Postgres JDBC and Mongo connector packages. spark.jars.packages expects ONE
# comma-separated string, hence the join. Versions must match the Spark/Scala build
# (these artifacts target Spark 2.4.x / Scala 2.11).
SPARK_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5",
    "org.postgresql:postgresql:42.5.4",
    "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0",
])

spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-new')
         .config("spark.jars.packages", SPARK_PACKAGES)
         .config("spark.mongodb.input.uri", MONGO_URI)
         .config("spark.mongodb.output.uri", MONGO_URI)
         .getOrCreate())
sc = spark.sparkContext


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, TimestampType
from pyspark.sql.functions import col,from_json, json_tuple, explode, to_json, struct, array, lit, collect_list

In [None]:
# Read the static customer and product tables from Postgres.
# Connection settings can be overridden via the environment; the defaults are the
# values this notebook originally hardcoded.
PG_URL = os.environ.get("PG_URL", "jdbc:postgresql://172.25.32.171:5432/postgres")
PG_USER = os.environ.get("PG_USER", "postgres")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "example")


def read_pg_table(table_name):
    """Load one Postgres table as a batch DataFrame over JDBC.

    A fresh reader is built on every call. ``DataFrameReader.option`` mutates and returns
    the same reader, so sharing one reader across tables would let a later "dbtable"
    option silently replace an earlier one.

    :param table_name: table to load, e.g. "public.customer".
    :returns: a DataFrame backed by that table.
    """
    return (spark.read.format("jdbc")
            .option("url", PG_URL)
            .option("driver", "org.postgresql.Driver")
            .option("user", PG_USER)
            .option("password", PG_PASSWORD)
            .option("dbtable", table_name)
            .load())


pg_customer_df = read_pg_table("public.customer")
pg_product_df = read_pg_table("public.product")

In [None]:
# Streaming source: subscribe to the "api-ingestion" topic on the Kafka broker.
kafka_df = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "kafka:9092")
            .option("subscribe", "api-ingestion")
            .load())

# Kafka hands the payload over as binary; keep only the value, cast to a string,
# so it can be parsed as JSON in the next step.
kafka_df2 = kafka_df.selectExpr("CAST(value AS STRING)")

In [None]:
# Expected JSON layout of one order event.
# A single line of an order: which product, how many, and its category/price.
order_line_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("product_category", StringType(), True),
    StructField("price", DoubleType(), True),
])

# The whole event: the ordering customer, its order lines, and when it was placed.
data_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("order_list", ArrayType(order_line_schema), True),
    StructField("order_timestamp", TimestampType(), True),
])

# Parse the JSON string into one struct column, then flatten its fields to top level.
kafka_df3 = (kafka_df2
             .withColumn("json-data", from_json(col("value"), data_schema))
             .select("json-data.*"))

In [None]:
# Enrich each streamed order with its customer row. The inner join drops orders whose
# customer is not in the table. The key is renamed first so the joined frame does not
# carry two columns named "customer_id".
renamed_pg_customer_df = pg_customer_df.withColumnRenamed("customer_id", "customer_id2")

df_joined_cust = kafka_df3.join(
    renamed_pg_customer_df,
    renamed_pg_customer_df["customer_id2"] == kafka_df3["customer_id"],
    how="inner",
)

# One row per order line, so each line can be matched against the product table.
df_explode = df_joined_cust.select("*", explode("order_list").alias("exploded_order_list"))

# Left join: order lines whose product_id is absent from the product table are kept,
# with null product columns.
products_joined_df = df_explode.join(
    pg_product_df,
    df_explode["exploded_order_list.product_id"] == pg_product_df["product_id"],
    "left",
)

# Rebuild each order line as a single struct.
# - The struct is deliberately NOT wrapped in array(): the sink groups rows with
#   collect_list("order_list_new"). A plain struct yields array<struct>, the same shape
#   as the incoming order_list. A one-element array would yield array<array<struct>>.
# - The top-level `price` / `product_category` presumably come from the product table,
#   because the message's own values are nested under exploded_order_list.
#   NOTE(review): confirm, and confirm whether the product name should also be included.
joined_df = products_joined_df.withColumn(
    "order_list_new",
    struct(
        col("exploded_order_list.product_id").alias("product_id"),
        col("exploded_order_list.quantity").alias("quantity"),
        col("price").alias("price"),
        col("product_category").alias("product_category"),
    ),
)

# # Define the output mode and format
# output_query = result_df.writeStream \
#                         .format("console") \
#                         .outputMode("complete") \
#                         .start()


In [None]:
def foreach_batch_function(df, epoch_id):
    """Regroup one micro-batch into one document per order and append it to MongoDB.

    :param df: the current micro-batch of the streaming query, i.e. rows of
        ``joined_df`` (one row per order line).
    :param epoch_id: micro-batch id passed by Spark's foreachBatch; unused here.
    """
    # Fold the per-line rows of each order back together; collect_list gathers the
    # rebuilt order lines into a single "order_list" column.
    result_df = (df.groupBy("customer_id", "customer_name", "order_timestamp")
                 .agg(collect_list("order_list_new").alias("order_list")))
    # Append this batch to the collection configured by spark.mongodb.output.uri.
    # The write mode is "append": each batch adds new documents.
    result_df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()


# Keep a handle on the query so it can be inspected or stopped (orders_query.stop()).
# awaitTermination() blocks this cell until the stream terminates.
orders_query = joined_df.writeStream.foreachBatch(foreach_batch_function).start()
orders_query.awaitTermination()

In [None]:
# for query in spark.streams.active:
#     query.stop()

In [None]:
# spark.streams.active