In [10]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import from_json, get_json_object, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, MapType

In [None]:
# import pyspark
# print(pyspark.__version__)
# import os
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

In [2]:
# Kafka connection settings for the CDC topic consumed by this notebook.
# "broker:9092" is the active bootstrap address; the original author left
# "localhost:9092" as the alternative (presumably for runs outside the
# container network -- confirm for your environment).
KAFKA_HOST = "broker:9092"
KAFKA_TOPIC = "cdc.public.transactions"

# Disabled alternative: build the session while requesting the Kafka SQL
# connector through spark.jars.packages.
#
# spark = SparkSession.builder \
#     .appName("KafkaIntegration") \
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
#     .getOrCreate()

# Obtain the SparkSession used by every cell below; getOrCreate() reuses an
# already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .getOrCreate()
)

#### READ STREAM

In [4]:
# Streaming source: subscribe to KAFKA_TOPIC on the KAFKA_HOST bootstrap
# server(s) and replay the topic from its earliest available offset.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": KAFKA_HOST,
            "subscribe": KAFKA_TOPIC,
            "startingOffsets": "earliest",
        }
    )
    .load()
)

#### WRITE RAW STREAM 1 - check inside pyspark-notebook container

In [22]:
# Throwaway query: send the raw Kafka records to the console sink, then wait
# at most 20 seconds. The final expression is the cell's output and evaluates
# to False when the query is still running once the timeout elapses.
query = (
    kafka_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)
query.awaitTermination(timeout=20)

False

In [38]:
# Stop the streaming query currently bound to `query` (the raw console query
# when the notebook is run in order) so the following cells can start their own.
query.stop()

#### WRITE RAW STREAM 2 - review raw stream inside the notebook

In [10]:
# Name of the in-memory table the raw stream is written to; the polling cell
# that follows queries it through spark.sql.
queryName = "streaming_table"

# Memory sink: appended rows become queryable under the table name above.
query = (
    kafka_df.writeStream
    .format("memory")
    .queryName(queryName)
    .outputMode("append")
    .start()
)

In [None]:
import time
from IPython.display import display, clear_output

# Poll the in-memory table fed by the streaming query and refresh the cell
# output every 5 seconds until the kernel is interrupted.
try:
    while True:
        # wait=True defers the clear until new output arrives, which avoids flicker
        clear_output(wait=True)
        # DataFrame.show() prints the rows itself and returns None, so it must
        # not be wrapped in display() -- that rendered a stray "None" under
        # every refresh.
        spark.sql(f"SELECT * FROM {queryName} LIMIT 1").show(5, False)
        # Pause before the next refresh
        time.sleep(5)
except KeyboardInterrupt:
    # Only the polling loop ends here; the streaming query keeps running until
    # query.stop() is called.
    print("Streaming stopped.")

In [26]:
# Stop the streaming query currently bound to `query` (the in-memory
# "streaming_table" query when the notebook is run in order).
query.stop()

#### SCHEMA & PROCESS STREAM

In [29]:
# Schema of the `payload.after` object carried by each Kafka message.
# Every field is nullable. Field order matches the order of the columns
# produced when the parsed struct is flattened.
after_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in [
            ("transaction_id", StringType()),
            ("user_id", StringType()),
            # Long epoch value; the original author assumed microseconds
            ("timestamp", LongType()),
            # Kept as a string; consider DecimalType if it is a monetary value
            ("amount", StringType()),
            ("currency", StringType()),
            ("city", StringType()),
            ("country", StringType()),
            ("merchant_name", StringType()),
            ("payment_method", StringType()),
            ("ip_address", StringType()),
            ("voucher_code", StringType()),
            ("affiliateid", StringType()),
        ]
    ]
)

In [None]:
# Parse only the `payload.after` section of each message, in four steps.
# NOTE(review): a message without a `payload.after` object yields a row of
# nulls in flattened_df -- confirm whether such rows should be filtered out.

# 1. Kafka `value` cast to a string column.
json_df = kafka_df.select(F.col("value").cast("string").alias("json_string"))

# 2. The JSON text located at $.payload.after.
after_json_df = json_df.select(
    F.get_json_object(F.col("json_string"), "$.payload.after").alias("after_json")
)

# 3. That text parsed into a struct according to after_schema.
parsed_after_df = after_json_df.select(
    F.from_json(F.col("after_json"), after_schema).alias("after")
)

# 4. One top-level column per field of the struct.
flattened_df = parsed_after_df.select("after.*")

In [None]:
# # PARSE COLUMNS IN AFTER SECTION OF JSON & TIMESTAMP & OFFSET FROM THE ORIGINAL KAFKA DATASET
# after_json_df = kafka_df.select(
#     kafka_df["timestamp"],
#     kafka_df["offset"],
#     get_json_object(col("value").cast("string"), "$.payload.after").alias("after_json")
# )

