# BRONZE

## Check that the API link is reachable

In [78]:
import requests
from io import BytesIO

# Fail fast if the dataset endpoint is unreachable or answers with an HTTP
# error status, before the later cells try to load the data.
try:
    # stream=True defers the body download: only the status line and headers
    # are needed for this check. The timeout keeps the cell from hanging
    # forever when the server does not answer.
    with requests.get(
        'https://data.wa.gov/api/views/f6w7-q2d2/rows.json?accessType=DOWNLOAD',
        stream=True,
        timeout=30,
    ) as response:
        response.raise_for_status()
except requests.exceptions.RequestException as exc:
    # Same exception type and message as before; the underlying network/HTTP
    # error is attached explicitly as the cause.
    raise ValueError("Can't access to the link") from exc


StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 84, Finished, Available, Finished)

## Retrieve the data into a PySpark DataFrame

In [79]:
import pandas as pd
# Load the CSV export of dataset f6w7-q2d2 from data.wa.gov into pandas.
df = pd.read_csv(
    'https://data.wa.gov/api/views/f6w7-q2d2/rows.csv?accessType=DOWNLOAD'
)

# Print a preview of the first rows.
preview = df.head()
print(preview)

# Row count, shown as the cell's result.
len(df)

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 85, Finished, Available, Finished)

   VIN (1-10)     County       City State  Postal Code  Model Year     Make  \
0  5UXTA6C0XM     Kitsap    Seabeck    WA      98380.0        2021      BMW   
1  5YJ3E1EB1J     Kitsap    Poulsbo    WA      98370.0        2018    TESLA   
2  WP0AD2A73G  Snohomish    Bothell    WA      98012.0        2016  PORSCHE   
3  5YJ3E1EB5J     Kitsap  Bremerton    WA      98310.0        2018    TESLA   
4  1N4AZ1CP3K       King    Redmond    WA      98052.0        2019   NISSAN   

      Model                   Electric Vehicle Type  \
0        X5  Plug-in Hybrid Electric Vehicle (PHEV)   
1   MODEL 3          Battery Electric Vehicle (BEV)   
2  PANAMERA  Plug-in Hybrid Electric Vehicle (PHEV)   
3   MODEL 3          Battery Electric Vehicle (BEV)   
4      LEAF          Battery Electric Vehicle (BEV)   

  Clean Alternative Fuel Vehicle (CAFV) Eligibility  Electric Range  \
0           Clean Alternative Fuel Vehicle Eligible            30.0   
1           Clean Alternative Fuel Vehicle Eligible 

210165

In [80]:
from pyspark.sql.functions import current_timestamp


# Convert the pandas frame to a Spark DataFrame and add a 'date_insert'
# column populated by current_timestamp().
sparkDF = spark.createDataFrame(df).withColumn('date_insert', current_timestamp())

# Inspect the inferred schema, then a sample of rows.
sparkDF.printSchema()
sparkDF.show()

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 86, Finished, Available, Finished)

root
 |-- VIN (1-10): string (nullable = true)
 |-- County: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: double (nullable = true)
 |-- Model Year: long (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Electric Vehicle Type: string (nullable = true)
 |-- Clean Alternative Fuel Vehicle (CAFV) Eligibility: string (nullable = true)
 |-- Electric Range: double (nullable = true)
 |-- Base MSRP: double (nullable = true)
 |-- Legislative District: double (nullable = true)
 |-- DOL Vehicle ID: long (nullable = true)
 |-- Vehicle Location: string (nullable = true)
 |-- Electric Utility: string (nullable = true)
 |-- 2020 Census Tract: double (nullable = true)
 |-- date_insert: timestamp (nullable = false)

+----------+---------+------------+-----+-----------+----------+---------+----------+---------------------+-------------------------------------------------+--------------+----

In [81]:
# Source header -> table column name, applied in the order listed.
# Columns that are not listed keep their original name.
column_renames = {
    'VIN (1-10)': 'VIN',
    'Postal Code': 'postal_code',
    'Model Year': 'model_year',
    'Electric Vehicle Type': 'electric_vehicle_type',
    'Clean Alternative Fuel Vehicle (CAFV) Eligibility': 'Clean_Alternative_Fuel_Vehicle_Eligibility',
    'Electric Range': 'Electric_Range',
    'Base MSRP': 'Base_MSRP',
    'Legislative District': 'Legislative_District',
    'DOL Vehicle ID': 'DOL_Vehicle_ID',
    'Vehicle Location': 'Vehicle_Location',
    'Electric Utility': 'Electric_Utility',
    '2020 Census Tract': '2020_Census_Tract',
}
for old_name, new_name in column_renames.items():
    sparkDF = sparkDF.withColumnRenamed(old_name, new_name)

# Display a sample of rows with the new column names.
sparkDF.show()

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 87, Finished, Available, Finished)

