In [None]:
# installing pyspark in colab (so i can use spark)
!pip -q install pyspark

# starting a spark session (this is like the entry point for pyspark)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("vehicle_co2_task1") \
    .getOrCreate()

spark

In [None]:
# Upload the dataset from the local machine into the Colab working directory.
# files.upload() returns a dict mapping each uploaded file name to its raw bytes.
from google.colab import files

uploaded = files.upload()

# Display only the file names (and sizes): showing the dict itself would dump the
# entire raw CSV content into the notebook output.
{name: f"{len(content):,} bytes" for name, content in uploaded.items()}


In [None]:
# Load the CSV into a Spark DataFrame:
# - header=True      -> the first row holds the column names
# - inferSchema=True -> Spark detects column types instead of reading everything as strings
# - sep=","          -> plain comma-separated file
file_name = "co2.csv"

csv_options = {"header": True, "inferSchema": True, "sep": ","}
df_raw = spark.read.csv(file_name, **csv_options)

n_raw_rows = df_raw.count()
print("rows in raw df:", n_raw_rows)
print("columns:", len(df_raw.columns))

# Inferred types and nullability: nullable columns hint at where cleaning may be needed
df_raw.printSchema()
# First rows: vehicle class, engine, fuel and CO2 emission columns
df_raw.show(5, truncate=False)


In [None]:
# Rename the original headers to short snake_case names (lowercase, no units or
# symbols) so the rest of the notebook can reference columns easily.

new_cols = {
    "Make": "make",
    "Model": "model",
    "Vehicle Class": "vehicle_class",
    "Engine Size(L)": "engine_size_l",
    "Cylinders": "cylinders",
    "Transmission": "transmission",
    "Fuel Type": "fuel_type",
    "Fuel Consumption City (L/100 km)": "fuel_city_l100",
    "Fuel Consumption Hwy (L/100 km)": "fuel_hwy_l100",
    "Fuel Consumption Comb (L/100 km)": "fuel_comb_l100",
    "Fuel Consumption Comb (mpg)": "fuel_comb_mpg",
    "CO2 Emissions(g/km)": "co2_g_km"
}

# Only headers that actually exist in the CSV are renamed; other columns keep their names.
present_renames = [(old_name, new_name) for old_name, new_name in new_cols.items()
                   if old_name in df_raw.columns]

df = df_raw
for old_name, new_name in present_renames:
    df = df.withColumnRenamed(old_name, new_name)

df.printSchema()
df.show(5, truncate=False)



In [None]:
# Count missing values in every column: nulls can distort both the analysis and
# the recommendations, so it is worth knowing where they are before cleaning.
from pyspark.sql.functions import col, sum as spark_sum, when


def _null_count(column_name):
    """Aggregate expression counting the nulls in `column_name` (aliased to that name)."""
    is_null_flag = when(col(column_name).isNull(), 1).otherwise(0)
    return spark_sum(is_null_flag).alias(column_name)


null_counts = df.select(*[_null_count(name) for name in df.columns])

null_counts.show(truncate=False)


In [None]:
# Clean the dataset.
# Keep only rows that have every field the recommender relies on: make, model,
# vehicle class, fuel type and CO2. Missing any of these would make the
# recommendations unreliable.
# Text fields are trimmed first, so values that differ only by leading/trailing
# spaces are treated as the same string.

from pyspark.sql.functions import trim

text_cols = ["make", "model", "vehicle_class", "fuel_type"]

df_clean = df
for text_col in text_cols:
    df_clean = df_clean.withColumn(text_col, trim(col(text_col)))

for text_col in text_cols:
    # A value made only of spaces becomes "" after trim; treat it as missing too,
    # otherwise it would pass the isNotNull check and survive cleaning.
    df_clean = df_clean.filter(col(text_col).isNotNull() & (col(text_col) != ""))

df_clean = df_clean.filter(col("co2_g_km").isNotNull())

print("rows before:", df.count())
print("rows after cleaning:", df_clean.count())
df_clean.show(5, truncate=False)


In [None]:
# Remove duplicate cars: the same car appearing several times would crowd the
# recommendation list with copies of one vehicle. A car is identified by the key
# columns below (only the ones present in the data are used).
# NOTE(review): dropDuplicates keeps an arbitrary row per key, so non-key columns
# (e.g. co2_g_km) of the kept row are not guaranteed to be any particular one.
candidate_keys = ["make", "model", "vehicle_class", "engine_size_l", "transmission", "fuel_type"]
key_cols = list(filter(lambda name: name in df_clean.columns, candidate_keys))

df_clean = df_clean.dropDuplicates(key_cols)
print("rows after dropDuplicates:", df_clean.count())


EXPLORATORY ANALYSIS ~~~~~~~~~~~~~~

