In [1]:
# check if pyspark works
import pyspark
print(pyspark.__version__)

3.5.1


In [2]:
# Load dataset using pyspark and clean it
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import isnan, when, count, col

# Initialize Spark session
spark = SparkSession.builder.appName("CarPricePrediction").getOrCreate()

# Load the dataset
data_path = r"./dataset/used_cars_data.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Show the first few rows of the dataframe
df.show(5)


+-----------------+------------+----+----------+----------+---------------+-----+--------+-----------------+--------------------+------------+----------+--------------------+----------------+-------------------+-----------+--------------+-----+-------------+----------------+--------------+-------------+----------------+---------+-------------+-------+--------------------+----------+--------------------+-----+------------+------+------+---------+--------+--------+-----------+-------------+----------+---------+--------------------+--------------------+----------+---------------+-------+---------------+-----------+------------------+-------+-------+--------------+-------------+------+-------------------+-----------+--------------------+------------+--------------------+------+------------+-----------------------+------------+--------------------+---------+-------+----+
|              vin|back_legroom| bed|bed_height|bed_length|      body_type|cabin|    city|city_fuel_economy|combine_fuel_

## Data Exploration and Preprocessing

Explore the data and preprocess it as needed.

In [3]:
def count_nulls(df):
    null_counts = df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns])
    return null_counts


# Display the schema of the dataset
#df.printSchema()

# Display summary statistics
#df.describe().show()

# Drop unnecessary columns

# TODO drop the rows 
df = df.drop('bed', 'bed_height', 'bed_length', 'cabin', 'description', 'fleet', 'franchise_dealer', 'city_fuel_economy', 'highway_fuel_economy', 'salvage', 'savings_amount', 'franchise_make', 'combine_fuel_economy', 'daysonmarket', 'dealer_zip', 'is_certified', 'is_cpo', 'is_oemcpo', 'latitude', 'listing_color', 'listing_id', 'longitude', 'main_picture_url', 'owner_count', 'theft_title', 'torque', 'transmission_display', 'trimId', 'trim_name', 'vehicle_damage_category', 'wheel_system_display')

# Show rows number
#print(df.count())
# Display dataset description
#df.show()



In [4]:
# Assume null as false: 'frame_damaged', 'has_accidents', 'isCab', 'major_options' since usually if not indicates not exist
#count_nulls(df).show()
df = df.fillna({'frame_damaged': False, 'has_accidents': False, 'isCab': False, 'major_options': False})

#count_nulls(df).show()

In [5]:
# Handle missing values if any (example: drop rows with missing values)
df = df.dropna()

#print(df.count())
#count_nulls(df).show()

In [6]:
# Salva il dataset aggiornato
#df.write.csv('dataset_principale_aggiornato.csv', header=True)
df.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata.csv")

In [9]:
from pyspark.sql.functions import col, coalesce, first
#df.coalesce(1).write.csv("./mydf")

#df.repartition(1).write.csv("./mydf")
df.repartition(1).write.option("header", True).csv("project-1/results")

In [None]:
df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata.csv")

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, first

# Inizializza la sessione Spark
spark = SparkSession.builder.appName("FillMissingFuelEconomy").getOrCreate()

# Carica i dataset
dataset_principale = df
dataset_secondario = spark.read.csv('./dataset/vehicles.csv', header=True, inferSchema=True)

# Identifica i record con valori mancanti
missing_values_df = dataset_principale.filter(
    (col('highway_fuel_economy').isNull()) |
    (col('city_fuel_economy').isNull())
)

print(missing_values_df.count())

# Unione dinamica per riempire i valori mancanti
updated_df = df.alias("main").join(
    dataset_secondario.alias("sec"),
    (col("main.make_name") == col("sec.make")) &
    (col("main.model_name") == col("sec.model")) &
    (col("main.year") == col("sec.year")),
    how='left'
).select(
    "main.*",
    col("sec.highway08").alias("highway_fuel_economy"),
    col("sec.city08").alias("city_fuel_economy"),
    #"sec.model"  # Include the model from secondary dataset for grouping
)

