In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Build (or reuse) the Spark session for this notebook. The Kafka source
# connector is requested through spark.jars.packages so it is resolved
# when the session starts.
spark = (
    SparkSession.builder
    .appName("ElectricCarsConsumer")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.1.0")
    .getOrCreate()
)

# Restrict Spark's own logging to ERROR so cell output stays readable.
spark.sparkContext.setLogLevel("ERROR")
print("Spark Session Created Successfully")

:: loading settings :: url = jar:file:/root/.venv3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2.5.2/cache
The jars for the packages stored in: /root/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5873a59d-d2d5-4310-95ef-7ee04a6226be;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.0 in central
	found org.apache.kafka#kafka-clients;3.9.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.8 in central
	found org.slf4j#slf4j-api;2.0.17 in central
	found org.apache.hadoop#hadoop-client-runtime;3.4.2 in central
	found org.apache.hadoop#hadoop-client-api;3.4.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.mod

Spark Session Created Successfully


In [2]:
# Schema of the JSON payload carried in each Kafka message value.
# Every field is declared as a nullable string.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "VIN (1-10)",
        "County",
        "City",
        "State",
        "Model Year",
        "Make",
        "Model",
        "Electric Vehicle Type",
        "Electric Range",
    )
])

# Streaming reader over the "electric_vehicles" topic, starting from the
# earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "electric_vehicles")
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
# Decode the Kafka message value to a string, parse it as JSON using
# `schema`, and flatten the parsed struct into top-level columns.
json_df = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# "Electric Range" is declared as a string in the schema; cast it to double.
clean_df = json_df.withColumn(
    "Electric Range",
    col("Electric Range").cast("double"),
)

In [None]:
# 1. Count records per city for rows whose "Model Year" is "2023" and keep
#    the three cities with the highest counts.
city_2023_analysis = (
    clean_df.filter(col("Model Year") == "2023")
    .groupBy("City")
    .count()
    .orderBy(col("count").desc())
    .limit(3)
)

# 2. Start the stream to the console so it prints while waiting.
#    "complete" mode re-emits the whole aggregated result on every trigger.
query = (
    city_2023_analysis.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

print("Waiting for data from producer.py... This cell will stay active.")

try:
    # Blocks this cell until the query terminates or the kernel is interrupted.
    query.awaitTermination()
finally:
    # Interrupting the kernel only unblocks awaitTermination(); without an
    # explicit stop() the streaming query would keep running in the background
    # and keep writing to the console.
    query.stop()

Waiting for data from producer.py... This cell will stay active.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----+
|    City|count|
+--------+-----+
| Seattle| 5610|
|Bellevue| 2088|
| Tukwila| 1773|
+--------+-----+