+----------+---------+------------+-----+-----------+----------+---------+----------+---------------------+------------------------------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+--------------------+
|       VIN|   County|        City|State|postal_code|model_year|     Make|     Model|electric_vehicle_type|Clean_Alternative_Fuel_Vehicle_Eligibility|Electric_Range|Base_MSRP|Legislative_District|DOL_Vehicle_ID|    Vehicle_Location|    Electric_Utility|2020_Census_Tract|         date_insert|
+----------+---------+------------+-----+-----------+----------+---------+----------+---------------------+------------------------------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+--------------------+
|5UXTA6C0XM|   Kitsap|     Seabeck|   WA|    98380.0|      2021|      BMW|        X5| Plug-in Hybrid El...|              

In [82]:
# Append this load to the bronze Delta table.
(
    sparkDF.write
    .format("delta")
    .mode("append")
    .saveAsTable("electric_vehicle_bronze")
)

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 88, Finished, Available, Finished)

# SILVER

In [83]:
# Define the SQL query.
# It numbers the rows of electric_vehicle_bronze within each DOL_Vehicle_ID in
# ascending date_insert order, then keeps, for every DOL_Vehicle_ID, the row
# whose row_num equals the highest row_num of that vehicle, i.e. the row that
# sorts last by date_insert. The helper column row_num is part of the result
# because the outer query selects * from the CTE.
# NOTE(review): ordering by date_insert DESC and filtering on row_num = 1 would
# select the same rows without the correlated subquery, but the stored row_num
# value would change - confirm nothing downstream reads it before simplifying.
sql_query = """
WITH RankedVehicles AS (
    SELECT *,
           row_number() OVER (PARTITION BY DOL_Vehicle_ID ORDER BY date_insert) AS row_num
    FROM electric_vehicle_bronze
)
SELECT *
FROM RankedVehicles
WHERE row_num = (SELECT MAX(row_num) FROM RankedVehicles AS RV WHERE RV.DOL_Vehicle_ID = RankedVehicles.DOL_Vehicle_ID);
"""

# Run the SQL query and load the result into a DataFrame.
df_ranked_vehicles = spark.sql(sql_query)

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 89, Finished, Available, Finished)

In [84]:
# Replace the content of the silver Delta table with the deduplicated rows.
silver_writer = df_ranked_vehicles.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("electric_vehicle_silver")

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 90, Finished, Available, Finished)

# GOLD

## Number of vehicles by state by day

In [85]:
%%sql
-- Rebuilds FCT_VEHICLE_STATE_GOLD: rows stamped with another date are kept as
-- they are, and today's rows are recomputed from electric_vehicle_silver, so
-- running the cell again on the same day replaces today's rows.
-- Columns are matched by position across the UNION ALL and the INSERT.
INSERT OVERWRITE TABLE FCT_VEHICLE_STATE_GOLD
WITH previous_snapshots AS (
    -- everything already stored, except rows stamped with today's date
    SELECT *
    FROM FCT_VEHICLE_STATE_GOLD
    WHERE DATE_PHOTO <> CURRENT_DATE()
),
todays_snapshot AS (
    -- vehicle count per state, stamped with today's date
    SELECT COUNT(DOL_Vehicle_ID) AS NB_VEHICLE,
           State,
           CURRENT_DATE() AS DATE_PHOTO
    FROM electric_vehicle_silver
    GROUP BY State
)
SELECT * FROM previous_snapshots
UNION ALL
SELECT * FROM todays_snapshot



StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 91, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

## Market Share

In [86]:
%%sql
-- Rebuilds FCT_MARKET_SHARE_STATE_MAKER_GOLD: rows stamped with another date
-- are kept as they are, and today's rows are recomputed.
-- MARKET_SHARE is the maker's vehicle count in a state as a percentage of the
-- state's NB_VEHICLE taken from FCT_VEHICLE_STATE_GOLD.
-- Columns are matched by position across the UNION ALL and the INSERT.
INSERT OVERWRITE TABLE FCT_MARKET_SHARE_STATE_MAKER_GOLD
WITH current_data AS (
   SELECT *
   FROM FCT_MARKET_SHARE_STATE_MAKER_GOLD
   WHERE DATE_PHOTO <> CURRENT_DATE()
),
new_data AS (
   SELECT 
       COUNT(DOL_Vehicle_ID) AS COUNT_VEHICLE,
       ROUND(COUNT(DOL_Vehicle_ID)/NB_VEHICLE*100, 2) AS MARKET_SHARE,
       MAKE,
       a.State,
       CURRENT_DATE() AS DATE_PHOTO
   FROM 
       electric_vehicle_silver a
   -- FCT_VEHICLE_STATE_GOLD holds one row per state AND per DATE_PHOTO.
   -- Joining on State alone matches every silver row once per stored
   -- snapshot day, which inflates COUNT_VEHICLE or splits a maker across
   -- several NB_VEHICLE groups. Only today's totals are the right
   -- denominator, so the state-level gold cell must have run today.
   JOIN FCT_VEHICLE_STATE_GOLD b
     ON a.State = b.State
    AND b.DATE_PHOTO = CURRENT_DATE()
   GROUP BY MAKE, a.State, b.NB_VEHICLE
)
SELECT * FROM current_data
UNION ALL
SELECT * FROM new_data

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 92, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

## 

## FCT_KPI_MAKER_GOLD

In [87]:
%%sql
-- Rebuilds FCT_KPI_MAKER_GOLD: rows stamped with another date are kept as they
-- are, and today's per-maker KPIs are recomputed from electric_vehicle_silver.
-- Columns are matched by position across the UNION ALL and the INSERT.
INSERT OVERWRITE TABLE FCT_KPI_MAKER_GOLD
WITH previous_snapshots AS (
    -- everything already stored, except rows stamped with today's date
    SELECT *
    FROM FCT_KPI_MAKER_GOLD
    WHERE DATE_PHOTO <> CURRENT_DATE()
),
todays_snapshot AS (
    -- one row per maker; MAKE_RANK orders makers by number of vehicles (1 = most)
    SELECT MAKE,
           COUNT(DISTINCT MODEL) AS NB_MODEL,
           MIN(model_year) AS OLDEST_MODEL_YEAR,
           MAX(model_year) AS NEWEST_MODEL_YEAR,
           ROUND(AVG(Electric_Range), 0) AS AVG_ELECTRIC_RANGE,
           RANK() OVER (ORDER BY COUNT(DOL_Vehicle_ID) DESC) AS MAKE_RANK,
           CURRENT_DATE() AS DATE_PHOTO
    FROM electric_vehicle_silver
    GROUP BY MAKE
    -- NOTE(review): this sort sits inside a CTE that feeds a UNION ALL, so it
    -- does not guarantee the order of the stored rows - confirm it is needed.
    ORDER BY MAKE_RANK
)
SELECT * FROM previous_snapshots
UNION ALL
SELECT * FROM todays_snapshot

StatementMeta(, df960e9f-fa22-41be-8a2f-5eb705f22955, 93, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>