In [1]:
""" 
Spark structured streaming provides rich APIs to read from and write to Kafka topics. When reading from Kafka, Kafka sources can be created for both streaming and batch queries. When writing into Kafka, Kafka sinks can be created as destination for both streaming and batch queries too. 
- https://kontext.tech/article/475/spark-structured-streaming-read-from-and-write-into-kafka-topics
- https://urlit.me/blog/pyspark-structured-streaming-read-from-files/

Spark SQL for Kafka is not built into Spark binary distribution. Thus you need to ensure the following jar package is included into Spark lib search path or passed when you submit Spark applications. 

An easier way is to use the --packages option when deploying the application:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0  spark-kafka.py

Spark will automatically download the artifact and its dependencies from local repositories or configured remote repositories. 

KAFKA and MONGODB
https://kontext.tech/article/841/spark-read-and-write-data-with-mongodb

KAFKA and SQL
https://kontext.tech/article/290/connect-to-sql-server-in-spark-pyspark
https://kontext.tech/article/1061/pyspark-read-data-from-mariadb-database


DELTA LAKE TABLE
https://kontext.tech/article/1175/introduction-to-delta-lake-with-pyspark
https://kontext.tech/code/1186/time-travel-with-delta-table-in-pyspark

A LOT OF ART
https://kontext.tech/tag/pyspark?p=6


SPARK STREAMING DOCS
- https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html


"""

' \nSpark structured streaming provides rich APIs to read from and write to Kafka topics. When reading from Kafka, Kafka sources can be created for both streaming and batch queries. When writing into Kafka, Kafka sinks can be created as destination for both streaming and batch queries too. \n- https://kontext.tech/article/475/spark-structured-streaming-read-from-and-write-into-kafka-topics\n\nSpark SQL for Kafka is not built into Spark binary distribution. Thus you need to ensure the following jar package is included into Spark lib search path or passed when you submit Spark applications. \n\nAn easier way is to use --packages option when deploy application:\n\nspark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0  spark-kafka.py\n\nSpark will automatically download the artifact and also its dependencies automatically from local repositories or configured remote repositories. \n\nKAFKA and MONGODB\nhttps://kontext.tech/article/841/spark-read-and-write-data-with-mongo

In [None]:
# Use Kafka source for batch queries

from pyspark.sql import SparkSession

# Spark application settings
appName = "KafkaStream"
master = "local"

# Build (or reuse) the session; the Kafka connector is requested through
# the spark.jars.packages setting.
kafka_connector_package = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1"
spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .config("spark.jars.packages", kafka_connector_package)
    .getOrCreate()
)

# Kafka connection parameters
kafka_server = "localhost:9092"
kafka_topic = "gjson"

# Batch read of the topic, bounded from the earliest to the latest offsets
kafka_read_options = {
    "kafka.bootstrap.servers": kafka_server,
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
}
df = spark.read.format("kafka").options(**kafka_read_options).load()

# Records read from Kafka arrive as a DataFrame with some additional columns.
# The `value` column contains the actual data in binary format.

# Print the schema of the Kafka DataFrame
df.printSchema()

# Convert the binary key and value columns to STRING.
# withColumn() already names the new column, so no alias() is needed (the
# original aliased the value cast as 'key_str' by mistake).
df = (
    df.withColumn("key_str", df["key"].cast("string"))
    .drop("key")
    .withColumn("value_str", df["value"].cast("string"))
    .drop("value")
)

df.show(5)

streaming_df = df

# The binary `value` column was dropped above, so selecting it again would
# fail with an AnalysisException. Reuse the already-decoded `value_str`
# column and expose it as `value` for the JSON parsing step.
json_df = streaming_df.selectExpr("value_str as value")

# Schemas for the JSON payload carried in the Kafka `value` column.
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

# Example schema for a temperature-sensor event. It is kept under its own name
# for reference; previously it was bound to `json_schema` and then silently
# overwritten by the poem schema below.
sensor_json_schema = StructType([
    StructField("customerId", StringType(), True),
    StructField("data", StructType([
        StructField("devices", ArrayType(StructType([
            StructField("deviceId", StringType(), True),
            StructField("measure", StringType(), True),
            StructField("status", StringType(), True),
            StructField("temperature", LongType(), True),
        ]), True), True),
    ]), True),
    StructField("eventId", StringType(), True),
    StructField("eventOffset", LongType(), True),
    StructField("eventPublisher", StringType(), True),
    StructField("eventTime", StringType(), True),
])

