In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, year

In [None]:
# Initialize Spark Session
spark = SparkSession.builder.appName("EV Sales Data Analysis").getOrCreate()

In [None]:
# Load the dataset
df = spark.read.csv("/IEA-EV-dataEV salesHistoricalCars.csv", header=True, inferSchema=True)

In [None]:
# Preview the data
df.printSchema()
df.show(5)

root
 |-- region: string (nullable = true)
 |-- category: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- powertrain: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)

+---------+----------+--------------+----+----------+----+--------+--------------------+
|   region|  category|     parameter|mode|powertrain|year|    unit|               value|
+---------+----------+--------------+----+----------+----+--------+--------------------+
|Australia|Historical|      EV sales|Cars|       BEV|2011|Vehicles|                49.0|
|Australia|Historical|EV stock share|Cars|        EV|2011| percent|3.900000010617077...|
|Australia|Historical|EV sales share|Cars|        EV|2011| percent|0.006500000134110451|
|Australia|Historical|      EV stock|Cars|       BEV|2011|Vehicles|                49.0|
|Australia|Historical|      EV stock|Cars|       BEV|2012|Vehicl

In [None]:
# Data Cleaning
df_clean = df.dropna(how="any")

In [None]:
# Aggregated Analysis
# Total EV sales by year
sales_by_year = df_clean.groupBy("year").agg(sum("value").alias("total_sales")).orderBy("year")
sales_by_year.show()

+----+--------------------+
|year|         total_sales|
+----+--------------------+
|2010|    70978.7207436126|
|2011|   285588.0108696912|
|2012|   742720.0002148302|
|2013|  1473643.7232171951|
|2014|  2561216.4220652403|
|2015|   4552899.912754189|
|2016|   6828047.930784922|
|2017|1.0525976713900255E7|
|2018| 1.701155216890844E7|
|2019|2.2472929724019326E7|
|2020| 3.397518928814869E7|
|2021|  5.93561953893497E7|
|2022| 9.100238453730103E7|
|2023|1.3256485441860037E8|
+----+--------------------+



In [None]:
# Total EV sales by region
sales_by_region = df_clean.groupBy("region").agg(sum("value").alias("total_sales")).orderBy(col("total_sales").desc())
sales_by_region.show()

+--------------+--------------------+
|        region|         total_sales|
+--------------+--------------------+
|         World|1.5422581302707928E8|
|         China| 7.801320872585829E7|
|        Europe| 4.444434366701014E7|
|          EU27|3.1541538708500028E7|
|           USA| 2.156330253489906E7|
|       Germany|   9807279.524880093|
|        France|   6136208.925230009|
|United Kingdom|   5973932.845299745|
|        Norway|          4667650.57|
|         Japan|  3456082.8860999206|
|   Netherlands|   3261989.192600503|
|        Sweden|  2417205.3854916226|
|        Canada|   2373967.780899913|
|         Korea|  2078441.4851003333|
|         Italy|  1756414.0726003079|
|       Belgium|   1653062.761799809|
|         Spain|  1299119.3412399162|
|   Switzerland|  1123248.7960000988|
|       Denmark|  1063940.0706995751|
|       Austria|   872067.7148998864|
+--------------+--------------------+
only showing top 20 rows



In [None]:
#  Filter Data - Only Battery Electric Vehicles (BEV)
bev_df = df_clean.filter(df_clean["powertrain"] == "BEV")
bev_by_year = bev_df.groupBy("year").agg(sum("value").alias("BEV_sales")).orderBy("year")
bev_by_year.show()

+----+-----------+
|year|  BEV_sales|
+----+-----------+
|2010|    67792.0|
|2011|   243067.0|
|2012|   441667.0|
|2013|   844696.0|
|2014|  1510387.0|
|2015|  2592271.0|
|2016|  3932673.0|
|2017|  6241910.0|
|2018|1.0591713E7|
|2019|1.4797219E7|
|2020| 2.163866E7|
|2021|3.8522375E7|
|2022|6.1180847E7|
|2023| 8.959061E7|
+----+-----------+



In [None]:
# Save the insights to CSV (Optional)
sales_by_year.coalesce(1).write.csv("/mnt/data/output_sales_by_year", header=True, mode="overwrite")
sales_by_region.coalesce(1).write.csv("/mnt/data/output_sales_by_region", header=True, mode="overwrite")

In [None]:
# Stop Spark session
spark.stop()