In [2]:
from pyspark.sql import SparkSession

In [10]:
spark = SparkSession.builder \
    .appName("Kafka-Spark-Integration") \
    .master("local") \
    .getOrCreate()

In [11]:
spark

In [18]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "earliest") \
    .load()

In [13]:
df.schema

StructType([StructField('key', BinaryType(), True), StructField('value', BinaryType(), True), StructField('topic', StringType(), True), StructField('partition', IntegerType(), True), StructField('offset', LongType(), True), StructField('timestamp', TimestampType(), True), StructField('timestampType', IntegerType(), True)])

In [14]:
parsed_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [17]:
query = parsed_df \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

In [20]:
# After starting the query
print(f"Query is active: {query.isActive}")
print(f"Query name: {query.name}")
print(f"Query ID: {query.id}")

# You can also monitor the progress of the query
import time
for _ in range(5):  # Check progress 5 times
    print(query.status)
    time.sleep(2)  # Wait 2 seconds between checks

Query is active: True
Query name: None
Query ID: 55e9e7c7-4b8e-4271-9d0e-a1066f846437
{'message': 'Getting offsets from KafkaV2[Subscribe[user-events]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}
{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}


In [21]:
# Get the most recent progress metrics
recent_progress = query.recentProgress
if recent_progress:
    for progress in recent_progress:
        print(f"Input rate: {progress.get('inputRate', 'N/A')} records/sec")
        print(f"Processing rate: {progress.get('processedRowsPerSecond', 'N/A')} rows/sec")
        print(f"Batch duration: {progress.get('batchDuration', 'N/A')} ms")
else:
    print("No progress metrics available yet")

Input rate: N/A records/sec
Processing rate: 330.75933075933074 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.4104372355430184 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.3986013986013988 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.297016861219196 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.2658227848101264 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.3262599469496021 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.430615164520744 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.4104372355430184 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.451378809869376 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate: 1.4409221902017293 rows/sec
Batch duration: N/A ms
Input rate: N/A records/sec
Processing rate

In [22]:
# Start the streaming query writing to an in-memory table
query = parsed_df \
    .writeStream \
    .format("memory") \
    .queryName("kafka_data") \
    .outputMode("append") \
    .start()

# Give it a moment to process some data
time.sleep(5)

# Now you can query the in-memory table
spark.sql("SELECT * FROM kafka_data").show()

+----+--------------------+
| key|               value|
+----+--------------------+
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
|NULL|{"event_time": "2...|
+----+--------------------+
only showing top 20 rows



In [23]:
spark.sql("SELECT * FROM kafka_data").show(truncate=False)

+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key |value                                                                                                                                                                                |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|NULL|{"event_time": "2025-03-20T00:04:11.782493", "event_type": "remove_from_cart", "user_id": "user_12", "product_id": "product_471", "category": "toys", "price": 64.86, "quantity": 1} |
|NULL|{"event_time": "2025-03-20T00:04:40.109885", "event_type": "purchase", "user_id": "user_44", "product_id": "product_115", "category": "home", "price": 474.35, "quantity": 1}        |
|NULL|{"event_time": "2025-03-20T00:04:15.111787", "eve