How to run:   
1. docker compose up  
2. docker exec -it cpts415-kafka-1 /kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test
3. Open the notebook at http://localhost:8888 and run its cells
4. Run 'cd GUI', then run 'node app.js'
5. The app will send messages to Kafka. View them with: docker exec -it cpts415-kafka-1 /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
6. View the Spark dataframe output with: docker logs -f (name or ID of the notebook container)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, lit, lead, concat, udf, from_json, expr
from pyspark.sql import Window
import random
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

In [None]:
# Build (or reuse) the SparkSession for this notebook. The master URL, the
# Kafka connector package and the executor core count are fixed here.
spark = (
    SparkSession.builder
    .master('spark://spark-master:7077')
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .config("spark.executor.cores", "2")
    .getOrCreate()
)

In [None]:
kafka_bootstrap_servers = "kafka:9092"
kafka_topic = "test"

# Schema of the JSON payload carried in each Kafka message value: an object
# with a "markers" array whose elements are {"lat": ..., "lng": ...}.
# Coordinates use DoubleType: FloatType is not imported in this notebook (so
# referencing it raised NameError), and 32-bit floats would also round
# latitude/longitude values.
schema = StructType([
    StructField("markers", ArrayType(
        StructType([
            StructField("lat", DoubleType(), nullable=False),
            StructField("lng", DoubleType(), nullable=False),
        ])
    )),
])

# Read data from Kafka as a streaming DataFrame. startingOffsets is not set,
# so the Kafka source's default applies.
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .load()
)

# Keep only the message value, cast to a string so it can be parsed as JSON.
streaming_df = streaming_df.selectExpr("CAST(value AS STRING)")

# Deserialize the JSON data using the specified schema and flatten the
# resulting struct into top-level columns.
streaming_df = streaming_df.select(from_json("value", schema).alias("data")).select("data.*")

# Extract the coordinates of the first two markers into individual columns.
streaming_df = streaming_df.select(
    col("markers").getItem(0).getItem("lat").alias("lat1"),
    col("markers").getItem(0).getItem("lng").alias("lng1"),
    col("markers").getItem(1).getItem("lat").alias("lat2"),
    col("markers").getItem(1).getItem("lng").alias("lng2"),
)

# Display the streaming DataFrame on the console sink.
query = streaming_df.writeStream.outputMode("append").format("console").start()

try:
    # Block until the streaming query terminates.
    query.awaitTermination()
finally:
    # Stop the Spark session even if the query fails or the wait is
    # interrupted, so the session is not left running.
    spark.stop()

In [None]:
# Stop the Spark session manually (e.g. after interrupting the streaming cell).
spark.stop()