
## BIG DATA FINAL PROJECT
# Leonardo Villegas - Lewis Puello

This report presents the final Big Data project, whose goal was to add value to the “Electric Vehicle Population” dataset through operations in PySpark. We also used the “Electric_Vehicle_Title_and_Registration_Activity” dataset to relate the two tables and produce different kinds of results.
The step-by-step process is described below.

## Data Sources

We used the “Electric Vehicle Population” dataset from the Washington State data catalog, which lists the battery electric vehicles (BEVs) and plug-in hybrid electric vehicles (PHEVs) currently registered through the Washington State Department of Licensing (DOL).

The data were downloaded (26,116 KB, as shown in the figure) and uploaded to a new Google Drive folder dedicated to the datasets. Google Colaboratory then read that folder by mounting Google Drive, so the file path is available whenever the analysis needs it.

The dataset contains the following variables:
* VIN (1-10): The first 10 characters of each vehicle's Vehicle Identification Number (VIN).
* County: The county in which the registered owner resides.
* City: The city in which the registered owner resides.
* State: The state in which the registered owner resides.
* Postal Code: The 5-digit ZIP code in which the registered owner resides.
* Model Year: The model year of the vehicle, determined by decoding the Vehicle Identification Number (VIN).
* Make: The manufacturer of the vehicle, determined by decoding the Vehicle Identification Number (VIN).
* Model: The model of the vehicle, determined by decoding the Vehicle Identification Number (VIN).
* Electric Vehicle Type: Distinguishes the vehicle as all-electric or a plug-in hybrid.
* Clean Alternative Fuel Vehicle (CAFV) Eligibility: Categorizes the vehicle as a Clean Alternative Fuel Vehicle (CAFV) based on the fuel requirement and electric-only range.
* Electric Range: How far the vehicle can travel purely on its electric charge.
* Base MSRP: The lowest Manufacturer's Suggested Retail Price (MSRP) for any trim level of the model in question.
* Legislative District: The specific section of Washington State in which the vehicle's owner resides, as represented in the state legislature.
* DOL Vehicle ID: Unique number assigned to each vehicle by the Department of Licensing for identification purposes.
* Vehicle Location: The center of the ZIP code for the registered vehicle.
* Electric Utility: The electric power retail service territories serving the address of the registered vehicle (all utility ownership types are included).
* 2020 Census Tract: The census tract identifier, a combination of the state, county, and census tract codes as assigned by the United States Census Bureau.


Source: Kaggle
Available at: https://www.kaggle.com/datasets/ssarkar445/electric-vehicle-population?resource=download


## Install PySpark

In [None]:
# Install pyspark
!pip install pyspark
 
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark
 
# Import a Spark function from library
from pyspark.sql.functions import col



## Load the data

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() returns the session created in the previous cell if it is still active
spark = SparkSession.builder \
    .master('local') \
    .appName('test etl') \
    .config('spark.executor.memory', '1gb') \
    .config("spark.cores.max", "2") \
    .getOrCreate()

# Read the CSV using its header row and let Spark infer the column types
separator = ","
df = spark.read.option("delimiter", separator).csv('/content/drive/MyDrive/FinalBigData_(Datasets)/Electric_Vehicle_Population_Data.csv', header=True, inferSchema=True)

df.printSchema()

df.show(n=30, truncate=False)

print("Count=" + str(df.count()))

# If the result is a single-column DataFrame, the delimiter does not match the file.
# For a tab-separated file, for example:
# separator = "\t"
# .option("delimiter", separator)

root
 |-- VIN (1-10): string (nullable = true)
 |-- County: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Model Year: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Electric Vehicle Type: string (nullable = true)
 |-- Clean Alternative Fuel Vehicle (CAFV) Eligibility: string (nullable = true)
 |-- Electric Range: integer (nullable = true)
 |-- Base MSRP: integer (nullable = true)
 |-- Legislative District: integer (nullable = true)
 |-- DOL Vehicle ID: integer (nullable = true)
 |-- Vehicle Location: string (nullable = true)
 |-- Electric Utility: string (nullable = true)
 |-- 2020 Census Tract: long (nullable = true)

+----------+---------+-----------------+-----+-----------+----------+---------+----------+--------------------------------------+------------------------------------------------------------+--------------+---------+
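The comment at the end of the loading cell describes a frequent symptom: if the delimiter given to the reader does not match the file, each line is read as a single field. The following sketch reproduces that effect with Python's standard `csv` module on a small made-up sample (`SAMPLE` is hypothetical, not taken from the dataset); Spark's CSV reader shows the same symptom when `option("delimiter", ...)` is wrong.

```python
import csv
import io

# Hypothetical two-line sample in the same comma-separated layout as the dataset
SAMPLE = "Make,Model,Electric Range\nTESLA,MODEL 3,322\n"


def parse(text, delimiter):
    """Parse CSV text with the given delimiter and return the list of rows."""
    return list(csv.reader(io.StringIO(text), delimiter=delimiter))


right = parse(SAMPLE, ",")   # three fields per row
wrong = parse(SAMPLE, "\t")  # one field per row: the whole line is a single value

print(right)
print(wrong)
```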

## Filtering Operation

We chose to analyze models from model year 2014 onward (the year in which sales of these vehicles began to grow steadily) and only the records with a known electric range, that is, an Electric Range greater than zero.

In [None]:
# Keep model years 2014 and later, and only vehicles whose electric range has been researched (Electric Range > 0)
df = df.where((df["Model Year"] >= 2014) & (df["Electric Range"] > 0))
df.show(n=30, truncate=False)

+----------+---------+-----------------+-----+-----------+----------+---------+----------+--------------------------------------+-------------------------------------------------+--------------+---------+--------------------+--------------+---------------------------+---------------------------------------------+-----------------+
|VIN (1-10)|County   |City             |State|Postal Code|Model Year|Make     |Model     |Electric Vehicle Type                 |Clean Alternative Fuel Vehicle (CAFV) Eligibility|Electric Range|Base MSRP|Legislative District|DOL Vehicle ID|Vehicle Location           |Electric Utility                             |2020 Census Tract|
+----------+---------+-----------------+-----+-----------+----------+---------+----------+--------------------------------------+-------------------------------------------------+--------------+---------+--------------------+--------------+---------------------------+---------------------------------------------+-----------------+
|
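The filter compares integer columns with literals. With string literals such as `'2014'` and `'0'` the filter still works, because Spark's implicit type coercion generally casts the literal to the column's numeric type, but integer literals state the intent directly. The plain-Python sketch below shows what goes wrong when numbers really are compared as text; the threshold of 100 is a hypothetical value chosen only for illustration.

```python
def passes_numeric(electric_range, threshold=100):
    """Numeric comparison: what a range filter is meant to do."""
    return electric_range > threshold


def passes_as_text(electric_range, threshold="100"):
    """Text comparison: characters are compared one by one, left to right."""
    return str(electric_range) > threshold


# A range of 25 should fail a "> 100" filter, but passes when compared as text
print(passes_numeric(25), passes_as_text(25))    # False True
print(passes_numeric(150), passes_as_text(150))  # True True
```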

## Adding Value to the Data

To see how the records are distributed by make and model, we grouped by these two variables and sorted the record counts in descending order, with the following results:

In [None]:
dfagr = df.groupBy("Make","Model").count() 
dfagr = dfagr.orderBy(dfagr["count"].desc())
dfagr.show(n=30, truncate=False)

+----------+-----------+-----+
|Make      |Model      |count|
+----------+-----------+-----+
|TESLA     |MODEL 3    |13814|
|NISSAN    |LEAF       |7844 |
|TESLA     |MODEL S    |5240 |
|CHEVROLET |BOLT EV    |3590 |
|CHEVROLET |VOLT       |3511 |
|TESLA     |MODEL X    |3263 |
|TOYOTA    |PRIUS PRIME|2380 |
|TESLA     |MODEL Y    |2293 |
|BMW       |I3         |1865 |
|CHRYSLER  |PACIFICA   |1794 |
|FORD      |FUSION     |1658 |
|KIA       |NIRO       |1595 |
|BMW       |X5         |1407 |
|FORD      |C-MAX      |1167 |
|JEEP      |WRANGLER   |1104 |
|TOYOTA    |RAV4 PRIME |1038 |
|VOLKSWAGEN|E-GOLF     |1029 |
|VOLVO     |XC90       |820  |
|HONDA     |CLARITY    |783  |
|FIAT      |500        |716  |
|VOLVO     |XC60       |702  |
|AUDI      |A3         |575  |
|AUDI      |E-TRON     |524  |
|MITSUBISHI|OUTLANDER  |523  |
|AUDI      |Q5 E       |479  |
|KIA       |SOUL       |429  |
|KIA       |SORENTO    |345  |
|BMW       |530E       |323  |
|BMW       |330E       |303  |
|BMW    
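Conceptually, `groupBy("Make", "Model").count()` followed by a descending sort is a frequency count of (make, model) pairs. A plain-Python equivalent with `collections.Counter` on a handful of hypothetical rows looks like this:

```python
from collections import Counter

# Hypothetical (make, model) pairs standing in for rows of the filtered DataFrame
rows = [
    ("TESLA", "MODEL 3"), ("NISSAN", "LEAF"), ("TESLA", "MODEL 3"),
    ("TESLA", "MODEL S"), ("NISSAN", "LEAF"), ("TESLA", "MODEL 3"),
]

# Count each pair and sort by count, highest first (like ordering by count descending)
counts = Counter(rows).most_common()
for (make, model), n in counts:
    print(make, model, n)
```

Spark performs the same counting in a distributed way, computing partial counts on each partition before combining them.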

We used window functions to rank the models within each make, that is, to order each make's models by the number of registered vehicles. This gives the top models for each make.

In [None]:
# Use a window to rank the models within each make by their record count
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

w = Window.partitionBy(dfagr["Make"]).orderBy(dfagr["count"].desc())
df_ranked = dfagr.select("Make","Model","count",rank().over(w).alias("Ranking"))
dfagr.printSchema()
df_ranked.show(n=30, truncate=False)

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- count: long (nullable = false)

+---------+----------------+-----+-------+
|Make     |Model           |count|Ranking|
+---------+----------------+-----+-------+
|AUDI     |A3              |575  |1      |
|AUDI     |E-TRON          |524  |2      |
|AUDI     |Q5 E            |479  |3      |
|AUDI     |Q5              |138  |4      |
|AUDI     |E-TRON SPORTBACK|72   |5      |
|AUDI     |A7              |11   |6      |
|AUDI     |A8 E            |3    |7      |
|BENTLEY  |BENTAYGA        |2    |1      |
|BENTLEY  |FLYING SPUR     |1    |2      |
|BMW      |I3              |1865 |1      |
|BMW      |X5              |1407 |2      |
|BMW      |530E            |323  |3      |
|BMW      |330E            |303  |4      |
|BMW      |X3              |292  |5      |
|BMW      |I8              |102  |6      |
|BMW      |740E            |30   |7      |
|BMW      |745E            |7    |8      |
|BMW      |745LE         
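The window partitions the rows by make and orders each partition by count, and `rank()` numbers the rows within each partition. Tied counts share a rank and the following rank is skipped (`dense_rank()` would not skip it). The pure-Python sketch below mimics that behavior; the AUDI and BENTLEY rows come from the output above, while the `DEMO` make is hypothetical and exists only to show a tie.

```python
from itertools import groupby


def rank_within_make(rows):
    """Mimic rank() over a window partitioned by make and ordered by count descending.

    rows: iterable of (make, model, count) tuples.
    Returns a list of (make, model, count, rank) tuples.
    """
    ranked = []
    # Sort by make, then by count descending, so each make forms one contiguous group
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    for make, group in groupby(ordered, key=lambda r: r[0]):
        previous_count, previous_rank = None, 0
        for position, (_, model, count) in enumerate(group, start=1):
            # Equal counts share a rank; otherwise the rank is the row's position
            rank = previous_rank if count == previous_count else position
            ranked.append((make, model, count, rank))
            previous_count, previous_rank = count, rank
    return ranked


sample = [
    ("AUDI", "A3", 575), ("AUDI", "E-TRON", 524),
    ("BENTLEY", "BENTAYGA", 2), ("BENTLEY", "FLYING SPUR", 1),
    ("DEMO", "X", 5), ("DEMO", "Y", 5), ("DEMO", "Z", 3),  # hypothetical tie
]
ranked = rank_within_make(sample)

# Keep the top model(s) of each make, like filtering Ranking == 1
top_per_make = [r for r in ranked if r[3] == 1]
print(top_per_make)
```

In PySpark, a similar top-N selection could be written as `df_ranked.where(df_ranked["Ranking"] <= 3)` to keep the three most-registered models of each make.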

We also grouped by make and model to find the maximum and minimum electric range of each, with the following results:

In [None]:
# Group by make and model to get the maximum and minimum electric range of each
# Importing the module as F avoids shadowing Python's built-in max() and min()
from pyspark.sql import functions as F
dfagr2 = df.groupBy("Make","Model").agg(F.max("Electric Range"), F.min("Electric Range"))
dfagr2 = dfagr2.orderBy(dfagr2["max(Electric Range)"].desc())
dfagr2.show(n=30, truncate=False)

+-------------+---------------------+-------------------+-------------------+
|Make         |Model                |max(Electric Range)|min(Electric Range)|
+-------------+---------------------+-------------------+-------------------+
|TESLA        |MODEL S              |337                |208                |
|TESLA        |MODEL 3              |322                |215                |
|TESLA        |MODEL X              |293                |200                |
|TESLA        |MODEL Y              |291                |291                |
|CHEVROLET    |BOLT EV              |259                |238                |
|HYUNDAI      |KONA                 |258                |258                |
|KIA          |NIRO                 |239                |26                 |
|JAGUAR       |I-PACE               |234                |234                |
|POLESTAR     |PS2                  |233                |233                |
|AUDI         |E-TRON               |222                |204    
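Grouping with `max` and `min` keeps, for each (make, model) key, the largest and smallest value seen. A plain-Python version is sketched below with sample values matching the KIA NIRO and TESLA MODEL Y rows above. It relies on Python's built-in `max()` and `min()`, which is one reason to avoid `from pyspark.sql.functions import max, min` in a notebook: that import shadows the built-ins. A wide spread such as NIRO's 26 to 239 likely reflects plug-in hybrid and battery-electric versions sold under the same model name.

```python
def range_by_model(rows):
    """Return {(make, model): (max_range, min_range)}, like groupBy().agg(max, min).

    rows: iterable of (make, model, electric_range) tuples.
    """
    stats = {}
    for make, model, electric_range in rows:
        key = (make, model)
        if key in stats:
            current_max, current_min = stats[key]
            stats[key] = (max(current_max, electric_range), min(current_min, electric_range))
        else:
            stats[key] = (electric_range, electric_range)
    return stats


rows = [("KIA", "NIRO", 239), ("KIA", "NIRO", 26), ("TESLA", "MODEL Y", 291)]
print(range_by_model(rows))
# {('KIA', 'NIRO'): (239, 26), ('TESLA', 'MODEL Y'): (291, 291)}
```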

A second dataset is loaded to perform a JOIN operation.

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already active session
spark = SparkSession.builder \
    .master('local') \
    .appName('test etl') \
    .config('spark.executor.memory', '1gb') \
    .config("spark.cores.max", "2") \
    .getOrCreate()

# Read the title and registration activity CSV with its header and inferred types
separator = ","
df2 = spark.read.option("delimiter", separator).csv('/content/drive/MyDrive/FinalBigData_(Datasets)/Electric_Vehicle_Title_and_Registration_Activity.csv', header=True, inferSchema=True)

df2.printSchema()

df2.show(n=30, truncate=False)

# Count the rows of the newly loaded dataset (df2, not df)
print("Count=" + str(df2.count()))

# If the result is a single-column DataFrame, the delimiter does not match the file.
# For a tab-separated file, for example:
# separator = "\t"
# .option("delimiter", separator)

root
 |-- Clean Alternative Fuel Vehicle Type: string (nullable = true)
 |-- VIN (1-10): string (nullable = true)
 |-- Model Year: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- New or Used Vehicle: string (nullable = true)
 |-- Sale Price: integer (nullable = true)
 |-- DOL Transaction Date: string (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Transaction Year: integer (nullable = true)
 |-- Electric Vehicle Fee Paid: string (nullable = true)
 |-- County: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Electric Range: integer (nullable = true)
 |-- Base MSRP: integer (nullable = true)
 |-- 2015 HB 2778 Exemption Eligibility: string (nullable = true)
 |-- Sale Date: string (nullable = true)
 |-- Vehicle Primary Use: string (nullable = true)
 |-- State of Residence: string (nullable = true)
 |-- DOL Vehicle ID: integer (nullable = true)
 |-- Leg

Finally, we performed a LEFT JOIN to look up vehicle sale prices in the “Electric Vehicle Title and Registration Activity” dataset. In that dataset we first kept only transactions with a sale price greater than zero, and in both datasets we renamed the join column `DOL Vehicle ID` to `DOLVehicleID`. Because the registration activity dataset is the left side of the join, all of its transactions are kept; transactions whose vehicle is not in the filtered population dataset show null for Make, Model and Model Year. The result is shown below:

In [None]:
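# Use a join key without spaces in both DataFrames
df = df.withColumnRenamed("DOL Vehicle ID", "DOLVehicleID")
df2 = df2.withColumnRenamed("DOL Vehicle ID", "DOLVehicleID")
# Keep only transactions with a positive sale price
df2 = df2.where(df2["Sale Price"] > 0)
join_fields = ['DOLVehicleID']
# df2 (T) is the left side: every transaction is kept, and unmatched vehicles get nulls from df (R)
df_result = df2.alias("T").join(df.alias("R"),
                                on=join_fields,
                                how="left_outer").select(["R.DOLVehicleID","R.Make","R.Model","R.Model Year"] + [col("T.Sale Price")])
df_result.show(n=30, truncate=False)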

+------------+----------+----------+----------+----------+
|DOLVehicleID|Make      |Model     |Model Year|Sale Price|
+------------+----------+----------+----------+----------+
|null        |null      |null      |null      |70440     |
|477229294   |CHEVROLET |BOLT EV   |2017      |21219     |
|null        |null      |null      |null      |64440     |
|null        |null      |null      |null      |71940     |
|null        |null      |null      |null      |53290     |
|null        |null      |null      |null      |39559     |
|190012441   |TOYOTA    |RAV4 PRIME|2021      |42414     |
|null        |null      |null      |null      |53290     |
|157695324   |BMW       |X5        |2018      |43000     |
|181358292   |TOYOTA    |RAV4 PRIME|2021      |43097     |
|null        |null      |null      |null      |47995     |
|140137504   |KIA       |NIRO      |2020      |31304     |
|null        |null      |null      |null      |137440    |
|null        |null      |null      |null      |65190    
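The nulls follow from the join direction: in a left outer join, every row of the left DataFrame (`df2`, the transactions) is kept, and the columns from the right DataFrame (`df`, the filtered population) are null where no vehicle ID matches, for example because the vehicle's model year is earlier than 2014 or its electric range is zero. The sketch below reproduces these semantics in plain Python; vehicle ID 477229294 and its values come from the output above, while ID 1 is hypothetical.

```python
def left_outer_join(left, right, key, right_columns):
    """Keep every left row; add right_columns from matching right rows, or None if none match."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for row in left:
        # [None] stands for "no match" and yields one row filled with nulls
        for match in index.get(row[key], [None]):
            extra = {c: (match[c] if match is not None else None) for c in right_columns}
            joined.append({**row, **extra})
    return joined


transactions = [  # left side, like df2
    {"DOLVehicleID": 477229294, "Sale Price": 21219},
    {"DOLVehicleID": 1, "Sale Price": 70440},  # hypothetical unmatched vehicle
]
population = [  # right side, like df
    {"DOLVehicleID": 477229294, "Make": "CHEVROLET", "Model": "BOLT EV"},
]

joined = left_outer_join(transactions, population, "DOLVehicleID", ["Make", "Model"])
for row in joined:
    print(row)
```

This sketch keeps the left key in every row. In the PySpark cell the selected key is `R.DOLVehicleID`, taken from the right side, which is why it is null for unmatched rows; selecting the unqualified `DOLVehicleID` should return the left side's key instead, and `how="inner"` would keep only the matched transactions.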

The filtered population DataFrame (`df`) was saved in Parquet format using the “overwrite” mode, partitioned by Model Year and Electric Vehicle Type. The files were written to a folder created in Google Drive: “FinalBigDataGuardado”.

In [None]:
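# Spread the data over 50 partitions, then write Parquet files into one sub-directory per
# (Model Year, Electric Vehicle Type) pair, replacing any previous output in the folder
df.repartition(50).write.format("parquet") \
    .partitionBy("Model Year", "Electric Vehicle Type") \
    .mode("overwrite") \
    .save("/content/drive/MyDrive/FinalBigDataGuardado")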
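`partitionBy` writes one sub-directory per combination of partition values, named in the `<column>=<value>` form, and the partition columns are stored in the directory names rather than inside the Parquet files. Older Spark releases (before roughly version 3.3) reject Parquet column names containing any of the characters ` ,;{}()\n\t=`, which would affect columns such as `VIN (1-10)` here. If the write fails with an "invalid character(s)" error, one option is to normalize the names first; the helper below is a hypothetical sketch, not part of the original notebook.

```python
import re

# Characters that older Spark versions refuse in Parquet column names
INVALID_PARQUET_CHARS = re.compile(r"[ ,;{}()\n\t=]+")


def sanitize_column_name(name):
    """Replace each run of problematic characters with '_' and trim stray underscores."""
    return INVALID_PARQUET_CHARS.sub("_", name.strip()).strip("_")


for original in ["VIN (1-10)", "Model Year",
                 "Clean Alternative Fuel Vehicle (CAFV) Eligibility"]:
    print(original, "->", sanitize_column_name(original))
```

In the notebook, the new names could be applied with `df.toDF(*[sanitize_column_name(c) for c in df.columns])` before writing, remembering to update the `partitionBy` arguments accordingly (for example `Model_Year`).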