# Worked Examples & Exercises - Part 2

## Example 6: Create Another Kafka Topic

In [None]:
#Download Kafka
!wget https://downloads.apache.org/kafka/3.9.0/kafka_2.12-3.9.0.tgz
!tar -xzf kafka_2.12-3.9.0.tgz
!sudo mv kafka_2.12-3.9.0 /usr/local/kafka

In [None]:
#Set Environment Variables
!echo "export KAFKA_HOME=/usr/local/kafka" >> ~/.bashrc
!echo "export PATH=\$PATH:\$KAFKA_HOME/bin" >> ~/.bashrc

# Export for current session
import os
os.environ["KAFKA_HOME"] = "/usr/local/kafka"
os.environ["PATH"] = os.environ["PATH"] + ":" + os.environ["KAFKA_HOME"] + "/bin"

print("Environment variables set for this session.")

In [None]:
#Install Zookeeper
!sudo apt-get update
!sudo apt-get install -y zookeeper

In [None]:
#Set Zookeeper Environment Variables
!echo "export ZOOKEEPER_HOME=/usr/share/zookeeper" >> ~/.bashrc
!echo "export PATH=\$PATH:\$ZOOKEEPER_HOME/bin" >> ~/.bashrc

# Export for current session
import os
os.environ["ZOOKEEPER_HOME"] = "/usr/share/zookeeper"
os.environ["PATH"] = os.environ["PATH"] + ":" + os.environ["ZOOKEEPER_HOME"] + "/bin"

print("Zookeeper environment variables set for this session.")

In [None]:
#Start Kafka and Zookeeper
# Start Zookeeper first
!sudo /usr/share/zookeeper/bin/zkServer.sh start

# Add a short delay to ensure Zookeeper is fully started
!sleep 5

# Start Kafka using the full path
!/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

# Add a short delay to ensure Kafka is fully started
!sleep 5

print("Zookeeper and Kafka started.")
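
The fixed `sleep 5` delays above assume both services come up within five seconds. As a hedged alternative, the sketch below polls a TCP port until it accepts a connection. The helper name `wait_for_port` and its default timeouts are illustrative, not part of Kafka or ZooKeeper, and an open port only shows that the process is listening, not that it has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False


# Example use in the notebook (2181 and 9092 are the default ZooKeeper and Kafka ports):
# wait_for_port("localhost", 2181) and wait_for_port("localhost", 9092)
```

Calling `wait_for_port("localhost", 2181)` before starting Kafka, and `wait_for_port("localhost", 9092)` before creating topics, would replace the two sleeps.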

In [None]:
# Create the weatherTopic-new topic (5 partitions, replication factor 1)
!/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 5 --topic weatherTopic-new

In [None]:
# Verify that weatherTopic-new was created
!/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

**What the code does:**
- Creates a second topic, weatherTopic-new, with 5 partitions and a replication factor of 1, then lists all topics on the broker to confirm it exists.
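
The verification step can also be done from Python rather than by reading the listing by eye. This is a minimal sketch that assumes `kafka-topics.sh --list` prints one topic name per line on standard output. The helper names `topic_names` and `list_topics` are made up for this illustration.

```python
import subprocess

KAFKA_TOPICS = "/usr/local/kafka/bin/kafka-topics.sh"  # path used in the cells above


def topic_names(list_output):
    """Split the text printed by `kafka-topics.sh --list` into topic names."""
    return [line.strip() for line in list_output.splitlines() if line.strip()]


def list_topics(bootstrap="localhost:9092"):
    """Run `kafka-topics.sh --list` and return the topic names it prints."""
    result = subprocess.run(
        [KAFKA_TOPICS, "--list", "--bootstrap-server", bootstrap],
        capture_output=True, text=True, check=True,
    )
    return topic_names(result.stdout)


# With the broker running, this expression is True once the topic exists:
# "weatherTopic-new" in list_topics()
```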

**Exercise 6**:
- Create a topic named labWeather with 3 partitions.
- **Question**: Which command do you use to confirm its creation?

In [None]:
# Create your labWeather topic here
!/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic labWeather

## Example 7: Check Topic Details

In [None]:
!/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic labWeather

**What the code does:**
- Shows the partition count, the replication factor, and the leader, replicas, and in-sync replicas (ISR) of each partition for the topic passed to `--topic`.
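
To read the leader of each partition programmatically, the describe output can be parsed with a regular expression. The sketch below is hedged: `SAMPLE` only imitates the general shape of `--describe` output and was not captured from a real run (real output may carry extra fields), and `partition_leaders` is a hypothetical helper.

```python
import re

# Illustrative text only: the general shape of `--describe` output, not a real capture.
SAMPLE = (
    "Topic: labWeather\tPartitionCount: 3\tReplicationFactor: 1\tConfigs: \n"
    "\tTopic: labWeather\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n"
    "\tTopic: labWeather\tPartition: 1\tLeader: 0\tReplicas: 0\tIsr: 0\n"
    "\tTopic: labWeather\tPartition: 2\tLeader: 0\tReplicas: 0\tIsr: 0\n"
)

# "Partition:" must be followed directly by the number, so "PartitionCount:" is skipped.
PARTITION_LINE = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(\S+)")


def partition_leaders(describe_output):
    """Map each partition number to the leader broker id (kept as a string)."""
    leaders = {}
    for match in PARTITION_LINE.finditer(describe_output):
        leaders[int(match.group(1))] = match.group(2)
    return leaders


leaders = partition_leaders(SAMPLE)
print(len(leaders), "partitions; leaders:", leaders)
```

On a single-broker setup like this one, every partition should report the same leader, since only one broker is available to lead.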

**Exercise 7**:
- Describe your labWeather topic.
- **Question**: How many partitions and which broker is the leader?

In [None]:
# Describe your labWeather topic here
!/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 \
  --topic labWeather

## Example 8: Spark Structured Streaming from Kafka

In [None]:
# PySpark code
# Note: the "kafka" source needs the spark-sql-kafka-0-10 connector on the classpath
# (for example via spark.jars.packages), in a version that matches your Spark build.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .getOrCreate()

kafkaDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "weatherTopic") \
    .load()

query = kafkaDF.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()

**What the code does:**
- Reads the weatherTopic stream from Kafka into Spark, casts each record's value from bytes to a string, and prints each micro-batch to the console as it arrives.
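
A streaming query on the Kafka source starts from the latest offsets by default, so messages produced before the query starts are not shown. The sketch below collects the reader options in one place and exposes `startingOffsets` so the stream can also start from the earliest retained message. The helper names `kafka_source_options` and `read_topic` are illustrative. `startingOffsets` also accepts a JSON string of per-partition offsets, which this simplified helper deliberately rejects.

```python
def kafka_source_options(topic, servers="localhost:9092", starting_offsets="latest"):
    """Build the option dict for spark.readStream.format("kafka")."""
    if starting_offsets not in ("earliest", "latest"):
        raise ValueError("starting_offsets must be 'earliest' or 'latest'")
    return {
        "kafka.bootstrap.servers": servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def read_topic(spark, topic, starting_offsets="latest"):
    """Return a streaming DataFrame for one topic, given an existing SparkSession."""
    options = kafka_source_options(topic, starting_offsets=starting_offsets)
    return spark.readStream.format("kafka").options(**options).load()


# Usage with the session created above:
# kafkaDF = read_topic(spark, "weatherTopic", starting_offsets="earliest")
```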

**Exercise 8**:
- Stream data from your labWeather topic instead.
- **Question**: Does the console immediately show the messages you produce?

In [None]:
# Modify the code to stream from labWeather topic
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .getOrCreate()

kafkaDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "labWeather") \
    .load()

query = kafkaDF.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()

## Example 9: Checkpointing in Spark

In [None]:
# PySpark code
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaSparkStreamingWithCheckpoint") \
    .getOrCreate()

kafkaDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "weatherTopic") \
    .load()

query = kafkaDF.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .option("checkpointLocation", "hdfs:///checkpoints/kafka-weather") \
    .format("console") \
    .start()

query.awaitTermination()

**What the code does:**
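- Sets `checkpointLocation` so that Spark persists the Kafka offsets it has processed. If the query fails or is restarted with the same checkpoint location, it resumes from those offsets instead of starting over.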
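
The recovery idea can be illustrated without Spark. The toy model below is an assumption-laden sketch, not Spark's actual checkpoint format: it stores the next offset in a small JSON file after each record, stops early to mimic a crash, and then resumes from the stored offset. All names (`load_offset`, `run`, `offsets.json`) are invented for this illustration.

```python
import json
import os
import tempfile


def load_offset(path):
    """Return the next offset to process, or 0 if no checkpoint file exists yet."""
    if not os.path.exists(path):
        return 0
    with open(path) as f:
        return json.load(f)["next_offset"]


def run(messages, path, crash_at=None):
    """Process messages from the checkpointed offset; stop early to mimic a crash."""
    processed = []
    for offset in range(load_offset(path), len(messages)):
        if offset == crash_at:
            break  # simulated failure before this record is handled
        processed.append(messages[offset])
        with open(path, "w") as f:
            json.dump({"next_offset": offset + 1}, f)  # record progress
    return processed


messages = ["m0", "m1", "m2", "m3", "m4"]
with tempfile.TemporaryDirectory() as tmp:
    checkpoint = os.path.join(tmp, "offsets.json")
    first = run(messages, checkpoint, crash_at=3)  # handles m0..m2, then "crashes"
    second = run(messages, checkpoint)             # resumes at offset 3

print(first, second)
```

The second run picks up at the first unprocessed record, which is the behavior the checkpoint location is meant to give a restarted streaming query.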

**Exercise 9**:
- Create a checkpoint location hdfs:///checkpoints/labWeather for your streaming job.
- **Question**: In case of failure, can the stream recover from the last known offsets?

In [None]:
# Modify the code to use the labWeather checkpoint
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaSparkStreamingWithCheckpoint") \
    .getOrCreate()

kafkaDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "labWeather") \
    .load()

query = kafkaDF.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .option("checkpointLocation", "hdfs:///checkpoints/labWeather") \
    .format("console") \
    .start()

query.awaitTermination()