In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr, current_timestamp
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer


kafka_config = {
    "kafka.bootstrap.servers":"localhost:9092",
    # "kafka.security.protocol":"SASL_SSL",
    # "kafka.sasl.mechanism":"PLAIN",
    # "kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username='***' password='**';",
    "startingOffsets":"latest"
}

orders_data_topic = "orders"
payments_data_topic = "payments"
order_schema_name = f'{orders_data_topic}-value'
payment_schema_name = f'{payments_data_topic}-value'

mongo_config = {
    "spark.mongodb.connection.uri":"mongodb://admin:password@127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.3.2",
    "spark.mongodb.database":"ecomm_mart",
    "spark.mongodb.collection":"validated_orders"    
}

schema_reg_addr = 'http://localhost:8081'
# Create a Schema Registry client
schema_registry_client = SchemaRegistryClient({
  'url': 'http://localhost:8081',
#   'basic.auth.user.info': '{}:{}'.format('klm', 'abc123')
})

order_schema = schema_registry_client.get_latest_version(order_schema_name).schema.schema_str
payment_schema = schema_registry_client.get_latest_version(payment_schema_name).schema.schema_str

print(payment_schema)
payment_schema = '''
{
  "type": "record",
  "name": "Payment",
  "namespace": "io.github.thepcsahu",
  "fields": [
    {
      "name": "payment_id",
      "type": "string"
    },
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "payment_date",
      "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
      "default": null
    },
    {
      "name": "created_at",
      "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
      "default": null
    },
    {
      "name": "amount",
      "type": ["null", "int"],
      "default": null
    }
  ]
}
'''
# Initialize Spark session
spark = SparkSession.builder \
    .appName("ECommerceOrdersPaymentsStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.5.0,org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .config("spark.sql.shuffle.partitions", "5") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.streaming.kafka.maxRatePerPartition", "10") \
    .config("spark.default.parallelism", "5") \
    .getOrCreate()

# Define the order and payment schemas
# order_schema = StructType([
#     StructField("order_id", StringType(), True),
#     StructField("order_date", TimestampType(), True),
#     StructField("created_at", TimestampType(), True),
#     StructField("customer_id", StringType(), True),
#     StructField("amount", IntegerType(), True)
# ])

# payment_schema = StructType([
#     StructField("payment_id", StringType(), True),
#     StructField("order_id", StringType(), True),
#     StructField("payment_date", TimestampType(), True),
#     StructField("created_at", TimestampType(), True),
#     StructField("amount", IntegerType(), True)
# ])


# # Read orders from Kafka
# orders_df = spark \
#     .readStream \
#     .format("kafka") \
#     .options(**kafka_config) \
#     .option("subscribe", orders_data_topic) \
#     .load() \
#     .selectExpr("CAST(value AS BINARY) as value") \
#     .select(from_avro(col("value"), order_schema).alias("data")) \
#     .select("data.*") \
#     .withWatermark("order_date", "10 minutes")\
#     .repartition(5)
# print("Orders stream read")


# Read payments from Kafka
payments_df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafka_config) \
    .option("subscribe", payments_data_topic) \
    .load() 
    # .selectExpr("CAST(value AS binary) as value") 
payments_df = payments_df \
        .select(from_avro(payments_df.value,payment_schema).alias("data"))
#     .select("data.*") \
#     .withWatermark("payment_date", "10 minutes")\
#     .repartition(5)
print("Payments stream read")

# # Join orders and payments on order_id with 30 minutes window
# joined_df = orders_df.alias("orders").join(
#     payments_df.alias("payments"),
#     expr("""
#         orders.order_id = payments.order_id AND
#         payments.payment_date BETWEEN orders.order_date AND
#         orders.order_date + interval 1 minutes AND payments.amount = orders.amount
#     """),
#     "inner"
# ).select(
#     col("orders.order_id"),
#     col("orders.order_date"),
#     col("orders.created_at").alias("order_created_at"),
#     col("orders.customer_id"),
#     col("orders.amount").alias("order_amount"),
#     col("payments.payment_id"),
#     col("payments.payment_date"),
#     col("payments.created_at").alias("payment_created_at"),
#     col("payments.amount").alias("payment_amount")
# ).repartition(5)

# #Write completed orders to MongoDB
# query = joined_df.writeStream \
#     .outputMode("append") \
#     .format("mongodb") \
#     .option("checkpointLocation", "./temp_data/mongo_db_checkpoint11") \
#     .options(**mongo_config) \
#     .option("truncate", False) \
#     .start()

# Write completed orders to MongoDB
query = payments_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "./temp_data/mongo_db_checkpoint16") \
    .options(**mongo_config) \
    .option("truncate", False) \
    .start()

# payments_df.writeStream \
#    .outputMode("append") \
#    .format("json") \
#    .option("path", "file_sink") \
#    .option("header", "true") \
#    .option("checkpointLocation", "./temp_data/mongo_db_checkpoint15") \
#    .start() \
#    .awaitTermination()
print("Write successfull")

query.awaitTermination()

{"type":"record","name":"Payment","namespace":"io.github.thepcsahu","fields":[{"name":"payment_id","type":"string"},{"name":"order_id","type":"string"},{"name":"payment_date","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"created_at","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"amount","type":["null","int"],"default":null}]}
Payments stream read
Write successfull


StreamingQueryException: [STREAM_FAILED] Query [id = 56571a88-09b5-4034-9d72-fe7be8f21735, runId = 7834e8da-a49e-4b68-966a-b0bacc072c0a] terminated with exception: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (Poonam executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:114)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -2 out of bounds for length 2
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
	at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:100)
	... 20 more

Driver stacktrace: