Configuration for Kafka: add the Spark Kafka connector package (this must run before the Spark session is created)

In [1]:
import os

# Ask spark-submit to pull the Structured Streaming Kafka connector. This is set here,
# before the SparkSession is created further down, so the session starts with it.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0",  # Spark 4.0.0 artifact, Scala 2.13 build
    "pyspark-shell",
])

Imports

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

Create Spark Session and define the order message schema

In [3]:
# Obtain the Spark session; getOrCreate() hands back an already-active session
# instead of building a new one (e.g. when this cell is re-run).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [4]:
# Schema of each JSON order message; used by from_json below to parse the Kafka payload.
# All fields are nullable (the StructType.add default, same as StructField's).
# NOTE(review): amount is typed IntegerType — confirm the producer never sends decimal amounts.
order_schema = (
    StructType()
    .add("order_id", IntegerType())
    .add("amount", IntegerType())
    .add("status", StringType())
)

Kafka Stream Config: read the `orders` topic, parse the JSON payload and filter by status

In [5]:
# Streaming source: subscribe to the "orders" topic on the local broker, starting from the
# earliest offset. Per the Spark Kafka integration guide, startingOffsets only applies when a
# query starts without an existing checkpoint; otherwise the checkpoint's offsets win.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "orders",
        "startingOffsets": "earliest",
    })
    .load()
)

In [6]:
# Keep only the message payload, decoded from the Kafka value column to a string,
# still named "value" so the parsing step below can refer to it.
kafka_df_string = kafka_df.select(col("value").cast("string").alias("value"))

In [7]:
# Parse each JSON payload against order_schema into a struct column "data",
# then flatten it so order_id, amount and status become top-level columns.
orders_df = (
    kafka_df_string
    .select(from_json(col("value"), order_schema).alias("data"))
    .select("data.*")
)

In [8]:
# Keep orders whose status is one of the two of interest. A null status makes
# isin() evaluate to null, so such rows are dropped as well.
filtered_orders = orders_df.where(col("status").isin(["delivered", "pending"]))

Start Streaming: write the filtered orders to CSV and wait up to 90 seconds

In [22]:
# Sink: append each micro-batch of filtered orders as CSV files with a header row.
# The checkpoint directory tracks stream progress, so a restarted query resumes from it.
query = (
    filtered_orders.writeStream
    .outputMode("append")
    .format("csv")
    .option("path", "../StreamOutput/kafka_output/")
    .option("checkpointLocation", "../StreamOutput/kafka_checkpoint")
    .option("header", "true")
    .start()
)

try:
    # Wait at most 90 seconds: True if the query terminated on its own, False on timeout.
    terminated = query.awaitTermination(timeout=90)
finally:
    # awaitTermination only stops *waiting*. On timeout (or an interrupt) the query keeps
    # running in the background, consuming Kafka and writing files. Stop it here so re-running
    # this cell does not clash with a still-active query on the same checkpoint.
    if query.isActive:
        query.stop()

# Display the same boolean the original cell showed.
terminated

False