In [5]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=a3de7705341697ad69688e877624a4bc26c3448af03572be84f1f7d100b033bf
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [6]:
import pandas as pd
import numpy


In [57]:
# Step 1: Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark import SparkFiles
from pyspark.sql.window import Window


In [55]:
from pyspark.sql.functions import col, stddev, abs,lag
from pyspark.sql import SparkSession
from pyspark import SparkFiles

In [9]:
# Step 2: Create a Spark session
spark = SparkSession.builder.appName("CO2_Emission_Analysis").getOrCreate()




In [10]:
# Step 3: Load the CSV data from the given URL
url = "https://storage.googleapis.com/deb-evaluation-materials/vehicles.csv"
spark.sparkContext.addFile(url)  # Add the file to Spark's distributed cache
df = spark.read.csv("file://" + SparkFiles.get("vehicles.csv"), header=True, inferSchema=True)



In [11]:
df.show()

+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|            Make|              Model|Year|Engine Displacement|Cylinders|   Transmission|       Drivetrain|       Vehicle Class|Fuel Type| Fuel Barrels/Year|City MPG|Highway MPG|Combined MPG|CO2 Emission Grams/Mile|Fuel Cost/Year|
+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|      AM General|  DJ Po Vehicle 2WD|1984|                2.5|      4.0|Automatic 3-spd|    2-Wheel Drive|Special Purpose V...|  Regular|19.388823529411766|      18|         17|          17|      522.7647058823529|          1950|
|      AM General|   FJ8c Post Office|1984|                4.2|      6.0|Aut

In [12]:
df.schema

StructType([StructField('Make', StringType(), True), StructField('Model', StringType(), True), StructField('Year', IntegerType(), True), StructField('Engine Displacement', DoubleType(), True), StructField('Cylinders', DoubleType(), True), StructField('Transmission', StringType(), True), StructField('Drivetrain', StringType(), True), StructField('Vehicle Class', StringType(), True), StructField('Fuel Type', StringType(), True), StructField('Fuel Barrels/Year', DoubleType(), True), StructField('City MPG', IntegerType(), True), StructField('Highway MPG', IntegerType(), True), StructField('Combined MPG', IntegerType(), True), StructField('CO2 Emission Grams/Mile', DoubleType(), True), StructField('Fuel Cost/Year', IntegerType(), True)])

In [13]:
answer_dict =  {"Q1" : None,
                "Q2" : None,
                "Q3" : None,
                "Q4" : None,
                "Q5" : None,
                "Q6" : None,
                "Q7" : None}

##**Question 1**

In [14]:
# solution
vw_df = df.filter(df["make"] == "Volkswagen")
average_co2_emission = vw_df.select(avg("CO2 Emission Grams/Mile")).collect()[0][0]


In [15]:
print("Average CO2 Emission per gram/mile of all Volkswagen cars:", average_co2_emission)

Average CO2 Emission per gram/mile of all Volkswagen cars: 392.7417210857633


In [16]:
answer_dict["Q1"] = average_co2_emission

In [17]:
answer_dict

{'Q1': 392.7417210857633,
 'Q2': None,
 'Q3': None,
 'Q4': None,
 'Q5': None,
 'Q6': None,
 'Q7': None}

##**Question 2**

In [18]:
df.createOrReplaceTempView("carmodels")

In [19]:
answer_df = spark.sql("SELECT COUNT(DISTINCT model) as Top_val, Make FROM carmodels GROUP BY Make ORDER BY COUNT(DISTINCT model) DESC LIMIT 5")

In [20]:
answer_df.show()

+-------+-------------+
|Top_val|         Make|
+-------+-------------+
|    333|Mercedes-Benz|
|    284|          BMW|
|    253|    Chevrolet|
|    185|         Ford|
|    163|          GMC|
+-------+-------------+



In [21]:
top_brands_list = answer_df.collect()



In [36]:
list2 = answer_df.rdd.map(lambda x: x.Make).collect()

In [37]:
list1 = answer_df.rdd.map(lambda x: x.Top_val).collect()

In [39]:

combined_list = [[x, list1[i]] for i, x in enumerate(list2)]

print(combined_list)


[['Mercedes-Benz', 333], ['BMW', 284], ['Chevrolet', 253], ['Ford', 185], ['GMC', 163]]


In [40]:
answer_dict["Q2"] = combined_list

In [41]:
answer_dict