# Schema for the poem messages: {'type': poem_type, 'message': poem_message}
json_schema = StructType([
    StructField("type", StringType(), True),
    StructField("message", StringType(), True),
])

# Parse the JSON string in `value` with the poem schema, then flatten the
# resulting struct so each field becomes a top-level column.
json_expanded_df = (
    json_df
    .withColumn("value", from_json(json_df["value"], json_schema))
    .select("value.*")
)



In [None]:
# Subscribe to two topics (batch query)

kafka_topic_1 = "gjson"
kafka_topic_2 = "gnone"

# The `subscribe` option takes a comma-separated list of topic names.
kafka_multi_topics = ",".join([kafka_topic_1, kafka_topic_2])

df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", kafka_multi_topics)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# Decode the binary key and value columns to strings. withColumn() already
# names the new column, so the stray alias('key_str') on the value cast is gone.
df = (
    df.withColumn("key_str", df["key"].cast("string"))
    .drop("key")
    .withColumn("value_str", df["value"].cast("string"))
    .drop("value")
)

df.show(5)

# Keep only the message payload. The binary `value` column was dropped above,
# so use the decoded `value_str` column and expose it as `value`.
df = df.selectExpr("value_str AS value")

# Perform further transformations or analysis
# For example, split the value into words (one output row per word)
words = df.selectExpr("value", "explode(split(value, ' ')) as word")

# This is a batch DataFrame (spark.read), so writeStream is not available on
# it; display the result directly instead of starting a streaming query.
words.show(truncate=False)

In [None]:
# The startingOffsets and endingOffsets options in PySpark Structured Streaming allow you to specify the starting and ending offsets when reading from a Kafka topic.

''' 

startingOffsets and endingOffsets options in PySpark Structured Streaming are used to define the starting and ending offsets when reading from a Kafka topic.

Topic kontext-kafka: read partition 2 starting from offset 98, and partitions 0 and 1 from offset 0 (all events). 
Topic kontext-events: read all events from partition 0. 
Note: in the offsets JSON, -2 refers to the earliest offset and -1 to the latest.

.option("startingOffsets", """{"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}""") \
.option("endingOffsets", """{"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}""") \

'''

import json

# Initialize (or reuse) the Spark session
spark = SparkSession.builder \
    .appName("KafkaStructuredStreamingExample") \
    .getOrCreate()

# Define Kafka parameters
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "your_kafka_topic"

# Example of starting offsets spanning two topics. It is not used below because
# only `kafka_topic` is subscribed; it is kept under its own name instead of
# being silently overwritten.
multi_topic_starting_offsets = {
    "your_kafka_topic": {"0": 0, "1": 0, "2": 98},
    "another_kafka_topic": {"0": -2}
}

# Starting offsets for the single subscribed topic: partition id -> offset
starting_offsets = {
    "your_kafka_topic": {"0": 0, "1": 0, "2": 98}
}

# Subscribe to the Kafka topic with the specified starting offsets.
# The option value must be a JSON string; str(dict) would produce a Python
# repr with single quotes, which is not valid JSON, so serialize with json.dumps.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", json.dumps(starting_offsets)) \
    .load()

In [None]:
# Discover the partitions of a Kafka topic, then stream it from explicit
# per-partition starting offsets.
import json

# Initialize (or reuse) the Spark session
spark = SparkSession.builder \
    .appName("KafkaStructuredStreamingExample") \
    .getOrCreate()

# Define Kafka parameters
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "your_kafka_topic"

# Batch-read the topic and collect the distinct partition ids that hold records.
# NOTE(review): a partition with no records will not show up here, while Spark
# expects every partition to be listed when specific offsets are given —
# confirm the topic has data in all partitions or query Kafka metadata instead.
kafka_partitions = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .load() \
    .selectExpr("CAST(partition AS INT)") \
    .distinct() \
    .collect()

# Number of partitions observed, and their actual ids (not assumed to be 0..n-1)
num_partitions = len(kafka_partitions)
partition_ids = sorted(row["partition"] for row in kafka_partitions)