updated_df.describe().show()



# Ferma la sessione Spark
#spark.stop()


234199


In [26]:
updated_df.show()
print(updated_df.count())

+-----------------+------------+---------------+--------+----------------+-------------------+-----------+--------------------+-------------+-------------+----------------+---------+-------------+-------+----------+--------------------+-----+------+--------+-----------+--------------------+----------+---------------+-------+------------------+------------------+-------+-------------+------+-------------------+------------+------------+---------+-------+----+--------------------+-----------------+
|              vin|back_legroom|      body_type|    city|engine_cylinders|engine_displacement|engine_type|      exterior_color|frame_damaged|front_legroom|fuel_tank_volume|fuel_type|has_accidents| height|horsepower|      interior_color|isCab|is_new|  length|listed_date|       major_options| make_name|maximum_seating|mileage|        model_name|             power|  price|seller_rating| sp_id|            sp_name|transmission|wheel_system|wheelbase|  width|year|highway_fuel_economy|city_fuel_econo

In [None]:
df = df.dropna()
count_nulls(df).show()

In [None]:
# Rimuovi i duplicati mantenendo solo il primo valore
final_df = updated_df.groupBy("make_name", "model_name", "year").agg(
    first("updated_highway_fuel_economy").alias("highway_fuel_economy"),
    first("updated_city_fuel_economy").alias("city_fuel_economy"),
)

final_df.describe().show()
# Unisci i record aggiornati con il dataset principale mantenendo i valori originali
final_df = dataset_principale.alias("original").join(
    final_df.alias("updated"),
    on=['make_name', 'model_name', 'year'],
    how='left'
).select(
    col("original.*"),
    coalesce(col("updated.highway_fuel_economy"), col("original.highway_fuel_economy")).alias("highway_fuel_economy"),
    coalesce(col("updated.city_fuel_economy"), col("original.city_fuel_economy")).alias("city_fuel_economy"),
)

# Salva il dataset aggiornato
final_df.write.csv('dataset_principale_aggiornato.csv', header=True)

In [None]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Carica i dataset
dataset_secondario = spark.read.csv('./dataset/vehicles.csv', header=True, inferSchema=True)

# Identifica i record con valori mancanti
missing_values_df = df.filter(
    (col('highway_fuel_economy').isNull()) |
    (col('city_fuel_economy').isNull())
)

# Aggiungi valori dal dataset secondario
updated_df = missing_values_df.alias("main").join(
    dataset_secondario.alias("sec"),
    (col("main.make_name") == col("sec.make")) &
    (col("main.model_name") == col("sec.model")) &
    (col("main.year") == col("sec.year")),
    how='left'
).select(
    "main.*",
    F.coalesce("highway_fuel_economy", "sec.highway08"),
    F.coalesce("city_fuel_economy", "sec.city08"),
)


updated_df.describe(80).show()
count_nulls(updated_df).show()




+-------+--------------------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------+-----------------------------------------+-----------------------------------+
|summary|                 vin|      back_legroom|         body_type|               city|   city_fuel_econo