{'Q1': 392.7417210857633,
 'Q2': [['Mercedes-Benz', 333],
  ['BMW', 284],
  ['Chevrolet', 253],
  ['Ford', 185],
  ['GMC', 163]],
 'Q3': None,
 'Q4': None,
 'Q5': None,
 'Q6': None,
 'Q7': None}

##**Question 3**

In [42]:
df.show()

+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|            Make|              Model|Year|Engine Displacement|Cylinders|   Transmission|       Drivetrain|       Vehicle Class|Fuel Type| Fuel Barrels/Year|City MPG|Highway MPG|Combined MPG|CO2 Emission Grams/Mile|Fuel Cost/Year|
+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|      AM General|  DJ Po Vehicle 2WD|1984|                2.5|      4.0|Automatic 3-spd|    2-Wheel Drive|Special Purpose V...|  Regular|19.388823529411766|      18|         17|          17|      522.7647058823529|          1950|
|      AM General|   FJ8c Post Office|1984|                4.2|      6.0|Aut

In [43]:
distinct_fuels = df.select(col("Fuel Type")).distinct().orderBy(col("Fuel Type"))

distinct_fuels_list = distinct_fuels.rdd.map(lambda row: row[0]).collect()


In [49]:
print(distinct_fuels_list)

['CNG', 'Diesel', 'Gasoline or E85', 'Gasoline or natural gas', 'Gasoline or propane', 'Midgrade', 'Premium', 'Premium Gas or Electricity', 'Premium and Electricity', 'Premium or E85', 'Regular', 'Regular Gas and Electricity', 'Regular Gas or Electricity']


In [50]:
answer_dict["Q3"] = distinct_fuels_list

##**Question 4**

Q4. Show the 9 Toyota cars with the most extreme Fuel Barrels/Year in abosolute terms within all Toyota cars. Show the car Model, Year and their Fuel Barrels/Year in standard deviation units(Z-score) sorted in descending order by their Fuel Barrels/Year in absolute terms first and then by year in descending order BUT without modifying the negative values

In [46]:
df.show()

+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|            Make|              Model|Year|Engine Displacement|Cylinders|   Transmission|       Drivetrain|       Vehicle Class|Fuel Type| Fuel Barrels/Year|City MPG|Highway MPG|Combined MPG|CO2 Emission Grams/Mile|Fuel Cost/Year|
+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|      AM General|  DJ Po Vehicle 2WD|1984|                2.5|      4.0|Automatic 3-spd|    2-Wheel Drive|Special Purpose V...|  Regular|19.388823529411766|      18|         17|          17|      522.7647058823529|          1950|
|      AM General|   FJ8c Post Office|1984|                4.2|      6.0|Aut

In [52]:
toyota_cars = df.filter(col("Make") == "Toyota")
stddev_value = toyota_cars.select(stddev("Fuel Barrels/Year")).collect()[0][0]
toyota_cars_zscore = toyota_cars.withColumn("zscore", abs((col("Fuel Barrels/Year") - stddev_value) / stddev_value))
sorted_cars = toyota_cars_zscore.orderBy(col("zscore").desc(), col("Year").desc())
top_9_cars = sorted_cars.select("Model", "Year", "Fuel Barrels/Year").limit(9)
result_9cars = top_9_cars.rdd.map(lambda row: [row["Model"], row["Year"], row["Fuel Barrels/Year"]]).collect()


In [53]:
answer_dict["Q4"] = result_9cars

In [54]:
answer_dict

{'Q1': 392.7417210857633,
 'Q2': [['Mercedes-Benz', 333],
  ['BMW', 284],
  ['Chevrolet', 253],
  ['Ford', 185],
  ['GMC', 163]],
 'Q3': ['CNG',
  'Diesel',
  'Gasoline or E85',
  'Gasoline or natural gas',
  'Gasoline or propane',
  'Midgrade',
  'Premium',
  'Premium Gas or Electricity',
  'Premium and Electricity',
  'Premium or E85',
  'Regular',
  'Regular Gas and Electricity',
  'Regular Gas or Electricity'],
 'Q4': [['Cab/Chassis 2WD', 1993, 32.961],
  ['Cab/Chassis 2WD', 1992, 32.961],
  ['Cab/Chassis 2WD', 1991, 32.961],
  ['Cab/Chassis 2WD', 1990, 32.961],
  ['Cab/Chassis 2WD', 1989, 32.961],
  ['Cab/Chassis 2WD', 1993, 29.96454545454546],
  ['Cab/Chassis 2WD', 1992, 29.96454545454546],
  ['Land Cruiser Wagon 4WD', 1992, 29.96454545454546],
  ['Cab/Chassis 2WD', 1991, 29.96454545454546]],
 'Q5': None,
 'Q6': None,
 'Q7': None}

