In [1]:
import pyspark
from pyspark.sql import SparkSession

# Maven coordinates that spark.jars.packages resolves when the session starts.
delta_package = "io.delta:delta-spark_2.12:3.0.0"  # keep in step with the Spark release in use
# The Kafka source artifact has to carry the same version as the running Spark,
# so it is derived from the installed PySpark instead of being pinned by hand.
# NOTE(review): assumes a Scala 2.12 build of Spark, like the other coordinates here.
kafka_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"
xml_package = "com.databricks:spark-xml_2.12:0.14.0"

# Spark session with the Delta Lake, Kafka and spark-xml packages on the classpath.
spark = (
    SparkSession.builder
    .appName("KafkaToHDFSConsumer")
    .config("spark.jars.packages", ",".join([delta_package, kafka_package, xml_package]))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Kafka connection parameters
kafka_server = "spark-test1:9092"
topic_name = "test-topic"

# Streaming DataFrame over the Kafka topic (key/value arrive as binary).
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .load()
)

:: loading settings :: url = jar:file:/home/spark/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-97671565-a396-4ca3-a51a-94be340280fd;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in c

In [2]:
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options=None):
    """Parse a column of XML text into a typed column via spark-xml's ``from_xml``.

    The call is forwarded to the JVM implementation through the Py4J gateway of
    the module-level ``spark`` session.

    Args:
        xml_column: Column containing the XML documents; it is cast to string
            before being handed to the JVM.
        schema: Spark data type describing the parsed result (must provide
            ``.json()``).
        options: Optional dict of spark-xml parser options. ``None`` (the
            default) means no options.

    Returns:
        A ``Column`` wrapping the JVM ``from_xml`` expression.
    """
    # A None default avoids sharing one mutable dict object between calls.
    parser_options = {} if options is None else options

    java_column = _to_java_column(xml_column.cast('string'))
    # Rebuild the Python schema on the JVM side from its JSON representation.
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(parser_options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options=None):
    """Infer a schema from a single-column DataFrame of XML via spark-xml.

    The inference runs in the JVM (``schema_of_xml_df`` on the spark-xml
    package object), reached through the module-level ``spark`` session.

    Args:
        df: DataFrame with exactly one column.
        options: Optional dict of spark-xml options. ``None`` (the default)
            means no options.

    Returns:
        The inferred schema, converted back to a PySpark data type.

    Raises:
        AssertionError: If ``df`` does not have exactly one column.
    """
    assert len(df.columns) == 1, "ext_schema_of_xml_df expects a single-column DataFrame"

    # A None default avoids sharing one mutable dict object between calls.
    parser_options = {} if options is None else options

    # Same fully qualified PythonUtils path as ext_from_xml, so the lookup does
    # not depend on which packages were imported into the Py4J JVM view.
    scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(parser_options)
    # Scala package objects are exposed to Java as `package$.MODULE$`.
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os, traceback

# Shape each XML message is parsed into: one array of Transaction structs.
schema = StructType([
    StructField(
        'Transaction',
        ArrayType(
            StructType([
                StructField('Amount', LongType(), True),
                StructField('CustomerId', LongType(), True),
                StructField('DateTime', TimestampType(), True),
                StructField('Location', StringType(), True),
                StructField('Result', StringType(), True)
            ]),
            True
        ),
        True
    )
])

# HDFS locations. URIs always use "/" as separator, so they are built with plain
# string formatting; os.path.join would use the local OS separator instead.
hdfs_path = "hdfs://spark-test1:9000"
raw = f"{hdfs_path}/raw/transactions"
checkpoint = f"{hdfs_path}/checkpoint/transactions"
# NOTE(review): this used to point at dlq/logins, which looks like a copy-paste
# slip in a transactions pipeline. Confirm nothing still reads the old location.
dlq = f"{hdfs_path}/dlq/transactions"

# Helpers and callback used to process each micro-batch
def _write_batch_to_dlq(batch_df, error_text):
    """Append the raw payloads of a failed micro-batch to the dead-letter path.

    Args:
        batch_df: Micro-batch DataFrame; only ``value`` is kept, cast to string
            as ``xml_data``.
        error_text: Formatted traceback stored on every row as
            ``_etl_error_text``.
    """
    error_df = (
        batch_df.selectExpr("CAST(value AS STRING) as xml_data")
        .withColumn("_etl_error_text", lit(error_text))
        .withColumn("_etl_insert_date", current_date())
        .withColumn("_etl_insert_hour", date_format(current_timestamp(), "HH"))
        .withColumn("_etl_insert_timestamp", current_timestamp())
    )
    (
        error_df.write.mode("append")
        .partitionBy("_etl_insert_date", "_etl_insert_hour")
        .format("parquet")
        .save(dlq)
    )


