In [1]:
import os
from pyspark.sql import SparkSession

# Point this kernel's environment at the JDK before the session is built.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-17-openjdk-amd64")

# Local session using every available core ("local[*]"). The Kafka SQL
# connector is not bundled with pyspark, so it is requested by Maven
# coordinate through spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("StreamingExercise")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.1.0")
    .getOrCreate()
)

# Display the running Spark version; the connector coordinate above is pinned to 4.1.0.
spark.version


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/08 18:37:21 WARN Utils: Your hostname, DESKTOP-FBRFLFL, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/08 18:37:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/or/pyspark/venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/or/.ivy2.5.2/cache
The jars for the packages stored in: /home/or/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ad42bc0c-16cf-4bf4-855a-a1778feb6f37;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.0 in central
	found org.apache.kafka#kafka-clients;3.9.1 in central
	found org.lz4#

'4.1.0'

In [2]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Schema used by from_json on the Kafka payload: one string-typed field per
# CSV column name, in the original column order. Nothing is converted to a
# numeric type at this stage.
schema = StructType([
    StructField(column_name, StringType())
    for column_name in (
        "VIN (1-10)",
        "County",
        "City",
        "State",
        "Postal Code",
        "Model Year",
        "Make",
        "Model",
        "Electric Vehicle Type",
        "Clean Alternative Fuel Vehicle (CAFV) Eligibility",
        "Electric Range",
        "Base MSRP",
        "Legislative District",
        "DOL Vehicle ID",
        "Vehicle Location",
        "Electric Utility",
        "2020 Census Tract",
    )
])

# Streaming read from the local Kafka broker, subscribed to a single topic and
# starting from the earliest available offsets.
# NOTE(review): the topic is named "wikimedia_topic_1" although the schema in
# this notebook describes vehicle records -- confirm this is the intended topic.
raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "wikimedia_topic_1",
        "startingOffsets": "earliest",
    })
    .load()
)

# Decode the Kafka `value` column to a string, parse it as JSON against
# `schema`, then expand the parsed struct so every schema field becomes a
# top-level column.
json_df = (
    raw_df
    .select(from_json(col("value").cast(StringType()), schema).alias("data"))
    .select(col("data.*"))
)

json_df


DataFrame[VIN (1-10): string, County: string, City: string, State: string, Postal Code: string, Model Year: string, Make: string, Model: string, Electric Vehicle Type: string, Clean Alternative Fuel Vehicle (CAFV) Eligibility: string, Electric Range: string, Base MSRP: string, Legislative District: string, DOL Vehicle ID: string, Vehicle Location: string, Electric Utility: string, 2020 Census Tract: string]

In [3]:
from pyspark.sql.functions import col, desc
from pyspark.sql.types import IntegerType

# The three cities with the most records whose model year is 2023.
top3_2023 = (
    json_df
    # "Model Year" arrives as a string; convert it before the numeric comparison
    .withColumn("Model Year", col("Model Year").cast(IntegerType()))
    .where(col("Model Year") == 2023)
    .groupBy("City")
    .count()
    # largest counts first, then keep only the first three rows
    .sort(col("count").desc())
    .limit(3)
)

# Start the streaming query on the console sink. "complete" output mode emits
# the whole aggregated result on each trigger, and truncate=false keeps cell
# values from being shortened in the printed table.
query = (
    top3_2023.writeStream
    .format("console")
    .option("truncate", "false")
    .outputMode("complete")
    .start()
)

# Display the StreamingQuery handle; it is needed later to stop the query.
query


26/01/08 18:39:26 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-92c842de-67a0-4c11-81ee-e19996fca76b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
26/01/08 18:39:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x796851fa1f40>

26/01/08 18:39:27 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----+
|City    |count|
+--------+-----+
|Seattle |5610 |
|Bellevue|2088 |
|Tukwila |1773 |
+--------+-----+



In [6]:
# Stop the streaming query that was started by the writeStream ... .start() cell.
query.stop()
