In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, count, desc
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("PySpark-jupyter") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.1.1") \
    .config("spark.sql.streaming.checkpointLocation", "./checkpoint") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/18 16:45:27 WARN Utils: Your hostname, omerslaptop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/18 16:45:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/omer2/.local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/omer2/.ivy2.5.2/cache
The jars for the packages stored in: /home/omer2/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a3f9f297-4d28-42ff-a9f2-9a9adbd709d0;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.1 in central
	found org.apache.kafka#kafka-clients;3.9.1 in central
	found org.lz4#l

In [2]:
schema = StructType([
    StructField("VIN", StringType(), True),
    StructField("County", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Model Year", StringType(), True),
    StructField("Make", StringType(), True),
    StructField("Model", StringType(), True),
    StructField("Electric Vehicle Type", StringType(), True)
])

In [14]:
from pyspark.sql.functions import from_json, col
KAFKA_BROKER_URL = "localhost:9092"
KAFKA_TOPIC = "electric_cars_topic"

# קריאה מקפקא
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER_URL) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()

# המרת ה-Binary Value ל-JSON לפי ה-Schema שהגדרנו
cars_df = raw_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# הצגת הנתונים על המסך
query = cars_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

#query.awaitTermination()

26/01/18 17:03:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+---------+------------+-----+----------+--------+--------------+---------------------+
| VIN|   County|        City|State|Model Year|    Make|         Model|Electric Vehicle Type|
+----+---------+------------+-----+----------+--------+--------------+---------------------+
|NULL|     King|     Seattle|   WA|      2020| HYUNDAI|          KONA| Battery Electric ...|
|NULL|     King|     Bothell|   WA|      2022|    JEEP|GRAND CHEROKEE| Plug-in Hybrid El...|
|NULL|   Yakima|      Yakima|   WA|      2023|    JEEP|GRAND CHEROKEE| Plug-in Hybrid El...|
|NULL|     King|    Kirkland|   WA|      2018|   TESLA|       MODEL 3| Battery Electric ...|
|NULL| Thurston|     Olympia|   WA|      2018|     BMW|            I3| Plug-in Hybrid El...|
|NULL|Snohomish|  Marysville|   WA|      2020|   TESLA|       MODEL 3| Battery Electric ...|
|NULL|     King|        Kent|   WA|      2017|CHRYSLER|      PACIF

In [15]:
from pyspark.sql.functions import col, count, desc

# 1. סינון הנתונים לשנת 2023 בלבד
# הערה: וודא ששם העמודה ב-CSV שלך הוא אכן "Model Year"
df_2023 = cars_df.filter(col("Model Year") == "2023")

# 2. הקבצה לפי עיר וספירת מכוניות
city_counts = df_2023.groupby("City") \
    .agg(count("*").alias("EV_Count")) \
    .orderBy(desc("EV_Count")) \
    .limit(3) # אנחנו רוצים רק את הטופ 3

# 3. הפעלת הזרם והדפסת התוצאות המתעדכנות
query = city_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()



26/01/18 17:03:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/18 17:03:40 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+--------+
|City    |EV_Count|
+--------+--------+
|Bellevue|5       |
|Olympia |5       |
|Yakima  |4       |
+--------+--------+

