# Kafka Integration with PySpark

This notebook provides Python equivalents of the Scala Kafka examples. It demonstrates how to integrate Apache Kafka with PySpark Structured Streaming for real-time data processing.

## Setup and Dependencies

First, ensure you have the required dependencies:

In [None]:
# Install required packages (run this in terminal if needed)
# pip install pyspark kafka-python

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
import time

print("Dependencies imported successfully")

## Initialize Spark Session

Create a Spark session configured for Kafka integration:

In [None]:
# Create Spark session with Kafka support
spark = SparkSession.builder \
    .appName("KafkaIntegration") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print("Spark session created with Kafka support")
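
The `spark.jars.packages` coordinate has to line up with the Spark build in use: the suffix after `spark-sql-kafka-0-10_` is the Scala binary version, and the final component is the Spark version. As a minimal sketch, the helper below assembles that coordinate from the two versions. `kafka_package` is a hypothetical name, not part of PySpark, and the Scala 2.12 default is an assumption taken from the string used above.

```python
def kafka_package(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for the Spark Kafka connector.

    Hypothetical helper for illustration; the coordinate layout follows
    the string used in the session cell above.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


coordinate = kafka_package("3.5.0")
print(coordinate)
```

Passing `pyspark.__version__` instead of the literal is one way to keep the connector in step with the installed PySpark, assuming the installed package matches the Spark version of the cluster.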

## Kafka Producer - Writing to Kafka

Equivalent to HelloProducer.scala: sending messages to a Kafka topic.

In [None]:
# Sample data to send to Kafka
sample_data = [
    {"id": 1, "name": "Alice", "age": 25, "city": "New York"},
    {"id": 2, "name": "Bob", "age": 30, "city": "San Francisco"},
    {"id": 3, "name": "Charlie", "age": 35, "city": "Chicago"},
    {"id": 4, "name": "Diana", "age": 28, "city": "Boston"}
]

# Create DataFrame from sample data
df = spark.createDataFrame(sample_data)
df.show()

# Note: In production, you would write to Kafka like this:
# df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
#    .write \
#    .format("kafka") \
#    .option("kafka.bootstrap.servers", "localhost:9092") \
#    .option("topic", "user-data") \
#    .save()

print("DataFrame ready for Kafka publishing (uncomment write code when Kafka is running)")
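
The commented write path turns each row into a string `key` (the `id`) and a JSON `value` (the whole row). To inspect that payload without Spark or a running broker, here is a plain-Python sketch of the same shaping. `to_kafka_record` is a hypothetical helper, and its output only approximates what `to_json(struct(*))` produces; in particular, field order in Spark follows the DataFrame's column order, which is assumed rather than reproduced here.

```python
import json
from typing import Tuple


def to_kafka_record(row: dict) -> Tuple[str, str]:
    """Return the (key, value) strings for one row.

    Hypothetical helper: approximates CAST(id AS STRING) for the key and
    to_json(struct(*)) for the value. Fields are sorted alphabetically,
    which may differ from Spark's column order.
    """
    key = str(row["id"])
    value = json.dumps(row, separators=(",", ":"), sort_keys=True)
    return key, value


key, value = to_kafka_record({"id": 1, "name": "Alice", "age": 25, "city": "New York"})
print(key)    # 1
print(value)  # {"age":25,"city":"New York","id":1,"name":"Alice"}
```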

## Kafka Consumer - Reading from Kafka

Equivalent to HelloConsumer.scala: consuming messages from a Kafka topic.

In [None]:
# Read from Kafka (uncomment when Kafka is running)
# kafka_df = spark \
#     .readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "localhost:9092") \
#     .option("subscribe", "user-data") \
#     .load()

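# # Parse JSON value column
# # Note: `schema` is defined in the "Message Processing and Transformations" cell; run that cell first
# parsed_df = kafka_df \
#     .select(from_json(col("value").cast("string"), schema).alias("data")) \
#     .select("data.*")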

# # Display streaming data
# query = parsed_df \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()

# query.awaitTermination()

print("Kafka consumer code ready (uncomment when Kafka is running)")
print("This demonstrates the PySpark Structured Streaming approach to Kafka consumption")
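
The Kafka source delivers `value` as binary, which is why the consumer code casts it to a string before calling `from_json`. The sketch below walks through the same decode-then-parse step on a single message in plain Python. `parse_value` and `EXPECTED_FIELDS` are hypothetical names; returning `None` for malformed input is meant to mirror, loosely, how `from_json` yields null for a string it cannot parse.

```python
import json
from typing import Optional

# Field names taken from the sample data in this notebook.
EXPECTED_FIELDS = ("id", "name", "age", "city")


def parse_value(raw: bytes) -> Optional[dict]:
    """Decode one Kafka value and parse it as a JSON object.

    Hypothetical helper: returns None for malformed input instead of
    raising, so one bad message does not stop the consumer.
    """
    try:
        record = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(record, dict):
        return None
    # Keep only the expected fields; missing ones become None.
    return {field: record.get(field) for field in EXPECTED_FIELDS}


good = parse_value(b'{"id": 2, "name": "Bob", "age": 30, "city": "San Francisco"}')
bad = parse_value(b"not json")
print(good)
print(bad)  # None
```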

## Message Processing and Transformations

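Define the message schema and apply transformations. The pipeline below runs on the batch DataFrame `df` as a stand-in for the Kafka streaming data: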

In [None]:
# Define schema for our JSON data
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# Simulate processing pipeline (would run on streaming data)
processed_df = df \
    .withColumn("age_group",
                when(col("age") < 30, "Young")
                .when(col("age") < 40, "Middle-aged")
                .otherwise("Senior")) \
    .groupBy("city", "age_group") \
    .count() \
    .orderBy("city", "age_group")

processed_df.show()

print("Data processing pipeline ready for streaming data")
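
As a cross-check on the pipeline above, the same bucketing and grouping can be worked through in plain Python on the four sample rows. This is only a sketch for verifying the expected counts; `age_group` and `users` are illustrative names, and the thresholds are copied from the `when`/`otherwise` chain.

```python
from collections import Counter

users = [
    {"id": 1, "name": "Alice", "age": 25, "city": "New York"},
    {"id": 2, "name": "Bob", "age": 30, "city": "San Francisco"},
    {"id": 3, "name": "Charlie", "age": 35, "city": "Chicago"},
    {"id": 4, "name": "Diana", "age": 28, "city": "Boston"},
]


def age_group(age: int) -> str:
    """Mirror the when/otherwise chain: <30, <40, everything else."""
    if age < 30:
        return "Young"
    if age < 40:
        return "Middle-aged"
    return "Senior"


# Count rows per (city, age_group), then sort like orderBy("city", "age_group").
counts = Counter((u["city"], age_group(u["age"])) for u in users)
for (city, group), n in sorted(counts.items()):
    print(city, group, n)
```

Each city appears once in the sample data, so every group has a count of 1; note that an age of exactly 30 falls into "Middle-aged" because the first comparison is strict.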

## Error Handling and Monitoring

Production-ready error handling patterns:

In [None]:
# Error handling in streaming queries
def process_batch(batch_df, epoch_id):
    try:
        # Your processing logic here
        result = batch_df.groupBy("city").count()
        result.write.mode("append").parquet(f"/output/epoch_{epoch_id}")
        print(f"Successfully processed batch {epoch_id}")
    except Exception as e:
        # The exception is swallowed here, so the query moves on to the next batch
        print(f"Error processing batch {epoch_id}: {e}")
        # Could write to dead letter queue or alert monitoring

# Streaming query with error handling (uncomment when running)
# Uses parsed_df, the streaming DataFrame from the consumer cell; the batch df has no writeStream
# query = parsed_df \
#     .writeStream \
#     .foreachBatch(process_batch) \
#     .outputMode("update") \
#     .option("checkpointLocation", "/tmp/checkpoint") \
#     .start()

# query.awaitTermination()

print("Error handling patterns defined for production use")
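
The "dead letter queue" idea mentioned in `process_batch` can be sketched as a wrapper around any function with the `(batch, epoch_id)` signature that `foreachBatch` expects. Everything here is an assumption for illustration: `with_dead_letter` is a hypothetical helper, and the in-memory list stands in for a real dead-letter sink such as a Kafka topic or a table.

```python
from typing import Any, Callable, List, Tuple

BatchFn = Callable[[Any, int], None]


def with_dead_letter(process: BatchFn, dead_letters: List[Tuple[int, str]]) -> BatchFn:
    """Wrap a foreachBatch-style function so failures are recorded.

    Hypothetical helper: `dead_letters` is an in-memory stand-in for a
    durable dead-letter sink.
    """
    def wrapped(batch: Any, epoch_id: int) -> None:
        try:
            process(batch, epoch_id)
        except Exception as exc:
            # Record which batch failed and why, then carry on.
            dead_letters.append((epoch_id, repr(exc)))
    return wrapped


def flaky(batch: Any, epoch_id: int) -> None:
    """Toy processing function that fails on the second batch only."""
    if epoch_id == 1:
        raise ValueError("bad batch")


failures: List[Tuple[int, str]] = []
handler = with_dead_letter(flaky, failures)
for epoch in range(3):
    handler(None, epoch)
print(failures)
```

Because the wrapper swallows the exception, a streaming query would treat the failed batch as completed, so the recorded entry is the only trace of the failure. Re-raising after recording is the alternative when a failed batch should stop the query.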

## Key Differences: Scala vs Python

### Scala Examples (connectivity/):
- **HelloProducer.scala**: Direct Kafka client usage
- **HelloConsumer.scala**: Low-level consumer implementation
- **Focus**: Infrastructure-level integration

### Python Notebooks (notebooks/):
- **Structured Streaming**: High-level DataFrame API
- **SQL Integration**: Declarative transformations
- **Focus**: Data processing and analytics

### When to Use Each:
- **Scala**: Custom producers/consumers, performance-critical components
- **Python**: Data analysis, ML pipelines, rapid prototyping