In [44]:
# Import the Spark session entry point, column functions, and schema types
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import from_json, col, expr, when , lit , window, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, TimestampType, ArrayType

# Build (or reuse) the Spark session shared by every streaming query below.
# The Kafka source/sink connector is pulled in through spark.jars.packages.
# NOTE(review): spark.streaming.stopGracefullyOnShutdown is a setting of the
# legacy DStream API - confirm it has any effect on Structured Streaming queries.
session_conf = {
    "spark.streaming.stopGracefullyOnShutdown": True,
    "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    "spark.sql.shuffle.partitions": "4",
}

session_builder = SparkSession.builder.appName("Transaction Streamin Job")
for conf_key, conf_value in session_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

# local[*] runs the driver and executors in this process.
spark = session_builder.master("local[*]").getOrCreate()

In [45]:
# Check on spark object: leaving it as the cell's last expression makes the
# notebook display the session created above.
spark

In [46]:
# Schema of the JSON payload carried in each Kafka message value.
# All four fields are declared nullable, so a missing key parses to NULL
# instead of failing; the validation step further down flags those rows.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("user_id", StringType()),
        ("amount", DoubleType()),
        ("timestamp", TimestampType()),
        ("source", StringType()),
    )
])

In [47]:
# Streaming source: subscribe to the "transactions" topic on the kafka:9094
# broker, starting from the earliest available offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9094",
        "subscribe": "transactions",
        "startingOffsets": "earliest",
    })
    .load()
)

In [48]:
# Show the columns of the raw Kafka source; key and value arrive as binary.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [49]:
# Deserialize the Kafka message value: cast the binary payload to a string,
# parse it as JSON against `schema`, then flatten the parsed struct so that
# value_df exposes user_id, amount, timestamp and source as top-level columns.
# Only the payload is selected, so the Kafka metadata columns (including the
# broker-side timestamp) are not carried forward.
parsed_value = from_json(col("value").cast("string"), schema).alias("value")
value_df = kafka_df.select(parsed_value).select("value.*")


In [50]:
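# value_df.show()  # kept for reference only: value_df is a streaming DataFrame, and show() cannot run on one (use a writeStream sink to inspect it)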

In [51]:
# Confirm the parsed frame exposes the four payload fields declared in `schema`.
value_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- source: string (nullable = true)



In [52]:
# Validate the data.
# The rules are evaluated in order and the first one that matches supplies
# error_reason; a row matching none of them keeps a NULL error_reason.
# NOTE(review): a NULL source is not caught by the "Invalid source" rule
# (the negated isin evaluates to NULL rather than true), so such rows are
# treated as valid - confirm that source is meant to be optional.
error_reason_expr = (
    when(
        col("user_id").isNull() | col("amount").isNull() | col("timestamp").isNull(),
        "Missing mandatory fields",
    )
    # Accepted amounts lie in the inclusive range [1, 10000000].
    .when(~col("amount").between(1, 10000000), "Amount out of range")
    .when(~col("source").isin("mobile", "web", "pos"), "Invalid source")
    .otherwise(None)
)

# is_valid is true exactly when no rule fired.
validated_df = (
    value_df
    .withColumn("error_reason", error_reason_expr)
    .withColumn("is_valid", col("error_reason").isNull())
)

In [53]:
# Split the validated stream by the is_valid flag.
# Valid rows get a 3-minute event-time watermark on `timestamp` and are then
# de-duplicated on the (user_id, timestamp) pair.
valid_df = (
    validated_df
    .where(col("is_valid"))
    .withWatermark("timestamp", "3 minutes")
    .dropDuplicates(["user_id", "timestamp"])
)

# Rejected rows are passed through as-is: no watermark, no de-duplication.
invalid_df = validated_df.where(~col("is_valid"))

In [54]:
# Tumbling-window monitoring: count the valid transactions that fall into each
# 1-minute event-time window, and stamp every result row with the time at which
# it was computed.
window_agg_df = (
    valid_df
    .groupBy(window(col("timestamp"), "1 minutes"))
    .count()
    .select(
        current_timestamp().alias("timestamp"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("count").alias("total_transactions"),
    )
)

# window_agg_df.show()  # kept for reference only: show() cannot run on a streaming DataFrame

In [55]:
# Projection shown on the console: a fresh processing timestamp plus the
# window count. The window bounds are dropped here, and despite its name
# `running_total` holds the per-window count from window_agg_df, not a
# cumulative sum.
console_output_df = window_agg_df.select(
    current_timestamp().alias("timestamp"),
    window_agg_df["total_transactions"].alias("running_total"),
)

In [57]:
# Start the three streaming queries: the console monitor, the valid-record
# Kafka topic, and the dead-letter (DLQ) Kafka topic.

def _start_kafka_json_sink(df, topic, checkpoint_dir, bootstrap_servers="kafka:9094"):
    """Stream `df` to a Kafka topic as JSON and return the started query.

    Every column of each row is packed into a single JSON string column named
    ``value``, which is what the Kafka sink publishes as the message payload.

    Args:
        df: Streaming DataFrame to publish.
        topic: Name of the destination Kafka topic.
        checkpoint_dir: Checkpoint location for this query; each query needs
            its own directory.
        bootstrap_servers: Kafka broker address list.

    Returns:
        The StreamingQuery handle returned by ``start()``.
    """
    return (
        df.selectExpr("to_json(struct(*)) as value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )


# Console monitor: "complete" mode prints the full aggregated result on every
# trigger; truncation is disabled so wide values stay readable.
query_console = (
    console_output_df.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Valid records and rejected records go to separate topics, each with its own
# checkpoint directory.
valid_query = _start_kafka_json_sink(valid_df, "transactions_valid", "checkpoints/valid")
invalid_dlq = _start_kafka_json_sink(invalid_df, "transactions_dlq", "checkpoints/dlq")

# Block this cell until any one of the active queries terminates.
spark.streams.awaitAnyTermination()