### Stock Market Data processing using Spark Structured Streaming

#### Set up environment variables

In [0]:
import os

# Set environment variables
os.environ["KAFKA_BROKER"] = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
os.environ["KAFKA_TOPIC"] = "market_data_v4"
os.environ["KAFKA_TOPIC_PROCESSED"] = "processed_data_v4"
os.environ["KAFKA_API_KEY"] = "VSA47PB5RUOJ5EE6"
os.environ["KAFKA_API_SECRET"] = "zNZj95TZDIjX5/s3eh4sSmSu/4P/yXS3xmEzN9LR5B0kbSftUbrqzC6DJ6GVnO3l"

#### Stream processing

In [0]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, avg
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Stock Price Streaming") \
    .getOrCreate()

# Kafka configuration
kafka_bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
# os.getenv("KAFKA_BROKER")
kafka_topic = "market_data_v4"
# os.getenv("KAFKA_TOPIC")
kafka_topic_processed = "processed_data_v4"
# os.getenv("KAFKA_TOPIC_PROCESSED")
checkpoint_location = "/mnt/checkpoint/kafka_sink_v4"

kafka_config = {
    'kafka.bootstrap.servers': kafka_bootstrap_servers,
    'subscribe': kafka_topic,
    'startingOffsets': 'earliest',  # Start from the earliest message
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'PLAIN',
    "failOnDataLoss": "false",
    "kafka.ssl.endpoint.identification.algorithm": "https",
    'kafka.sasl.jaas.config': f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{os.getenv("KAFKA_API_KEY")}" password="{os.getenv("KAFKA_API_SECRET")}";',
}

# Define schema for incoming data
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("ticker", StringType(), True)
])

# Read data from Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .options(**kafka_config) \
    .load()

# Parse JSON data
parsed_stream = raw_stream.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Repartition the stream to ensure all data for the same ticker is processed in a single executor
parsed_stream = parsed_stream.repartition("ticker")


# Transformation: Calculate average price over a 1-minute window
average_price_stream = parsed_stream.withColumn("timestamp", col("timestamp").cast("timestamp")).withWatermark("timestamp", "2 minutes").groupBy(window(col("timestamp"), "1 minute"), col("ticker")).agg(avg("price").alias("average_price"))


# Prepare DataFrame for Kafka
kafka_df = average_price_stream.selectExpr(
    "CAST(ticker AS STRING) AS key",
    "to_json(struct(window.start AS window_start, window.end AS window_end, ticker, average_price)) AS value"
)

# Write the DataFrame to Kafka
query = kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", kafka_topic_processed) \
    .option("checkpointLocation", checkpoint_location) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{os.getenv("KAFKA_API_KEY")}" password="{os.getenv("KAFKA_API_SECRET")}";')\
    .outputMode("update") \
    .start()



#### Live Dashboard on Databricks

In [0]:
average_price_stream_formated = average_price_stream \
    .withColumn("window.start", col("window.start").cast("timestamp")) 


In [0]:
display(average_price_stream_formated)

window,ticker,average_price,window.start
"List(2024-12-21T08:45:00Z, 2024-12-21T08:46:00Z)",IBM,87.25333340962727,2024-12-21T08:45:00Z
"List(2024-12-21T08:27:00Z, 2024-12-21T08:28:00Z)",IBM,107.16249974568684,2024-12-21T08:27:00Z
"List(2024-12-21T08:31:00Z, 2024-12-21T08:32:00Z)",IBM,117.46333249409992,2024-12-21T08:31:00Z
"List(2024-12-21T09:05:00Z, 2024-12-21T09:06:00Z)",IBM,80.43333307902019,2024-12-21T09:05:00Z
"List(2024-12-21T08:33:00Z, 2024-12-21T08:34:00Z)",IBM,90.40333239237468,2024-12-21T08:33:00Z
"List(2024-12-21T08:38:00Z, 2024-12-21T08:39:00Z)",IBM,94.07249959309895,2024-12-21T08:38:00Z
"List(2024-12-21T09:00:00Z, 2024-12-21T09:01:00Z)",IBM,81.95499992370605,2024-12-21T09:00:00Z
"List(2024-12-21T08:36:00Z, 2024-12-21T08:37:00Z)",IBM,115.54166475931804,2024-12-21T08:36:00Z
"List(2024-12-21T08:52:00Z, 2024-12-21T08:53:00Z)",IBM,119.61750030517578,2024-12-21T08:52:00Z
"List(2024-12-21T08:28:00Z, 2024-12-21T08:29:00Z)",IBM,84.56333287556966,2024-12-21T08:28:00Z


Databricks visualization. Run in Databricks to view.