def process_batch(batch_df, batch_id):
    """Parse one micro-batch of XML messages and append the rows to ``raw``.

    Each message in ``value`` is parsed with ``ext_from_xml`` against the
    module-level ``schema``, its ``Transaction`` array is exploded into one row
    per transaction, ETL audit columns are added, and the result is appended as
    Parquet partitioned by insert date and hour. If anything in that path
    raises, the traceback is printed and the whole batch is written to the
    dead-letter path instead.

    Args:
        batch_df: Micro-batch DataFrame with a ``value`` column holding XML.
        batch_id: Micro-batch identifier passed by ``foreachBatch``; not used.
    """
    # Several actions below evaluate the batch (emptiness check, two show()
    # calls, the write and possibly the DLQ write). Caching it keeps each of
    # them from recomputing the batch from the source.
    batch_df.persist()
    try:
        # DataFrame.isEmpty() avoids the DataFrame -> RDD conversion that
        # `.rdd.isEmpty()` forces.
        if batch_df.isEmpty():
            print("Empty batch")
            return

        batch_df.show()  # preview of the raw batch
        try:
            # FAILFAST makes a malformed message raise, which sends the entire
            # batch to the DLQ through the except branch below.
            parsed_df = batch_df.withColumn(
                "parsed",
                ext_from_xml(
                    xml_column=col("value"),
                    schema=schema,
                    options={"mode": "FAILFAST"}
                )
            )
            # One row per element of the parsed Transaction array.
            exploded_df = parsed_df.select(
                explode(col("parsed.Transaction")).alias("Transaction")
            )
            exploded_df.printSchema()
            exploded_df.show()

            # Flatten the struct, normalise the text fields and add audit columns.
            flattened_df = exploded_df.select(
                col("Transaction.Amount").alias("Amount"),
                col("Transaction.CustomerId").alias("CustomerId"),
                col("Transaction.DateTime").alias("DateTime"),
                upper(trim(col("Transaction.Location"))).alias("Location"),
                upper(trim(col("Transaction.Result"))).alias("Result"),
                current_date().alias("_etl_insert_date"),
                date_format(current_timestamp(), "HH").alias("_etl_insert_hour"),
                current_timestamp().alias("_etl_insert_timestamp")
            )

            # Append as Parquet partitioned by ingestion date and hour.
            (
                flattened_df.write.mode("append")
                .partitionBy("_etl_insert_date", "_etl_insert_hour")
                .parquet(raw)
            )
        except Exception:
            error_text = traceback.format_exc()
            print(f"Exception occurred during batch processing:\n{error_text}")
            _write_batch_to_dlq(batch_df, error_text)
    finally:
        # Release the cached batch whether it was empty, written or dead-lettered.
        batch_df.unpersist()

df.printSchema()

# Kafka delivers key/value as binary; cast every column to string before the
# batches reach process_batch.
df_string = df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "CAST(topic AS STRING)",
    "CAST(partition AS STRING)",
    "CAST(offset AS STRING)",
    "CAST(timestamp AS STRING)",
    "CAST(timestampType AS STRING)"
)

# Keep the StreamingQuery handle itself: chaining .awaitTermination() onto
# .start() would bind `query` to awaitTermination()'s return value instead.
query = (
    df_string.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", checkpoint)
    .start()
)

try:
    # Blocks the cell until the query ends or the kernel is interrupted.
    query.awaitTermination()
finally:
    # Interrupting the cell only unblocks the Python side, so stop the query
    # explicitly to avoid leaving it running in the background.
    query.stop()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



23/12/09 01:04:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/09 01:04:26 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
23/12/09 01:04:26 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
23/12/09 01:04:26 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
23/12/09 01:04:26 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
23/12/09 01:04:26 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
23/12/09 01:04:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                             

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|NULL|<?xml version="1....|test-topic|        0|   810|2023-12-09 01:04:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+

root
 |-- Transaction: struct (nullable = true)
 |    |-- Amount: long (nullable = true)
 |    |-- CustomerId: long (nullable = true)
 |    |-- DateTime: timestamp (nullable = true)
 |    |-- Location: string (nullable = true)
 |    |-- Result: string (nullable = true)



                                                                                

+--------------------+
|         Transaction|
+--------------------+
|{91, 97, 2023-12-...|
|{22, 50, 2023-12-...|
|{34, 13, 2023-12-...|
|{10, 87, 2023-12-...|
|{23, 45, 2023-12-...|
+--------------------+



23/12/09 01:04:46 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|NULL|<?xml version="1....|test-topic|        0|   811|2023-12-09 01:04:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+

root
 |-- Transaction: struct (nullable = true)
 |    |-- Amount: long (nullable = true)
 |    |-- CustomerId: long (nullable = true)
 |    |-- DateTime: timestamp (nullable = true)
 |    |-- Location: string (nullable = true)
 |    |-- Result: string (nullable = true)



                                                                                

+--------------------+
|         Transaction|
+--------------------+
|{97, 58, 2023-12-...|
|{78, 53, 2023-12-...|
|{34, 8, 2023-12-0...|
|{95, 80, 2023-12-...|
+--------------------+



23/12/09 01:04:57 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|NULL|<?xml version="1....|test-topic|        0|   812|2023-12-09 01:04:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+

root
 |-- Transaction: struct (nullable = true)
 |    |-- Amount: long (nullable = true)
 |    |-- CustomerId: long (nullable = true)
 |    |-- DateTime: timestamp (nullable = true)
 |    |-- Location: string (nullable = true)
 |    |-- Result: string (nullable = true)

+--------------------+
|         Transaction|
+--------------------+
|{24, 82, 2023-12-...|
|{68, 86, 2023-12-...|
|{6, 63, 2023-12-0...|
|{32, 64, 2023-12-...|
|{1, 63, 2023-12-0...|
+--------------------+



ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/home/spark/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/spark/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 