In [None]:
# Summary statistics of CO2 emissions (count, mean, stddev, min, max), used to
# understand the distribution and spot extreme values.
# Author's notes from a previous run: values look realistic (maxima from SUVs,
# minima from compact cars) and the standard deviation is high (large variability).
co2_summary = df_clean.describe("co2_g_km")
co2_summary.show()


In [None]:
# The 10 cars with the lowest and the 10 with the highest CO2 emissions.
display_cols = ["make", "model", "vehicle_class", "fuel_type", "engine_size_l", "co2_g_km"]
co2_view = df_clean.select(*display_cols)

# Lowest emitters. Author's notes: mostly fuel_type X (described as "hybrid";
# NOTE(review): confirm the fuel-code meaning in the dataset's data dictionary)
# and small engines.
co2_view.orderBy(col("co2_g_km").asc()).show(10, truncate=False)

# Highest emitters. Author's notes: two-seaters, SUVs and vans with bigger engines.
co2_view.orderBy(col("co2_g_km").desc()).show(10, truncate=False)

# Open question: how does CO2 relate to engine size, fuel type and vehicle class?

In [None]:
# How CO2 emissions vary with fuel type: count() gives the number of cars per
# group and avg() the typical emission, sorted from lowest to highest average.
# Author's notes from a previous run: X has the lowest average CO2 (described as
# "hybrid"; NOTE(review): confirm the fuel-code meaning); Z is well represented
# and emits more; E sits in the middle; N has a single car, so it is not representative.

from pyspark.sql.functions import avg, count

fuel_stats = (
    df_clean
    .groupBy("fuel_type")
    .agg(count("*").alias("n_cars"), avg("co2_g_km").alias("avg_co2"))
    .orderBy(col("avg_co2").asc())
)
fuel_stats.show(truncate=False)


In [None]:
# CO2 emissions by vehicle class (SUV, compact, ...), to check trends such as
# "bigger cars emit more CO2". Sorted from lowest to highest average.
# Author's notes from a previous run: station wagons, compact and mid-size cars
# have the lowest averages; SUVs, vans and pickups the highest.

class_stats = (
    df_clean
    .groupBy("vehicle_class")
    .agg(count("*").alias("n_cars"), avg("co2_g_km").alias("avg_co2"))
    .orderBy(col("avg_co2").asc())
)
class_stats.show(20, truncate=False)


In [None]:
# Engine size vs CO2: group engines into simple size ranges (bins) and compare the
# average CO2 per bin. Author's notes: bigger engine -> more CO2.

from pyspark.sql.functions import when, min as spark_min

engine = col("engine_size_l")

# The when-chain is evaluated top to bottom, so each branch only needs its upper bound.
df_bins = df_clean.withColumn(
    "engine_bin",
    # Missing sizes get their own bin; otherwise they would silently land in ">=4.0"
    when(engine.isNull(), "unknown")
    .when(engine < 2.0, "<2.0")
    .when(engine < 3.0, "2.0-2.9")
    .when(engine < 4.0, "3.0-3.9")
    .otherwise(">=4.0")
)

(
    df_bins.groupBy("engine_bin")
    .agg(
        count("*").alias("n_cars"),
        avg("co2_g_km").alias("avg_co2"),
        spark_min("engine_size_l").alias("bin_min_size"),
    )
    # Sort bins by engine size rather than alphabetically by label
    # (as strings, "<2.0" would sort after "3.0-3.9"); "unknown" goes last.
    .orderBy(col("bin_min_size").asc_nulls_last())
    .drop("bin_min_size")
    .show(truncate=False)
)


RECOMMENDATION SYSTEM ~~~~~~~~~~~~~~

In [None]:
# Give each car a readable name ("MAKE MODEL") and a unique id, so the
# recommendations are easy to read. monotonically_increasing_id() produces
# unique, increasing ids (not necessarily consecutive).
from pyspark.sql.functions import concat_ws, monotonically_increasing_id

car_name_expr = concat_ws(" ", col("make"), col("model"))

df_rec = (
    df_clean
    .withColumn("car_name", car_name_expr)
    .withColumn("car_id", monotonically_increasing_id())
)

preview_cols = ["car_id", "car_name", "vehicle_class", "fuel_type", "engine_size_l", "co2_g_km"]
df_rec.select(*preview_cols).show(5, truncate=False)


In [None]:
# Build the feature vector used for similarity:
# - categorical columns -> StringIndexer + one-hot encoding
# - numeric columns are used as they are
# - everything is assembled into "raw_features", then standardised into "features"
#   (zero mean, unit variance) so no single column dominates the Euclidean distance

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

cat_cols = ["vehicle_class", "fuel_type", "transmission"]
cat_cols = [c for c in cat_cols if c in df_rec.columns]

