In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, sum, col, lower,desc, count
import os

In [2]:
aws_key = os.getenv('AWS_API_ID')
aws_secret = os.environ.get('AWS_API_SECRET')

In [3]:
spark = SparkSession \
        .builder \
        .appName("Wrangling Data") \
        .getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

ev_bucket = "udac-evs"
ev_path = "Light_Duty_Vehicles.csv"
ev_population_path = "Electric_Vehicle_Population_Data.csv"
us_population_path = "us_population.csv"
us_state_code_path = "state_code.csv"

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/03 15:43:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# EV Vehicles

In [4]:
ev_df = spark.read.option("header",True).csv(f"s3a://{ev_bucket}/{ev_path}")
ev_df.show(truncate=False)

23/01/03 15:43:35 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

23/01/03 15:43:45 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 1:>                                                          (0 + 1) / 1]

+----------------------+-------+----------------------------+---------------+-----------+---------------------------+----------+-----------------------------+--------------------------------+---------------------------------+------------------------------+---------------------------------+----------------------------------+-----------------+-----------+--------------------------------+---------------------+----------------------------------+------------+----------------------------+-----------+---------+---------------+-----------------------+-------------------+----------------+---------+-----+----------+
|Vehicle ID            |Fuel ID|Fuel Configuration ID       |Manufacturer ID|Category ID|Model                      |Model Year|Alternative Fuel Economy City|Alternative Fuel Economy Highway|Alternative Fuel Economy Combined|Conventional Fuel Economy City|Conventional Fuel Economy Highway|Conventional Fuel Economy Combined|Transmission Type|Engine Type|Engine Size                    

                                                                                

In [5]:
ev_df = ev_df.where(col('Model').isNotNull())  \
    .where(col('Model Year').isNotNull())  \
    .where(col('Manufacturer').isNotNull())  \
    .select(
    lower(col('Model')).alias('model'), 
    col('Model Year').alias('year'), 
    col('Transmission Type').alias('transmission_type'), 
    col('Engine Type').alias('engine_type'), 
    col('Engine Size').alias('engine_size'), 
    lower(col('Manufacturer')).alias('make'), 
    col('Category').alias('category'), 
    col('Fuel').alias('fuel')) \
    .dropDuplicates()
ev_df.count()


                                                                                

2477

# EV Population Vehicles

In [6]:
ev_pop_df = spark.read.option("header",True).csv(f"s3a://{ev_bucket}/{ev_population_path}")
ev_pop_df.printSchema()
ev_pop_df.count()

root
 |-- VIN (1-10): string (nullable = true)
 |-- County: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIP Code: string (nullable = true)
 |-- Model Year: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Electric Vehicle Type: string (nullable = true)
 |-- Clean Alternative Fuel Vehicle (CAFV) Eligibility: string (nullable = true)
 |-- Electric Range: string (nullable = true)
 |-- Base MSRP: string (nullable = true)
 |-- Legislative District: string (nullable = true)
 |-- DOL Vehicle ID: string (nullable = true)
 |-- Vehicle Location: string (nullable = true)



                                                                                

1002000

In [7]:
ev_pop_df = ev_pop_df.where(col('State').isNotNull())  \
    .where(col('Model Year').isNotNull())  \
    .where(col('Make').isNotNull())  \
    .where(col('Model').isNotNull())  \
    .select(col('VIN (1-10)').alias('ID'),
            col('City').alias('city'), 
            col('State').alias('state'), 
            col('Model Year').alias('model_year'), 
            lower(col('Make')).alias('manufacturer'), 
            lower(col('Model')).alias('model_name'), 
            col('Electric Vehicle Type').alias('electric_vehicle_type'))  \
    .dropDuplicates()
ev_pop_df.show(truncate=False)