In [None]:
# Unisci i record aggiornati con il dataset principale
final_df = df.alias("original").join(
    updated_df.alias("updated"),
    on=['make_name', 'model_name', 'year'],
    how='left'
).select(
    "original.*",
    F.coalesce("updated.highway_fuel_economy", "original.highway_fuel_economy").alias("highway_fuel_economy"),
    F.coalesce("updated.city_fuel_economy", "original.city_fuel_economy").alias("city_fuel_economy"),
    F.coalesce("updated.combine_fuel_economy", "original.combine_fuel_economy").alias("combine_fuel_economy")
)
# Salva il dataset aggiornato
final_df.write.csv('dataset_principale_aggiornato.csv', header=True)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `updated`.`combine_fuel_economy` cannot be resolved. Did you mean one of the following? [`updated`.`city_fuel_economy`, `updated`.`highway_fuel_economy`, `updated`.`back_legroom`, `original`.`city_fuel_economy`, `updated`.`front_legroom`].;
'Project [make_name#69422, model_name#69425, year#69445, vin#69380, back_legroom#69381, body_type#69385, city#69387, city_fuel_economy#69388, engine_cylinders#69393, engine_displacement#69394, engine_type#69395, exterior_color#69396, frame_damaged#72705, front_legroom#69401, fuel_tank_volume#69402, fuel_type#69403, has_accidents#72706, height#69405, highway_fuel_economy#69406, horsepower#69407, interior_color#69408, isCab#72707, is_new#69412, length#69415, ... 18 more fields]
+- Project [make_name#69422, model_name#69425, year#69445, vin#69380, back_legroom#69381, body_type#69385, city#69387, city_fuel_economy#69388, engine_cylinders#69393, engine_displacement#69394, engine_type#69395, exterior_color#69396, frame_damaged#72705, front_legroom#69401, fuel_tank_volume#69402, fuel_type#69403, has_accidents#72706, height#69405, highway_fuel_economy#69406, horsepower#69407, interior_color#69408, isCab#72707, is_new#69412, length#69415, ... 53 more fields]
   +- Join LeftOuter, (((make_name#69422 = make_name#78857) AND (model_name#69425 = model_name#78860)) AND (year#69445 = year#78880))
      :- SubqueryAlias original
      :  +- Project [vin#69380, back_legroom#69381, body_type#69385, city#69387, city_fuel_economy#69388, engine_cylinders#69393, engine_displacement#69394, engine_type#69395, exterior_color#69396, coalesce(frame_damaged#69398, cast(false as string)) AS frame_damaged#72705, front_legroom#69401, fuel_tank_volume#69402, fuel_type#69403, coalesce(has_accidents#69404, cast(false as string)) AS has_accidents#72706, height#69405, highway_fuel_economy#69406, horsepower#69407, interior_color#69408, coalesce(isCab#69409, cast(false as string)) AS isCab#72707, is_new#69412, length#69415, listed_date#69416, coalesce(major_options#69421, cast(false as string)) AS major_options#72708, make_name#69422, ... 15 more fields]
      :     +- Project [vin#69380, back_legroom#69381, body_type#69385, city#69387, city_fuel_economy#69388, engine_cylinders#69393, engine_displacement#69394, engine_type#69395, exterior_color#69396, frame_damaged#69398, front_legroom#69401, fuel_tank_volume#69402, fuel_type#69403, has_accidents#69404, height#69405, highway_fuel_economy#69406, horsepower#69407, interior_color#69408, isCab#69409, is_new#69412, length#69415, listed_date#69416, major_options#69421, make_name#69422, ... 15 more fields]
      :        +- Relation [vin#69380,back_legroom#69381,bed#69382,bed_height#69383,bed_length#69384,body_type#69385,cabin#69386,city#69387,city_fuel_economy#69388,combine_fuel_economy#69389,daysonmarket#69390,dealer_zip#69391,description#69392,engine_cylinders#69393,engine_displacement#69394,engine_type#69395,exterior_color#69396,fleet#69397,frame_damaged#69398,franchise_dealer#69399,franchise_make#69400,front_legroom#69401,fuel_tank_volume#69402,fuel_type#69403,... 42 more fields] csv
      +- SubqueryAlias updated
         +- Project [vin#78815, back_legroom#78816, body_type#78820, city#78822, city_fuel_economy#78823, engine_cylinders#78828, engine_displacement#78829, engine_type#78830, exterior_color#78831, frame_damaged#78881, front_legroom#78836, fuel_tank_volume#78837, fuel_type#78838, has_accidents#78882, height#78840, highway_fuel_economy#78841, horsepower#78842, interior_color#78843, isCab#78883, is_new#78847, length#78850, listed_date#78851, major_options#78884, make_name#78857, ... 17 more fields]
            +- Join LeftOuter, (((make_name#78857 = make#73280) AND (model_name#78860 = model#73281)) AND (cast(year#78880 as int) = year#73297))
               :- SubqueryAlias main
               :  +- Filter (isnull(highway_fuel_economy#78841) OR isnull(city_fuel_economy#78823))
               :     +- Project [vin#78815, back_legroom#78816, body_type#78820, city#78822, city_fuel_economy#78823, engine_cylinders#78828, engine_displacement#78829, engine_type#78830, exterior_color#78831, coalesce(frame_damaged#78833, cast(false as string)) AS frame_damaged#78881, front_legroom#78836, fuel_tank_volume#78837, fuel_type#78838, coalesce(has_accidents#78839, cast(false as string)) AS has_accidents#78882, height#78840, highway_fuel_economy#78841, horsepower#78842, interior_color#78843, coalesce(isCab#78844, cast(false as string)) AS isCab#78883, is_new#78847, length#78850, listed_date#78851, coalesce(major_options#78856, cast(false as string)) AS major_options#78884, make_name#78857, ... 15 more fields]
               :        +- Project [vin#78815, back_legroom#78816, body_type#78820, city#78822, city_fuel_economy#78823, engine_cylinders#78828, engine_displacement#78829, engine_type#78830, exterior_color#78831, frame_damaged#78833, front_legroom#78836, fuel_tank_volume#78837, fuel_type#78838, has_accidents#78839, height#78840, highway_fuel_economy#78841, horsepower#78842, interior_color#78843, isCab#78844, is_new#78847, length#78850, listed_date#78851, major_options#78856, make_name#78857, ... 15 more fields]
               :           +- Relation [vin#78815,back_legroom#78816,bed#78817,bed_height#78818,bed_length#78819,body_type#78820,cabin#78821,city#78822,city_fuel_economy#78823,combine_fuel_economy#78824,daysonmarket#78825,dealer_zip#78826,description#78827,engine_cylinders#78828,engine_displacement#78829,engine_type#78830,exterior_color#78831,fleet#78832,frame_damaged#78833,franchise_dealer#78834,franchise_make#78835,front_legroom#78836,fuel_tank_volume#78837,fuel_type#78838,... 42 more fields] csv
               +- SubqueryAlias sec
                  +- Relation [barrels08#73234,barrelsA08#73235,charge120#73236,charge240#73237,city08#73238,city08U#73239,cityA08#73240,cityA08U#73241,cityCD#73242,cityE#73243,cityUF#73244,co2#73245,co2A#73246,co2TailpipeAGpm#73247,co2TailpipeGpm#73248,comb08#73249,comb08U#73250,combA08#73251,combA08U#73252,combE#73253,combinedCD#73254,combinedUF#73255,cylinders#73256,displ#73257,... 60 more fields] csv


In [None]:
# drop also daysonmarket and dealer_zip not related to our study ... more general
# replace fro, datasets: highway_fuel_economy and city_fuel_economy eventually combine_fuel_economy
# source: https://www.fueleconomy.gov/feg/download.shtml
#Assume null as false: 'frame_damaged' and 'has_accidents', isCab, major_options
# Duplicates: sp_id and sp_name (name and model id it's same data...different version maybe int it's better but not understandable)
# Display the number of missing values in each column
count_nulls(df).show()

# Handle missing values if any (example: drop rows with missing values)
df = df.dropna()

print(df.count())
count_nulls(df).show()

+---+------------+---------+----+-----------------+----------------+-------------------+-----------+--------------+-------------+-------------+----------------+---------+-------------+------+--------------------+----------+--------------+-----+------+------+-----------+-------------+---------+---------------+-------+----------+-----+-----+-------+--------------+-------------+-----+-------+------------+------------+---------+-----+----+
|vin|back_legroom|body_type|city|city_fuel_economy|engine_cylinders|engine_displacement|engine_type|exterior_color|frame_damaged|front_legroom|fuel_tank_volume|fuel_type|has_accidents|height|highway_fuel_economy|horsepower|interior_color|isCab|is_new|length|listed_date|major_options|make_name|maximum_seating|mileage|model_name|power|price|salvage|savings_amount|seller_rating|sp_id|sp_name|transmission|wheel_system|wheelbase|width|year|
+---+------------+---------+----+-----------------+----------------+-------------------+-----------+--------------+-----