num_cols = ["engine_size_l", "cylinders", "fuel_city_l100", "fuel_hwy_l100", "fuel_comb_l100", "fuel_comb_mpg", "co2_g_km"]
num_cols = [c for c in num_cols if c in df_rec.columns]

# handleInvalid="keep": nulls / unseen labels get an extra index instead of failing
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep") for c in cat_cols]
encoders = [OneHotEncoder(inputCol=c+"_idx", outputCol=c+"_ohe") for c in cat_cols]

# handleInvalid="skip": rows with a null numeric feature are dropped. The default
# ("error") would make fit() fail, because the cleaning step only guaranteed
# non-null make / model / vehicle_class / fuel_type / co2_g_km.
assembler = VectorAssembler(
    inputCols=[c+"_ohe" for c in cat_cols] + num_cols,
    outputCol="raw_features",
    handleInvalid="skip"
)

scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)

pipe = Pipeline(stages=indexers + encoders + [assembler, scaler])

model_feat = pipe.fit(df_rec)
df_feat = model_feat.transform(df_rec)

# Make any rows dropped by the assembler visible
print("rows with features:", df_feat.count(), "of", df_rec.count())

df_feat.select("car_id", "car_name", "features").show(3, truncate=False)


In [None]:
# Locality-sensitive hashing (LSH) to find SIMILAR CARS from their feature vectors.
# BucketedRandomProjectionLSH approximates nearest neighbours under EUCLIDEAN
# distance, which keeps the search fast on bigger datasets.
# The random projections are seeded so the recommendations are reproducible
# across runs (without a seed, each run may return different neighbours).

from pyspark.ml.feature import BucketedRandomProjectionLSH

LSH_SEED = 42

lsh = BucketedRandomProjectionLSH(
    inputCol="features",
    outputCol="hashes",   # internal hash values; similar vectors tend to share buckets
    bucketLength=2.0,     # larger buckets -> fewer missed neighbours but more candidates to compare
    numHashTables=3,      # more tables -> better recall, at the cost of more computation
    seed=LSH_SEED
)

lsh_model = lsh.fit(df_feat)


In [None]:
# Pick one reference car; the recommender will look for vehicles with similar
# characteristics and CO2 emissions.
# The car is chosen by searching a word in car_name (e.g. "CIVIC" or "COROLLA").
# The match is case-insensitive, and the first match by car_id is taken. The
# explicit ordering matters: target_df is lazy and is re-evaluated by every action
# (show here, count/first later), so an unordered limit(1) could pick a
# different car each time.

from pyspark.sql.functions import upper

target_word = "CIVIC"   # model to test (easy to change)

target_df = (
    df_feat
    .filter(upper(col("car_name")).contains(target_word.upper()))
    .orderBy("car_id")
    .limit(1)   # a single reference vector is enough
)

target_df.select("car_id", "car_name", "vehicle_class", "fuel_type", "co2_g_km").show(truncate=False)


In [None]:

# Find the NEAREST NEIGHBOURS of the selected car. The result has a distance
# column "distCol" (smaller distance = more similar).
# The reference car itself is part of df_feat and would come back as its own
# nearest neighbour (distance 0). So one extra neighbour is requested, the
# reference car is removed by car_id, and the list is cut back to N_RECS.

N_RECS = 10

if target_df.count() == 0:
    print("no car found with that word ops")
else:
    target_row = target_df.first()
    # approxNearestNeighbors needs the numeric feature vector of the reference car
    recs = (
        lsh_model.approxNearestNeighbors(df_feat, target_row["features"], N_RECS + 1)
        .filter(col("car_id") != target_row["car_id"])
        .orderBy("distCol")
        .limit(N_RECS)
    )

    recs.select(
        "car_id", "car_name", "vehicle_class", "fuel_type",
        "engine_size_l", "co2_g_km", "distCol"
    ).show(truncate=False)


EVALUATION ~~~~~~~~

In [None]:
# Simple sanity check of the recommender: if the recommended cars are really
# "similar", their CO2 emissions should be close to each other AND to the reference car.
# - describe() on the recommended CO2 values: a small range / low stddev means
#   similar emission levels within the list
# - mean absolute CO2 gap to the reference car: a small value means the list
#   stays near the target's emission level

from pyspark.sql.functions import abs as spark_abs, avg

if target_df.count() != 0:  # only when a reference car was found
    target_co2 = target_df.first()["co2_g_km"]
    print("target car co2_g_km:", target_co2)

    recs_small = recs.select("co2_g_km")
    recs_small.describe().show()

    recs.select(
        avg(spark_abs(col("co2_g_km") - target_co2)).alias("mean_abs_co2_diff")
    ).show()