##**Question 5**

Q5. Calculate the changes in Combined MPG with their previous model of all Golf cars with Manual 5-spd transmission and Regular Fuel Type. Show the Year, the Combined MPG and the calculated difference of MPG in a list sorted by Year in ascending order.

In [56]:
df.show()

+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|            Make|              Model|Year|Engine Displacement|Cylinders|   Transmission|       Drivetrain|       Vehicle Class|Fuel Type| Fuel Barrels/Year|City MPG|Highway MPG|Combined MPG|CO2 Emission Grams/Mile|Fuel Cost/Year|
+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|      AM General|  DJ Po Vehicle 2WD|1984|                2.5|      4.0|Automatic 3-spd|    2-Wheel Drive|Special Purpose V...|  Regular|19.388823529411766|      18|         17|          17|      522.7647058823529|          1950|
|      AM General|   FJ8c Post Office|1984|                4.2|      6.0|Aut

In [58]:
filtered_df = df.filter((col("Model") == "Golf") &
                        (col("Transmission") == "Manual 5-spd") &
                        (col("Fuel Type") == "Regular"))

window_spec = Window.partitionBy("Make", "Model").orderBy("Year")
mpg_diff_df = filtered_df.withColumn("prev_mpg", lag("Combined MPG").over(window_spec))
mpg_diff_df = mpg_diff_df.withColumn("mpg_diff", col("Combined MPG") - col("prev_mpg"))



[1986, 26, None]
[1987, 26, 0]
[1988, 25, -1]
[1989, 25, 0]
[1999, 24, -1]
[2000, 24, 0]
[2001, 24, 0]
[2002, 24, 0]
[2003, 24, 0]
[2004, 24, 0]
[2005, 24, 0]
[2006, 24, 0]
[2010, 25, 1]
[2011, 26, 1]
[2012, 26, 0]
[2013, 26, 0]
[2015, 30, 4]
[2016, 30, 0]
[2017, 29, -1]


In [59]:
MPG_change_df = mpg_diff_df.select("Year", "Combined MPG", "mpg_diff").orderBy("year")
MPG_change_list = MPG_change_df.rdd.map(lambda row: [row["Year"], row["Combined MPG"], row["mpg_diff"]]).collect()

In [60]:
answer_dict["Q5"] = MPG_change_list

In [61]:
answer_dict

