In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("UsedCarsBatch")
    .getOrCreate()
)

spark


In [2]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

bucket = "used-cars-bucket-asen"
path = "/home/jovyan/data/vehicles.csv"   # local path inside the container

df_raw = (
    spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(path)
)

df_raw.printSchema()
df_raw.show(5)




root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- posting_date: string (nu

In [3]:
current_year = 2025  

valid_states = [
    "AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA",
    "HI","ID","IL","IN","IA","KS","KY","LA","ME","MD",
    "MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ",
    "NM","NY","NC","ND","OH","OK","OR","PA","RI","SC",
    "SD","TN","TX","UT","VT","VA","WA","WV","WI","WY",
    "DC"
]

df_clean = (
    df_raw
      .select(
          "id",
          "url",
          "region",
          "state",
          "price",
          "year",
          "manufacturer",
          "model",
          "condition",
          "fuel",
          "odometer",
          "posting_date"
      )
      # cast numeric columns 
      .withColumn("price", F.col("price").cast("double"))
      .withColumn("year", F.col("year").cast("int"))
      .withColumn("odometer", F.col("odometer").cast("double"))

      # cleaning the variable state
      .withColumn("state", F.upper(F.trim(F.col("state"))))
      .withColumn(
          "state",
          F.when(F.col("state").isin(valid_states), F.col("state"))
      )
      .filter(F.col("state").isNotNull())
      # -----------------------

      # basic quality filters
      .filter(F.col("price").isNotNull() & (F.col("price") > 0) & (F.col("price") < 200000))
      .filter(F.col("year").isNotNull() & (F.col("year") >= 1980) & (F.col("year") <= current_year + 1))
      .filter(F.col("odometer").isNull() | (F.col("odometer") >= 0))
)

df_clean.show(5)
print("Clean row count:", df_clean.count())


+----------+--------------------+------+-----+-------+----+------------+--------------------+---------+----+--------+--------------------+
|        id|                 url|region|state|  price|year|manufacturer|               model|condition|fuel|odometer|        posting_date|
+----------+--------------------+------+-----+-------+----+------------+--------------------+---------+----+--------+--------------------+
|7316814884|https://auburn.cr...|auburn|   AL|33590.0|2014|         gmc|sierra 1500 crew ...|     good| gas| 57923.0|2021-05-04T12:31:...|
|7316814758|https://auburn.cr...|auburn|   AL|22590.0|2010|   chevrolet|      silverado 1500|     good| gas| 71229.0|2021-05-04T12:31:...|
|7316814989|https://auburn.cr...|auburn|   AL|39590.0|2020|   chevrolet| silverado 1500 crew|     good| gas| 19160.0|2021-05-04T12:31:...|
|7316743432|https://auburn.cr...|auburn|   AL|30990.0|2017|      toyota|tundra double cab sr|     good| gas| 41124.0|2021-05-04T10:41:...|
|7316356412|https://auburn.

In [4]:
# Define region groups
region_data = [
    ("NORTHEAST", ["ME","NH","VT","MA","RI","CT","NY","NJ","PA", "DC"]),
    ("SOUTHEAST", ["DE","MD","VA","WV","NC","SC","GA","FL","AL","MS","TN","KY"]),
    ("MIDWEST",   ["OH","MI","IN","IL","WI","MN","IA","MO","ND","SD","NE","KS"]),
    ("SOUTHWEST", ["TX","OK","NM","AZ","AR", "LA"]),
    ("OTHER", ["AK","HI"]),
    ("WEST",      ["CO","WY","MT","ID","UT","NV","CA","OR","WA"])
]

rows = [(state, region) for region, states in region_data for state in states]
region_df = spark.createDataFrame(rows, ["state", "region_group"])

df_enriched = df_clean.join(region_df, on="state", how="left")

df_enriched.select("state", "region", "region_group").show(10, truncate=False)
print("Enriched rows:", df_enriched.count())



+-----+------+------------+
|state|region|region_group|
+-----+------+------------+
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
|AL   |auburn|SOUTHEAST   |
+-----+------+------------+
only showing top 10 rows

Enriched rows: 331156


In [6]:
agg = (
    df_enriched
      .groupBy("region_group", "state", "manufacturer", "model", "year")
      .agg(
          F.count("*").alias("listing_count"),
          F.avg("price").alias("avg_price"),
          F.expr("percentile_approx(odometer, 0.5)").alias("median_odometer")
      )
)

agg.show(20, truncate=False)
print("Agg rows:", agg.count())



+------------+-----+------------+------------------------------+----+-------------+---------+---------------+
|region_group|state|manufacturer|model                         |year|listing_count|avg_price|median_odometer|
+------------+-----+------------+------------------------------+----+-------------+---------+---------------+
|MIDWEST     |MI   |NULL        |/ford focus se                |2009|1            |6250.0   |87000.0        |
|MIDWEST     |MI   |NULL        |00000                         |2021|1            |25.0     |240000.0       |
|MIDWEST     |MI   |NULL        |133000                        |2005|1            |2900.0   |133000.0       |
|MIDWEST     |MI   |NULL        |142000                        |2011|1            |11995.0  |142000.0       |
|MIDWEST     |MI   |NULL        |1982 corvette                 |1982|1            |9500.0   |56724.0        |
|MIDWEST     |MI   |NULL        |1985 Mustang GT               |1985|2            |16300.0  |47000.0        |
|MIDWEST  

In [7]:
# Ranking top 5 most common and top 5 most expensive car models per region group using window functions
w_price = Window.partitionBy("region_group").orderBy(F.col("avg_price").desc())

agg_ranked_price = agg.withColumn("rank_price", F.dense_rank().over(w_price))

top_expensive_models = (
    agg_ranked_price
      .filter(F.col("rank_price") <= 5)     # top 5 most expensive per region_group
      .orderBy("region_group", "rank_price", "avg_price")
)



# Ranking the top 5 most common models per region_group 

model_counts = (
    agg
      .groupBy("region_group", "model")
      .agg(F.sum("listing_count").alias("total_listings"))
      .filter(F.col("model").isNotNull())
)

w_freq = Window.partitionBy("region_group").orderBy(F.col("total_listings").desc())

top_common_models = (
    model_counts
      .withColumn("rank_freq", F.dense_rank().over(w_freq))
      .filter(F.col("rank_freq") <= 5)       # top 5 most common models per region_group
      .orderBy("region_group", "rank_freq", F.col("total_listings").desc())
)

top_common_models.show(50, truncate=False)
top_expensive_models.show(50, truncate=False)

+------------+--------------+--------------+---------+
|region_group|model         |total_listings|rank_freq|
+------------+--------------+--------------+---------+
|MIDWEST     |f-150         |1442          |1        |
|MIDWEST     |silverado 1500|1213          |2        |
|MIDWEST     |escape        |805           |3        |
|MIDWEST     |equinox       |712           |4        |
|MIDWEST     |1500          |698           |5        |
|NORTHEAST   |f-150         |654           |1        |
|NORTHEAST   |civic         |568           |2        |
|NORTHEAST   |wrangler      |547           |3        |
|NORTHEAST   |accord        |515           |4        |
|NORTHEAST   |camry         |477           |5        |
|OTHER       |f-150         |121           |1        |
|OTHER       |silverado     |90            |2        |
|OTHER       |silverado 1500|82            |3        |
|OTHER       |tacoma        |74            |4        |
|OTHER       |1500          |66            |5        |
|SOUTHEAST

In [8]:

# Save final datasets to Parquet


# 1. Save cleaned + enriched main dataset
df_enriched.write.mode("overwrite") \
    .parquet("/home/jovyan/data/used_cars_clean_enriched")

# 2. Save aggregated table
agg.write.mode("overwrite") \
    .parquet("/home/jovyan/data/used_cars_agg")

# 3. Save top 5 most expensive models per region
top_expensive_models.write.mode("overwrite") \
    .parquet("/home/jovyan/data/used_cars_top_expensive_models")

# 4. Save top 5 most common models per region
top_common_models.write.mode("overwrite") \
    .parquet("/home/jovyan/data/used_cars_top_common_models")