# parsed_after_df = after_json_df.select(
#     col("timestamp"),
#     col("offset"),
#     from_json(col("after_json"), after_schema).alias("after")
# )

# flattened_df = parsed_after_df.select(
#     "timestamp",
#     "offset",
#     "after.*"  # Flatten all fields from the 'after' struct
# )

#### WRITE PROCESSED STREAM 1 - review stream inside the pyspark-notebook container

In [None]:
# Send the flattened records to the console sink.
query = flattened_df.writeStream.outputMode("append").format("console").start()

# Wait at most 20 seconds, matching the raw-stream console cell. An unbounded
# awaitTermination() would block the kernel until it is interrupted, so the
# following query.stop() cell could never be reached by running cells in order.
# Evaluates to False if the query is still running when the timeout elapses.
query.awaitTermination(timeout=20)

In [55]:
# Stop the streaming query currently bound to `query` (the processed-stream
# console query when the notebook is run in order).
query.stop()

#### WRITE PROCESSED STREAM 2 - review stream inside notebook

In [35]:
import pandas as pd
import time
from IPython.display import display, clear_output

# Start the streaming query, writing the flattened records to the in-memory
# table "transactions".
query = (
    flattened_df.writeStream.outputMode("append")
    .format("memory")
    .queryName("transactions")
    .start()
)

# Re-render the table once per second until the kernel is interrupted.
# Note: the whole table is collected to pandas on every iteration.
try:
    while True:
        # Replace the previous snapshot instead of appending below it
        clear_output(wait=True)
        # Snapshot the memory table as a pandas DataFrame for rich display
        result_df = spark.sql("SELECT * FROM transactions").toPandas()
        display(result_df)
        # Pause before the next refresh
        time.sleep(1)
except KeyboardInterrupt:
    # Interrupting the kernel is the expected way to end the loop
    pass
finally:
    # Stop the query on every exit path, not only on KeyboardInterrupt:
    # otherwise a failure inside the loop leaves the "transactions" query
    # running, and re-running this cell fails because the name is still in use.
    query.stop()

Unnamed: 0,transaction_id,user_id,timestamp,amount,currency,city,country,merchant_name,payment_method,ip_address,voucher_code,affiliateid
0,f6d13fcd-5d1d-48d3-a551-897bba454ff5,derrickanderson,1709968558000000,903.97,GBP,Kristintown,Norway,"Poole, Bryant and Fowler",online_transfer,123.234.156.119,,26f5e234-55f3-448b-9aaf-f2fbe2e8fd28
1,5cfeb3f0-9fe5-46c1-9c1d-c0c920fde367,lmiller,1709968559000000,246.53,GBP,Thomastown,San Marino,Carney-Kirk,credit_card,44.66.8.190,,7ebc2b6a-1abb-4373-88ea-90d235a308e7
2,06ae17f4-332e-4d90-942a-50407f7a5a73,edwardfox,1709968560000000,600.43,GBP,Deniseport,Congo,Garcia-Carpenter,credit_card,175.3.242.31,,04e2576c-8006-4ad2-a490-2fdc509f9735
3,59202394-cf39-4e02-90b4-ee672ea7fbc6,jensenmichelle,1709968561000000,420.75,USD,Franklinland,Belize,Marshall-Taylor,debit_card,208.18.221.60,,4844f6c3-4f1b-4731-b271-8db49e181bf4
4,d94c156f-fb58-460c-8169-d8aa1f7466c0,leslieporter,1709968562000000,33.48,GBP,Higginsland,Palestinian Territory,Snyder and Sons,debit_card,201.152.32.44,,a5027d36-0e92-4422-b11e-d0b8afb10771
...,...,...,...,...,...,...,...,...,...,...,...,...
155,3c055c85-7b7d-47c7-b218-570d20e69b9e,karla66,1709969783000000,712.01,GBP,North Joymouth,Mali,"Clark, Smith and Thomas",debit_card,135.238.114.204,DISCOUNT10,a2799edb-0ec2-479f-a465-afab658ecfb5
156,8a9047f2-54af-4bea-ac6b-850340563ded,charleshuang,1709969784000000,775.66,GBP,Davidview,Japan,Berger-Parrish,debit_card,11.145.149.247,DISCOUNT10,3d22efa1-d2df-42fb-bcc5-62e8ebee367a
157,a017a61d-b895-43e5-b676-140e62698452,klane,1709969785000000,284.58,USD,North Joshuahaven,Paraguay,Smith LLC,debit_card,5.162.97.16,,baea210f-f6f0-4622-b290-ab96035f625b
158,3002cdf2-aca9-4c20-a3e9-f5a95fc92e4b,melaniewilliams,1709969786000000,977.94,USD,Hollandfort,Eritrea,Miller and Sons,credit_card,125.12.200.189,,8953c4c6-b72c-48a0-ae13-f6cef042a5fc


In [37]:
# Stop the streaming query currently bound to `query` (the in-memory
# "transactions" query when the notebook is run in order).
query.stop()