{'Q1': 392.7417210857633,
 'Q2': [['Mercedes-Benz', 333],
  ['BMW', 284],
  ['Chevrolet', 253],
  ['Ford', 185],
  ['GMC', 163]],
 'Q3': ['CNG',
  'Diesel',
  'Gasoline or E85',
  'Gasoline or natural gas',
  'Gasoline or propane',
  'Midgrade',
  'Premium',
  'Premium Gas or Electricity',
  'Premium and Electricity',
  'Premium or E85',
  'Regular',
  'Regular Gas and Electricity',
  'Regular Gas or Electricity'],
 'Q4': [['Cab/Chassis 2WD', 1993, 32.961],
  ['Cab/Chassis 2WD', 1992, 32.961],
  ['Cab/Chassis 2WD', 1991, 32.961],
  ['Cab/Chassis 2WD', 1990, 32.961],
  ['Cab/Chassis 2WD', 1989, 32.961],
  ['Cab/Chassis 2WD', 1993, 29.96454545454546],
  ['Cab/Chassis 2WD', 1992, 29.96454545454546],
  ['Land Cruiser Wagon 4WD', 1992, 29.96454545454546],
  ['Cab/Chassis 2WD', 1991, 29.96454545454546]],
 'Q5': [[1986, 26, None],
  [1987, 26, 0],
  [1988, 25, -1],
  [1989, 25, 0],
  [1999, 24, -1],
  [2000, 24, 0],
  [2001, 24, 0],
  [2002, 24, 0],
  [2003, 24, 0],
  [2004, 24, 0],
  [2005, 

##**Question 6**

In [63]:
df.show()

+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|            Make|              Model|Year|Engine Displacement|Cylinders|   Transmission|       Drivetrain|       Vehicle Class|Fuel Type| Fuel Barrels/Year|City MPG|Highway MPG|Combined MPG|CO2 Emission Grams/Mile|Fuel Cost/Year|
+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|      AM General|  DJ Po Vehicle 2WD|1984|                2.5|      4.0|Automatic 3-spd|    2-Wheel Drive|Special Purpose V...|  Regular|19.388823529411766|      18|         17|          17|      522.7647058823529|          1950|
|      AM General|   FJ8c Post Office|1984|                4.2|      6.0|Aut

In [66]:
brands = ["Toyota", "Ford", "Volkswagen", "Nissan", "Honda"]

co2_emmision_list = []
for brand in brands:
    brand_df = df.filter(col("Make") == brand).orderBy(col("CO2 Emission Grams/Mile")).limit(5)
    brand_emissions = brand_df.select("CO2 Emission Grams/Mile").collect()
    emissions_values = [row["CO2 Emission Grams/Mile"] for row in brand_emissions]
    brand_row = [brand] + emissions_values
    co2_emmision_list.append(brand_row)

# Step 5: Sort the result list in ascending order of CO2 Emission Grams/Mile values
co2_emmision_list = sorted(co2_emmision_list, key=lambda x: x[1:])



In [67]:
answer_dict["Q6"] = co2_emmision_list

## **Question7**

Q7. Form 7 groups of 5 years to calculated the median Combined MPG of each group. The first group is from 1984 to 1988, the second from 1989 to 1993 and so on. The last group will have years not appearing in the dataset.

> Indented block



In [68]:
df.show()

+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|            Make|              Model|Year|Engine Displacement|Cylinders|   Transmission|       Drivetrain|       Vehicle Class|Fuel Type| Fuel Barrels/Year|City MPG|Highway MPG|Combined MPG|CO2 Emission Grams/Mile|Fuel Cost/Year|
+----------------+-------------------+----+-------------------+---------+---------------+-----------------+--------------------+---------+------------------+--------+-----------+------------+-----------------------+--------------+
|      AM General|  DJ Po Vehicle 2WD|1984|                2.5|      4.0|Automatic 3-spd|    2-Wheel Drive|Special Purpose V...|  Regular|19.388823529411766|      18|         17|          17|      522.7647058823529|          1950|
|      AM General|   FJ8c Post Office|1984|                4.2|      6.0|Aut

In [83]:

year_range = list(range(1984, 2023, 5))

median_CMG_list = []
for start_year in year_range:
    end_year = start_year + 4
    group_df = df.filter((col("Year") >= start_year) & (col("year") <= end_year))
    median_mpg = group_df.approxQuantile("Combined MPG", [0.5], 0)
    median_CMG_list.append([(start_year, end_year), median_mpg])


In [84]:
answer_dict["Q7"] = median_CMG_list

In [86]:
answer_dict

{'Q1': 392.7417210857633,
 'Q2': [['Mercedes-Benz', 333],
  ['BMW', 284],
  ['Chevrolet', 253],
  ['Ford', 185],
  ['GMC', 163]],
 'Q3': ['CNG',
  'Diesel',
  'Gasoline or E85',
  'Gasoline or natural gas',
  'Gasoline or propane',
  'Midgrade',
  'Premium',
  'Premium Gas or Electricity',
  'Premium and Electricity',
  'Premium or E85',
  'Regular',
  'Regular Gas and Electricity',
  'Regular Gas or Electricity'],
 'Q4': [['Cab/Chassis 2WD', 1993, 32.961],
  ['Cab/Chassis 2WD', 1992, 32.961],
  ['Cab/Chassis 2WD', 1991, 32.961],
  ['Cab/Chassis 2WD', 1990, 32.961],
  ['Cab/Chassis 2WD', 1989, 32.961],
  ['Cab/Chassis 2WD', 1993, 29.96454545454546],
  ['Cab/Chassis 2WD', 1992, 29.96454545454546],
  ['Land Cruiser Wagon 4WD', 1992, 29.96454545454546],
  ['Cab/Chassis 2WD', 1991, 29.96454545454546]],
 'Q5': [[1986, 26, None],
  [1987, 26, 0],
  [1988, 25, -1],
  [1989, 25, 0],
  [1999, 24, -1],
  [2000, 24, 0],
  [2001, 24, 0],
  [2002, 24, 0],
  [2003, 24, 0],
  [2004, 24, 0],
  [2005, 

In [92]:
from os import path
import pickle

file_name = "OMOTOSHO_AYOMIDE_answers.pkl"
path = ""

with open(path+file_name, 'wb') as f:
    pickle.dump(answer_dict, f, protocol=pickle.HIGHEST_PROTOCOL)


SyntaxError: ignored

In [91]:
ls

OMOTOSHO_AYOMIDE_answers.pkl  [0m[01;34msample_data[0m/  [01;34mspark-warehouse[0m/