+----------+-------------+-----+----------+------------+----------------------+--------------------------------------+
|ID        |city         |state|model_year|manufacturer|model_name            |electric_vehicle_type                 |
+----------+-------------+-----+----------+------------+----------------------+--------------------------------------+
|WBY8P4C55K|SEATTLE      |WA   |2019      |bmw         |i3 with range extender|Plug-in Hybrid Electric Vehicle (PHEV)|
|KMHE24L14G|ARLINGTON    |WA   |2016      |hyundai     |sonata plug-in hybrid |Plug-in Hybrid Electric Vehicle (PHEV)|
|1FADP5CU5F|BELLINGHAM   |WA   |2015      |ford        |c-max energi          |Plug-in Hybrid Electric Vehicle (PHEV)|
|5YJ3E1EB2J|SPOKANE      |WA   |2018      |tesla       |model 3               |Battery Electric Vehicle (BEV)        |
|5YJ3E1EA9J|BOTHELL      |WA   |2018      |tesla       |model 3               |Battery Electric Vehicle (BEV)        |
|1G1RD6E43D|RICHLAND     |WA   |2013      |chevr

                                                                                

# US Population

In [8]:
us_pop_df = spark.read.option("header",True).csv(f"s3a://{ev_bucket}/{us_population_path}")
us_pop_df = us_pop_df.withColumn("population", col("POPESTIMATE2019").cast('int')).drop('POPESTIMATE2019')

total_pop = us_pop_df.agg(sum("population")).collect()[0][0]

                                                                                

In [9]:
def population_ratio(state_population):
    return round((state_population / total_pop) * 100, 1)
 
population_ratio_udf = udf(population_ratio)

In [10]:
us_state_df = spark.read.option("header",True).csv(f"s3a://{ev_bucket}/{us_state_code_path}")
us_state_pop_ratio_df = us_pop_df.join(us_state_df, us_pop_df.STATE == us_state_df.state, "left")  \
                            .select(us_state_df.state, us_state_df.code, col('population'))  \
                            .withColumn('ratio', population_ratio_udf(col('population')))  \
                            .drop('STATE')  \
                            .dropDuplicates()
us_state_pop_ratio_df.count()

                                                                                

51

In [11]:
us_state_pop_ratio_df.show(30, truncate=False)

[Stage 27:>                                                         (0 + 1) / 1]                                                                                

+----+----------+-----+
|code|population|ratio|
+----+----------+-----+
|MN  |5639632   |1.7  |
|NJ  |8882190   |2.7  |
|HI  |1415872   |0.4  |
|PA  |12801989  |3.9  |
|MA  |6892503   |2.1  |
|OK  |3956971   |1.2  |
|MD  |6045680   |1.8  |
|SC  |5148714   |1.6  |
|IL  |12671821  |3.9  |
|VT  |623989    |0.2  |
|OR  |4217737   |1.3  |
|IA  |3155070   |1.0  |
|ND  |762062    |0.2  |
|SD  |884659    |0.3  |
|FL  |21477737  |6.5  |
|NV  |3080156   |0.9  |
|OH  |11689100  |3.6  |
|KS  |2913314   |0.9  |
|WY  |578759    |0.2  |
|WV  |1792147   |0.5  |
|UT  |3205958   |1.0  |
|MS  |2976149   |0.9  |
|AR  |3017804   |0.9  |
|NE  |1934408   |0.6  |
|WI  |5822434   |1.8  |
|CT  |3565287   |1.1  |
|NY  |19453561  |5.9  |
|LA  |4648794   |1.4  |
|CA  |39512223  |12.0 |
|MT  |1068778   |0.3  |
+----+----------+-----+
only showing top 30 rows



# Consolidating Output

In [12]:
ev_demographics_df = ev_pop_df.join(us_state_pop_ratio_df, us_state_pop_ratio_df.code == ev_pop_df.state, "left")  \
                                .join(ev_df, (ev_df.model == ev_pop_df.model_name) & (ev_df.year == ev_pop_df.model_year) & (ev_df.make == ev_pop_df.manufacturer), "left")  \
                                .drop('model', 'year', 'make', 'code').dropDuplicates()