# Starting offsets for the topic: every observed partition starts at offset 0
starting_offsets = {kafka_topic: {str(partition): 0 for partition in partition_ids}}

# Ending offsets (-1 = latest). endingOffsets applies to batch queries only and
# is rejected on a streaming read, so it is kept for reference and not passed
# to readStream below.
ending_offsets = {kafka_topic: {str(partition): -1 for partition in partition_ids}}

# The option value must be valid JSON (str(dict) yields single-quoted Python
# repr). With no observed partitions, fall back to "earliest" rather than
# sending an empty offsets map.
starting_offsets_option = json.dumps(starting_offsets) if partition_ids else "earliest"

# Subscribe to the Kafka topic with the specified starting offsets
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", starting_offsets_option) \
    .load()

# Cast the value column as String (assuming the messages are in String format)
df = df.selectExpr("CAST(value AS STRING)")

# Perform further transformations or analysis
# For example, split the value into words (one output row per word)
words = df.selectExpr("value", "explode(split(value, ' ')) as word")

# Display the streaming results on the console
query = words \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Block until the streaming query terminates
query.awaitTermination()

In [None]:
# Pattern subscription (batch query).
# The subscribePattern option is matched against topic names, so this reads
# every topic matching the regular expression 'kontext.*', bounded from the
# earliest to the latest offsets.
kafka_batch_reader = spark.read.format("kafka")
df = (
    kafka_batch_reader
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribePattern", "kontext.*")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [None]:
""" 
Kafka source for streaming queries
To read from Kafka for streaming queries, use SparkSession.readStream. Kafka server addresses and topic names are required. Spark can subscribe to one or more topics, and a pattern can be used to match multiple topic names.
"""

In [None]:
# Open a streaming read on the Kafka topic
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", kafka_topic)
    .load()
)

# Kafka delivers key and value as binary; cast both to strings
string_casts = ["CAST(key AS STRING)", "CAST(value AS STRING)"]

# Key/value projection of the stream, kept under both names used by this cell
json_df = kafka_stream.selectExpr(*string_casts)
df = kafka_stream.selectExpr(*string_casts)


''' 
We cannot directly show the above data frame as it uses a streaming source. The following exception will occur if you call df.show():
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

so use this data frame to write into another different topic. 

'''

# A streaming DataFrame is written with DataFrame.writeStream.
# Here the records are appended to the console sink.
console_writer = df.writeStream.outputMode("append").format("console")
query = console_writer.start()

# Options left disabled from an earlier variant of this cell:
    # .option("topic", "topic-kafka-3") \
    # .option("checkpointLocation", "file:///F:/tmp/topic-kafka/checkpoint") \
    # .start()

# Block until any active query on this session terminates ...
spark.streams.awaitAnyTermination()

# ... then wait for this specific query to finish
query.awaitTermination()

In [None]:
''' 
StreamingContext is the main entry point for creating and managing a Spark Streaming application in PySpark. It represents the connection to a Spark cluster and can be used to create DStreams (Discretized Streams), which are the fundamental abstraction in Spark Streaming for processing live data.

'''

In [None]:
""" 
In Apache Spark Streaming, a checkpoint is a mechanism to provide fault-tolerance and recovery of streaming applications. It involves saving the state of the streaming application to a reliable distributed file system (like HDFS) periodically. This checkpoint information includes the metadata and data necessary for recovery in case of a failure.
"""

In [None]:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Get (or create) the Spark session for this example
spark = (
    SparkSession.builder
    .appName("StreamingContextExample")
    .getOrCreate()
)

# StreamingContext on top of the session's SparkContext, with a batch
# interval of 5 seconds (the duration of each micro-batch)
spark_context = spark.sparkContext
ssc = StreamingContext(spark_context, batchDuration=5)

# Set the checkpoint directory used for fault tolerance
ssc.checkpoint("/path/to/checkpoint_directory")

# Input DStream: text lines received from a socket on localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split every line on single spaces, then count occurrences of each word
tokens = lines.flatMap(lambda text: text.split(" "))
word_counts = tokens.countByValue()

# Print the counts to the console
word_counts.pprint()

# Start the streaming computation and block until it terminates
ssc.start()
ssc.awaitTermination()