# Exercise 2: Stream EV records from Kafka and print VIN, city and state to the console

In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Kafka connection settings: change them here rather than inside the pipeline below.
KAFKA_BOOTSTRAP_SERVERS = "127.0.0.1:9092"
EV_TOPIC = "ev_topic"

spark = SparkSession.builder.appName("Task2-EV-Console").getOrCreate()
# Keep notebook output readable: only WARN and above from Spark's own logging.
spark.sparkContext.setLogLevel("WARN")

# Raw Kafka stream of EV records.
# - startingOffsets=earliest only takes effect when a query starts without an existing
#   checkpoint; a query restarted from its checkpoint resumes from the offsets stored there.
# - failOnDataLoss=false keeps the query running (instead of failing it) when offsets it
#   expected are no longer available in the topic.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", EV_TOPIC)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

# Kafka delivers the payload as binary; decode it to a string so it can be parsed as JSON.
text_df = df.selectExpr("CAST(value AS STRING) as value")

# Only the fields this task needs. Names must match the JSON keys exactly,
# including the spaces and parentheses in "VIN (1-10)".
schema = StructType([
    StructField("VIN (1-10)", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
])

# Parse each message and project the three fields under short, snake_case names.
cars_df = (
    text_df
    .select(from_json(col("value"), schema).alias("data"))
    .select(
        # Backticks are required because the field name contains spaces and parentheses.
        col("data.`VIN (1-10)`").alias("vin"),
        col("data.City").alias("city"),
        col("data.State").alias("state"),
    )
)

# Console sink for the parsed EV records.
# The query is named so that re-running this cell can find and stop the previous
# instance: otherwise start() fails because a query from the same checkpoint is
# already active. Delete the checkpoint directory to re-read the topic from the start.
TASK2_QUERY_NAME = "ev_task2_console"
TASK2_CHECKPOINT_DIR = "/tmp/ev_task2_onecell_checkpoint"

for active_query in spark.streams.active:
    if active_query.name == TASK2_QUERY_NAME:
        active_query.stop()

q = (
    cars_df.writeStream
    .queryName(TASK2_QUERY_NAME)
    .outputMode("append")  # each micro-batch prints only the newly arrived rows
    .format("console")
    .option("truncate", "false")
    .option("numRows", 20)
    .option("checkpointLocation", TASK2_CHECKPOINT_DIR)
    .start()
)

q


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/16 17:35:12 WARN Utils: Your hostname, Jordan, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/16 17:35:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/lior/.venv3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/lior/.ivy2.5.2/cache
The jars for the packages stored in: /home/lior/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.apache.spark#spark-token-provider-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7a6f5acd-597f-48cf-8246-bfa186af315a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.1 in central


<pyspark.sql.streaming.query.StreamingQuery at 0x72f9a0131a10>

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+------------+-----+
|vin       |city        |state|
+----------+------------+-----+
|KM8K33AGXL|Seattle     |WA   |
|1C4RJYB61N|Bothell     |WA   |
|1C4RJYD61P|Yakima      |WA   |
|5YJ3E1EA7J|Kirkland    |WA   |
|WBY7Z8C5XJ|Olympia     |WA   |
|5YJ3E1EAXL|Marysville  |WA   |
|2C4RC1N77H|Kent        |WA   |
|5YJYGDEE3L|Woodinville |WA   |
|5YJ3E1EA1J|Coupeville  |WA   |
|7SAYGDEF0P|Bellevue    |WA   |
|5YJ3E1EA7J|Kirkland    |WA   |
|3FA6P0SU9G|Port Orchard|WA   |
|JTDKARFP9H|Port Orchard|WA   |
|5YJ3E1EB8K|Mukilteo    |WA   |
|5YJ3E1EA5K|Redmond     |WA   |
|3FA6P0SU0D|Rochester   |WA   |
|WA1VABGE4K|Seattle     |WA   |
|1N4AZ0CP6F|Seattle     |WA   |
|KNDCC3LD7K|Bremerton   |WA   |
|1N4AZ0CP1E|Poulsbo     |WA   |
+----------+------------+-----+
only showing top 20 rows


# Exercise 3: Top 3 cities by number of model-year 2023 EVs (streaming aggregation)

In [2]:
from pyspark.sql.functions import col, from_json, count, desc
from pyspark.sql.types import StructType, StructField, StringType

# Only the fields needed for this aggregation; names must match the JSON keys exactly.
schema_task3 = StructType([
    StructField("City", StringType(), True),
    StructField("Model Year", StringType(), True),
])

# Model year of interest. It is a string because the schema reads "Model Year" as StringType.
TARGET_MODEL_YEAR = "2023"

# Reuses the decoded Kafka payload (`text_df`) from the Exercise 2 cell.
parsed = (
    text_df
    .select(from_json(col("value"), schema_task3).alias("data"))
    .select("data.*")
)

top3_2023 = (
    parsed
    .filter(col("Model Year") == TARGET_MODEL_YEAR)
    # Records with no City would otherwise be grouped as a `null` city and could take a top-3 slot.
    .filter(col("City").isNotNull())
    .groupBy("City")
    .agg(count("*").alias("num_cars"))
    # Secondary key on City makes the top 3 deterministic when counts tie.
    .orderBy(desc("num_cars"), col("City"))
    .limit(3)
)

# Console sink for the running top-3 table.
# Complete output mode is required: sorting a streaming aggregation is only supported
# when the whole result table is re-emitted on every trigger.
# The query is named so that re-running this cell can stop the previous instance first;
# otherwise start() fails because a query from the same checkpoint is already active.
TASK3_QUERY_NAME = "ev_task3_top3_2023"
TASK3_CHECKPOINT_DIR = "/tmp/ev_task3_top3_2023_checkpoint"

for active_query in spark.streams.active:
    if active_query.name == TASK3_QUERY_NAME:
        active_query.stop()

q3 = (
    top3_2023.writeStream
    .queryName(TASK3_QUERY_NAME)
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .option("checkpointLocation", TASK3_CHECKPOINT_DIR)
    .start()
)

q3


26/01/16 17:37:52 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x72f9a01463d0>

26/01/16 17:37:52 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+--------+
|City    |num_cars|
+--------+--------+
|Seattle |5496    |
|Bellevue|2036    |
|Tukwila |1743    |
+--------+--------+