### Finding EV distribution by state and county

In [21]:
ev_state_dist = ev_demographics_df.groupBy('state', 'ratio') \
    .agg(count("*").alias("count")).orderBy(desc('count'))
ev_state_dist.show(truncate=False)

                                                                                

+-----+-----+-----+
|state|ratio|count|
+-----+-----+-----+
|WA   |2.3  |37892|
|VA   |2.6  |9    |
|CA   |12.0 |8    |
|MD   |1.8  |4    |
|GA   |3.2  |3    |
|TX   |8.8  |3    |
|KS   |0.9  |2    |
|CO   |1.8  |2    |
|NJ   |2.7  |2    |
|IL   |3.9  |1    |
|AL   |1.5  |1    |
|CT   |1.1  |1    |
|AZ   |2.2  |1    |
|OR   |1.3  |1    |
|NC   |3.2  |1    |
|NV   |0.9  |1    |
+-----+-----+-----+



In [22]:
ev_city_dist = ev_demographics_df.groupBy('city') \
    .agg(count("*").alias("count")).orderBy(desc('count'))
ev_city_dist.show(truncate=False)



+-----------------+-----+
|city             |count|
+-----------------+-----+
|SEATTLE          |2638 |
|BELLEVUE         |1282 |
|VANCOUVER        |1125 |
|REDMOND          |1091 |
|KIRKLAND         |1002 |
|OLYMPIA          |901  |
|TACOMA           |893  |
|BOTHELL          |853  |
|SAMMAMISH        |841  |
|RENTON           |802  |
|BELLINGHAM       |709  |
|MERCER ISLAND    |666  |
|WOODINVILLE      |641  |
|ISSAQUAH         |629  |
|SPOKANE          |617  |
|EVERETT          |599  |
|KENT             |554  |
|BAINBRIDGE ISLAND|527  |
|SHORELINE        |503  |
|SNOHOMISH        |502  |
+-----------------+-----+
only showing top 20 rows



                                                                                

# Quality Checks

In [15]:
ev_demographics_load_df = spark.read.parquet(f's3a://{ev_bucket}/output')

                                                                                

### Core Columns should exist in the output

In [16]:
core_columns = set(['state', 'model_name', 'model_year', 'manufacturer', 'population', 'ratio'])
if not core_columns.issubset(set(ev_demographics_load_df.columns)):
    raise Exception('One Or More Core Columns are missing!')

### Essential Columns should not be nullable

In [17]:
nullable_count = ev_demographics_load_df.filter((col('state').isNull()) | 
                                           (col('model_name').isNull()) | 
                                           (col('model_year').isNull()) | 
                                           (col('manufacturer').isNull()))

if nullable_count.count() != 0:
    raise Exception('One Or More Null Values!')

                                                                                

# Push to S3

In [19]:
ev_demographics_df.printSchema()
ev_demographics_df.write.format("parquet").partitionBy('state').mode("overwrite").save(f's3a://{ev_bucket}/output')


root
 |-- ID: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- model_year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- electric_vehicle_type: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- ratio: string (nullable = true)
 |-- transmission_type: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_size: string (nullable = true)
 |-- category: string (nullable = true)
 |-- fuel: string (nullable = true)



[Stage 80:>                                                         (0 + 8) / 8]

23/01/03 16:04:26 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [23]:
ev_state_dist.write.format("csv").mode("overwrite").save(f's3a://{ev_bucket}/dist/ev_state_dist.csv')
ev_city_dist.write.format("csv").mode("overwrite").save(f's3a://{ev_bucket}/dist/ev_city_dist.csv')


23/01/03 16:13:03 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.


                                                                                

23/01/03 16:15:04 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.


                                                                                

23/01/03 16:16:17 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.


                                                                                

23/01/03 16:18:09 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.


                                                                                