In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a SparkSession
spark = SparkSession.builder.appName("Car Data Analysis").getOrCreate()

# Define the schema
schema = StructType([
    StructField("Unnamed: 0", StringType(), True),
    StructField("vin", StringType(), True),
    StructField("firstSeen", StringType(), True),
    StructField("lastSeen", StringType(), True),
    StructField("msrp", IntegerType(), True),
    StructField("askPrice", IntegerType(), True),
    StructField("mileage", IntegerType(), True),
    StructField("isNew", BooleanType(), True),
    StructField("brandName", StringType(), True),
    StructField("modelName", StringType(), True),
    StructField("dealerID", IntegerType(), True),
    StructField("vf_AirBagLocFront", StringType(), True),
    StructField("vf_AirBagLocSide", StringType(), True),
    StructField("vf_BodyClass", StringType(), True),
    StructField("vf_DisplacementCC", DoubleType(), True),
    StructField("vf_DisplacementCI", DoubleType(), True),
    StructField("vf_DisplacementL", DoubleType(), True),
    StructField("vf_Doors", IntegerType(), True),
    StructField("vf_EngineCylinders", IntegerType(), True),
    StructField("vf_FuelTypePrimary", StringType(), True),
    StructField("vf_Make", StringType(), True),
    StructField("vf_MakeID", IntegerType(), True),
    StructField("vf_Manufacturer", StringType(), True),
    StructField("vf_ManufacturerId", IntegerType(), True),
    StructField("vf_Model", StringType(), True),
    StructField("vf_ModelID", IntegerType(), True),
    StructField("vf_ModelYear", IntegerType(), True),
    StructField("vf_PlantCity", StringType(), True),
    StructField("vf_PlantCountry", StringType(), True),
    StructField("vf_PlantState", StringType(), True),
    StructField("vf_SeatBeltsAll", StringType(), True),
    StructField("vf_VIN", StringType(), True),
    StructField("vf_VehicleType", StringType(), True)
])


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Initialize SparkSession with Hive support
spark = SparkSession \
    .builder \
    .appName("Spark with Hive on EMR") \
    .config("hive.metastore.uris", "thrift://xxxxxxxx:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# Show databases
spark.sql("SHOW DATABASES").show()

# Create a DataFrame from a fully qualified table name
df = spark.table("my_database.my_table")

# Show the DataFrame
df.show()


In [2]:
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .schema(schema) \
    .load("finaldatasettttttttt.csv")

In [3]:
df.printSchema()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- firstSeen: string (nullable = true)
 |-- lastSeen: string (nullable = true)
 |-- msrp: integer (nullable = true)
 |-- askPrice: integer (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- isNew: boolean (nullable = true)
 |-- brandName: string (nullable = true)
 |-- modelName: string (nullable = true)
 |-- dealerID: integer (nullable = true)
 |-- vf_AirBagLocFront: string (nullable = true)
 |-- vf_AirBagLocSide: string (nullable = true)
 |-- vf_BodyClass: string (nullable = true)
 |-- vf_DisplacementCC: double (nullable = true)
 |-- vf_DisplacementCI: double (nullable = true)
 |-- vf_DisplacementL: double (nullable = true)
 |-- vf_Doors: integer (nullable = true)
 |-- vf_EngineCylinders: integer (nullable = true)
 |-- vf_FuelTypePrimary: string (nullable = true)
 |-- vf_Make: string (nullable = true)
 |-- vf_MakeID: integer (nullable = true)
 |-- vf_Manufacturer: string (nullable = true)
 

In [26]:
avg_displacement_df = df.groupBy("vf_FuelTypePrimary") \
    .agg(
        avg("msrp").alias("Manfacture price ")
    ) \
    .orderBy("vf_FuelTypePrimary")

avg_displacement_df.show()


+--------------------+------------------+
|  vf_FuelTypePrimary| Manfacture price |
+--------------------+------------------+
|Compressed Natura...|           52988.0|
|              Diesel| 53868.92971246006|
|            Electric| 26795.82932166302|
|Flexible Fuel Veh...|28850.899731423455|
|            Gasoline| 26629.31402034039|
|Liquefied Petrole...|           54880.0|
+--------------------+------------------+



In [27]:
cars_by_manufacturer = df.groupBy("vf_Manufacturer").count()
print("Number of cars by manufacturer:")
cars_by_manufacturer.show()

Number of cars by manufacturer:
+--------------------+-----+
|     vf_Manufacturer|count|
+--------------------+-----+
|HONDA MFG. INDIAN...|  244|
|   ALFA ROMEO S.P.A.|  851|
|AUTO ALLIANCE INT...|  212|
|FORD MOTOR COMPAN...| 3212|
|HONDA OF THE U.K....|   32|
|RENAULT SAMSUNG M...|   49|
|TOYOTA MOTOR NORT...|  777|
|FUJI HEAVY INDUST...|  457|
|     FCA CANADA INC.| 4734|
|MAZDA MOTOR MANUF...|  185|
|MAZDA MOTOR CORPO...|  307|
|NEW UNITED MOTOR ...|   74|
|              BMW AG|   12|
|  GENERAL MOTORS LLC|46290|
|CHRYSLER DE MEXIC...| 1639|
|HONDA OF AMERICA ...| 3085|
|TOYOTA MOTOR MANU...|  263|
|                AUDI| 3977|
|  MERCEDES-BENZ CARS|   38|
|FORD MOTOR COMPAN...|14224|
+--------------------+-----+
only showing top 20 rows



In [17]:
##Average price by model year
avg_price_by_model_year = df.groupBy("vf_ModelYear").agg({"askPrice": "avg"}).orderBy("vf_ModelYear")
print("Average price by model year:")
avg_price_by_model_year.show()

Average price by model year:
+------------+------------------+
|vf_ModelYear|     avg(askPrice)|
+------------+------------------+
|        1998|               0.0|
|        1999|1090.7272727272727|
|        2000|1022.2068965517242|
|        2001| 2385.904761904762|
|        2002| 2597.990909090909|
|        2003|1712.2463768115942|
|        2004|1895.4966887417218|
|        2005|2614.2580645161293|
|        2006|3108.5838926174497|
|        2007| 4396.276635514018|
|        2008|3940.7547408343867|
|        2009| 4973.787786259542|
|        2010| 7486.628922237381|
|        2011| 9208.490215264188|
|        2012| 9416.134934497817|
|        2013|         11448.506|
|        2014|14361.985345997286|
|        2015|17672.901679271887|
|        2016|19693.426313541222|
|        2017|20210.724539614563|
+------------+------------------+
only showing top 20 rows



In [28]:
###Find the top 5 manufacturers (vf_Manufacturer) with the highest average askPrice:
from pyspark.sql.functions import desc
top_manufacturers = df.groupBy("vf_Manufacturer").agg(avg("askPrice").alias("avg_askPrice")) \
                      .orderBy(desc("avg_askPrice")).limit(5)
top_manufacturers.show()


+--------------------+------------------+
|     vf_Manufacturer|      avg_askPrice|
+--------------------+------------------+
|KARMA AUTOMOTIVE LLC|           79980.0|
|                AUDI|49551.877294443046|
|   ALFA ROMEO S.P.A.| 47563.20564042303|
|MERCEDES-BENZ OF ...|37575.608695652176|
|       FORD WERKE AG|29354.846153846152|
+--------------------+------------------+



In [29]:
####Determine the brandName and modelName of vehicles with the highest askPrice within each vf_VehicleType:
max_price_per_type = df.groupBy("vf_VehicleType").agg(max("askPrice").alias("max_askPrice"))
result = df.join(max_price_per_type, "vf_VehicleType").filter(col("askPrice") == col("max_askPrice")) \
           .select("brandName", "modelName", "vf_VehicleType", "askPrice")
result.show()

+---------+---------+--------------------+--------+
|brandName|modelName|      vf_VehicleType|askPrice|
+---------+---------+--------------------+--------+
|CHEVROLET|Silverado|  INCOMPLETE VEHICLE|   40469|
|CHEVROLET|Silverado|  INCOMPLETE VEHICLE|   40469|
|CHEVROLET|Silverado|  INCOMPLETE VEHICLE|   40469|
|     FORD| Explorer|MULTIPURPOSE PASS...|  383747|
|     FORD|   Fiesta|       PASSENGER CAR|  174460|
|     FORD| Explorer|MULTIPURPOSE PASS...|  383747|
|     FORD|   Fiesta|       PASSENGER CAR|  174460|
|CHEVROLET|  Express|                 BUS|   22998|
|CHEVROLET|  Express|                 BUS|   22998|
|      GMC|   Sierra|              TRUCK |   80145|
+---------+---------+--------------------+--------+



In [22]:
##Calculate the average askPrice for vehicles manufactured by each vf_Manufacturer, 
# but only for vehicles with mileage between 20000 and 50000
average_price_filtered = df.filter((df["mileage"] >= 20000) & (df["mileage"] <= 50000)) \
                           .groupBy("vf_Manufacturer") \
                           .agg(avg("askPrice").alias("avg_askPrice")) \
                           .orderBy("avg_askPrice")
average_price_filtered.show()


+--------------------+------------------+
|     vf_Manufacturer|      avg_askPrice|
+--------------------+------------------+
|NEW UNITED MOTOR ...|            6695.0|
|NISSAN MEXICANA S...|12288.238095238095|
|   VOLVO CAR USA LLC|           12747.5|
|TOYOTA MOTOR CORP...|           13183.4|
|MAZDA MOTOR MANUF...|13250.035087719298|
| HONDA MOTOR CO. LTD|15486.461538461539|
|HONDA MFG. INDIAN...|       15674.59375|
|TOYOTA MOTOR MANU...|15776.489130434782|
|HYUNDAI-KIA AMERI...|16088.232558139534|
|CHRYSLER DE MEXIC...| 16350.25352112676|
|MAZDA MOTOR CORPO...| 16414.63963963964|
|     HONDA DE MEXICO| 16831.57894736842|
|RENAULT SAMSUNG M...|        16882.9375|
|AUTO ALLIANCE INT...|16884.816666666666|
|HONDA OF CANADA M...|17710.064516129034|
|              BMW AG|17874.666666666668|
|FORD MOTOR COMPAN...|17963.491614758026|
|HONDA OF THE U.K....|           18147.5|
|NISSAN MOTOR COMP...| 18149.16853932584|
|     FCA CANADA INC.|18342.651954602774|
+--------------------+------------

In [12]:
# Percentage of vehicles with vf_DisplacementCC > 2000 per vf_Make
percentage_over_2000cc = df.groupBy("vf_Make") \
                           .agg((sum(expr("CASE WHEN vf_DisplacementCC > 2000 THEN 1 ELSE 0 END")) / count("*") * 100)
                            .alias("percentage_over_2000cc"))

percentage_over_2000cc.show()

+----------+----------------------+
|   vf_Make|percentage_over_2000cc|
+----------+----------------------+
|     ACURA|     72.13375796178345|
|   HYUNDAI|    33.098591549295776|
|      FIAT|                   0.0|
|    TOYOTA|    56.470588235294116|
|    SUBARU|     61.97802197802198|
|    NISSAN|     70.24679170779862|
|  INFINITI|     95.94594594594594|
|      FORD|    57.574915216545286|
|      AUDI|    15.111893386975106|
|ALFA ROMEO|     7.050528789659224|
|     DODGE|      99.6787744007907|
| CHEVROLET|    62.987734087123314|
|   MERCURY|                 100.0|
|     LEXUS|     98.47560975609755|
|      JEEP|     90.96780704410132|
|     VOLVO|     55.55555555555556|
|     MAZDA|     65.88983050847457|
|       BMW|     77.71084337349397|
|VOLKSWAGEN|    53.608247422680414|
|   PONTIAC|     95.91836734693877|
+----------+----------------------+
only showing top 20 rows



In [13]:
# Percentage of vehicles with displacement greater than 3000cc per manufacturer
high_displacement_percentage = df.groupBy("vf_Manufacturer") \
                                 .agg((expr("SUM(CASE WHEN vf_DisplacementCC > 3000 THEN 1 ELSE 0 END)") / count("*") * 100)
                                 .alias("percentage_high_displacement"))

high_displacement_percentage.show()

+--------------------+----------------------------+
|     vf_Manufacturer|percentage_high_displacement|
+--------------------+----------------------------+
|HONDA MFG. INDIAN...|                         0.0|
|   ALFA ROMEO S.P.A.|                         0.0|
|AUTO ALLIANCE INT...|                       100.0|
|FORD MOTOR COMPAN...|           50.15566625155666|
|HONDA OF THE U.K....|                         0.0|
|RENAULT SAMSUNG M...|                         0.0|
|TOYOTA MOTOR NORT...|          43.371943371943374|
|FUJI HEAVY INDUST...|            3.50109409190372|
|     FCA CANADA INC.|                       100.0|
|MAZDA MOTOR MANUF...|                         0.0|
|MAZDA MOTOR CORPO...|                         0.0|
|NEW UNITED MOTOR ...|                         0.0|
|              BMW AG|           8.333333333333332|
|  GENERAL MOTORS LLC|           52.93799956794124|
|CHRYSLER DE MEXIC...|          11.714460036607688|
|HONDA OF AMERICA ...|            42.0097244732577|
|TOYOTA MOTO

In [15]:
# Distribution of displacement across different fuel types
displacement_distribution = df.groupBy("vf_FuelTypePrimary", "vf_DisplacementCC").agg(count("*").alias("count"))

displacement_distribution.show()

+--------------------+-----------------+-----+
|  vf_FuelTypePrimary|vf_DisplacementCC|count|
+--------------------+-----------------+-----+
|            Electric|           3000.0|    2|
|Flexible Fuel Veh...|           3500.0| 1069|
|Flexible Fuel Veh...|           1600.0|  381|
|            Gasoline|           5300.0| 6401|
|            Gasoline|           3900.0|   35|
|Flexible Fuel Veh...|           3900.0|   46|
|            Gasoline|      1343.739248|   20|
|            Gasoline|           1798.0|  486|
|            Gasoline|           6800.0|    4|
|            Gasoline|           5200.0|   42|
|            Gasoline|           3600.0|22137|
|            Gasoline|           6000.0|  372|
|Flexible Fuel Veh...|           3300.0|    1|
|            Gasoline|           1600.0|  811|
|            Gasoline|           4608.0|   32|
|            Gasoline|      1997.583102|    5|
|            Gasoline|       2294.18896|    6|
|            Gasoline|           1300.0|   71|
|            

In [31]:
vehicle_age_distribution = df_with_age.groupBy("vehicle_age") \
    .agg(
        avg("mileage").alias("avg_mileage"),
        count("*").alias("num_vehicles")
    ) \
    .orderBy("vehicle_age")

vehicle_age_distribution.show()

+-----------+------------------+------------+
|vehicle_age|       avg_mileage|num_vehicles|
+-----------+------------------+------------+
|          4|27.917581808097616|        9015|
|          5|  597.303719834674|       22501|
|          6|2639.1694256447777|       21868|
|          7|10944.221755888651|       11675|
|          8|14720.531994873985|       11705|
|          9| 24526.18805281374|        7801|
|         10| 32566.13459972863|        3685|
|         11|         41611.226|        3000|
|         12| 51708.18864628821|        2290|
|         13| 56723.62866927593|        2044|
|         14| 69079.79399727148|        1466|
|         15|  78954.8427480916|         655|
|         16| 82076.60682680152|         791|
|         17| 84777.83738317757|         535|
|         18| 95146.63087248322|         298|
|         19| 83492.34946236559|         186|
|         20| 82264.45033112583|         151|
|         21| 92154.02898550725|         138|
|         22|106294.15454545454|  