## ***ETL automatizado en AWS***

Los datasets cargados en un bucket de **S3** fueron sometidos a un proceso de *extracción, transformación y carga* (`ETL`, por sus siglas en inglés) usando `jobs` de **AWS Glue** escritos en `PySpark`. El proceso fue programado para ejecutarse el día 15 de cada mes a las 00:00 h.

#### ***Script de `sound_quality`***

In [None]:
import sys
import boto3
import pandas as pd
from io import BytesIO
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Resolve the parameters that Glue passes to the job on the command line.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'BUCKET_NAME', 'INPUT_FILE_KEY', 'OUTPUT_FILE_KEY'],
)

# Bucket and object keys that main() uses to build its S3 paths.
BUCKET_NAME = args['BUCKET_NAME']
INPUT_FILE_KEY = args['INPUT_FILE_KEY']
OUTPUT_FILE_KEY = args['OUTPUT_FILE_KEY']

# Spark entry points; the session is the one exposed by the Glue context.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# S3 client (not referenced by the visible code of this script).
s3_client = boto3.client('s3')

def main():
    """Build the ``sound_quality`` Parquet dataset from the annotations CSV.

    Reads ``s3://{BUCKET_NAME}/{INPUT_FILE_KEY}`` as CSV (header row, inferred
    schema), keeps seven columns, turns the ``1_engine_presence`` codes into
    text labels under the name ``Engine Sound``, capitalises the remaining
    column names and overwrites ``s3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}``.
    """
    source_uri = f's3://{BUCKET_NAME}/{INPUT_FILE_KEY}'
    target_uri = f's3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}'

    # Source column -> output column, for the columns that are only renamed.
    renames = {
        'borough': 'Borough',
        'latitude': 'Latitude',
        'longitude': 'Longitude',
        'year': 'Year',
        'day': 'Day',
        'hour': 'Hour',
    }

    raw = spark.read.csv(source_uri, header=True, inferSchema=True)

    # Keep only the needed columns and give the engine column its final name.
    sound = raw.select(*renames, '1_engine_presence')
    sound = sound.withColumnRenamed('1_engine_presence', 'Engine Sound')

    # Map the numeric codes to labels. There is no otherwise() branch, so any
    # value other than -1, 0 or 1 becomes null.
    code = col('Engine Sound')
    label = (
        when(code == -1, 'Low')
        .when(code == 0, 'Medium')
        .when(code == 1, 'High')
    )
    sound = sound.withColumn('Engine Sound', label)

    # Capitalise the remaining column names.
    for old_name, new_name in renames.items():
        sound = sound.withColumnRenamed(old_name, new_name)

    # NOTE(review): 'Engine Sound' contains a space; some older Spark releases
    # refuse such column names when writing Parquet - confirm against the
    # Glue version this job runs on.
    sound.write.mode('overwrite').parquet(target_uri)


if __name__ == "__main__":
    main()

#### ***Script de `electric_alternative_fuel_stations`***

In [None]:
import sys
import boto3
import pandas as pd
from io import BytesIO
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
# Job parameters supplied by Glue on the command line.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'BUCKET_NAME', 'INPUT_FILE_KEY', 'OUTPUT_FILE_KEY'],
)

# Bucket and object keys that main() uses to build its S3 paths.
BUCKET_NAME = args['BUCKET_NAME']
INPUT_FILE_KEY = args['INPUT_FILE_KEY']
OUTPUT_FILE_KEY = args['OUTPUT_FILE_KEY']

# Spark entry points; the session is the one exposed by the Glue context.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# S3 client (not referenced by the visible code of this script).
s3_client = boto3.client('s3')

def main():
    """Build the ``electric_alternative_fuel_stations`` Parquet dataset.

    Reads ``s3://{BUCKET_NAME}/{INPUT_FILE_KEY}`` as CSV (header row, inferred
    schema), keeps seven columns, retains only rows whose ``State`` is ``NY``
    and whose ``City`` is one of the five listed boroughs, renames ``City`` to
    ``Borough``, drops ``State`` and overwrites
    ``s3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}``.
    """
    source_uri = f's3://{BUCKET_NAME}/{INPUT_FILE_KEY}'
    target_uri = f's3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}'

    wanted_columns = (
        'Fuel Type Code',
        'Station Name',
        'Street Address',
        'City',
        'State',
        'Latitude',
        'Longitude',
    )
    nyc_boroughs = ['Brooklyn', 'Manhattan', 'Queens', 'Bronx', 'Staten Island']

    raw_stations = spark.read.csv(source_uri, header=True, inferSchema=True)
    stations = raw_stations.select(*wanted_columns)

    # Both conditions must hold: New York state and one of the five boroughs.
    in_new_york = stations['State'] == 'NY'
    in_borough = stations['City'].isin(nyc_boroughs)
    stations = stations.where(in_new_york & in_borough)

    # 'State' is constant after the filter, and 'City' now holds a borough name.
    stations = stations.drop('State').withColumnRenamed('City', 'Borough')

    stations.write.mode('overwrite').parquet(target_uri)


if __name__ == "__main__":
    main()

#### ***Script de `taxi_zones`***

In [None]:
import sys
import boto3
import pandas as pd
from io import BytesIO
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Resolve the job parameters supplied by Glue. The constants below read from
# this dict, so it has to be defined before they are assigned.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'BUCKET_NAME', 'INPUT_FILE_KEY', 'OUTPUT_FILE_KEY'])

# Spark entry points; the session is the one exposed by the Glue context.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# S3 client used by main() to download the DBF file.
s3_client = boto3.client('s3')

# Bucket and object keys that main() uses for the download and the output path.
BUCKET_NAME = args['BUCKET_NAME']
INPUT_FILE_KEY = args['INPUT_FILE_KEY']
OUTPUT_FILE_KEY = args['OUTPUT_FILE_KEY']

def main():
    """Build the ``taxi_zones`` Parquet dataset from a DBF file stored in S3.

    Downloads the object ``INPUT_FILE_KEY`` from ``BUCKET_NAME``, parses it
    with ``dbfread`` into a pandas DataFrame, converts that to a Spark
    DataFrame, drops the ``Shape_Area``, ``Shape_Leng`` and ``OBJECTID``
    columns, capitalises ``zone`` and ``borough``, removes duplicate rows and
    overwrites ``s3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}``.
    """
    import os
    import tempfile

    from dbfread import DBF

    # Download the DBF file from S3.
    obj = s3_client.get_object(Bucket=BUCKET_NAME, Key=INPUT_FILE_KEY)
    dbf_content = obj['Body'].read()

    # DBF() is given a file path, so the bytes are staged on disk first. A
    # private scratch directory avoids clashing with a file left at a fixed
    # /tmp path and is removed automatically once the records are loaded.
    with tempfile.TemporaryDirectory() as scratch_dir:
        temp_dbf_path = os.path.join(scratch_dir, 'taxi_zones.dbf')
        with open(temp_dbf_path, 'wb') as temp_dbf:
            temp_dbf.write(dbf_content)

        # Load every record into pandas while the staged file still exists.
        taxi_zones_df = pd.DataFrame(DBF(temp_dbf_path))

    # Convert the pandas DataFrame to a Spark DataFrame.
    taxi_zones_spark_df = spark.createDataFrame(taxi_zones_df)

    # Drop the columns that are not needed. 'OBJECTID' is dropped directly
    # instead of being renamed to 'ObjectID' first and dropped afterwards.
    innecesarias = ['Shape_Area', 'Shape_Leng', 'OBJECTID']
    taxi_zones_spark_df = taxi_zones_spark_df.drop(*innecesarias)

    # Capitalise the remaining lower-case column names.
    taxi_zones_spark_df = taxi_zones_spark_df.withColumnRenamed('zone', 'Zone') \
                                             .withColumnRenamed('borough', 'Borough')

    # Remove duplicate rows.
    taxi_zones_spark_df = taxi_zones_spark_df.dropDuplicates()

    # Write the DataFrame to S3 in Parquet format, replacing any previous output.
    output_path = f's3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}'
    taxi_zones_spark_df.write.mode('overwrite').parquet(output_path)


if __name__ == "__main__":
    main()

#### ***Script de `vehicle_fuel_economy_data`***

In [None]:
import sys
import boto3
import pandas as pd
from io import BytesIO
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Job parameters supplied by Glue on the command line.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'BUCKET_NAME', 'INPUT_FILE_KEY', 'OUTPUT_FILE_KEY'],
)

# Bucket and object keys that main() uses to build its S3 paths.
BUCKET_NAME = args['BUCKET_NAME']
INPUT_FILE_KEY = args['INPUT_FILE_KEY']
OUTPUT_FILE_KEY = args['OUTPUT_FILE_KEY']

# Spark entry points; the session is the one exposed by the Glue context.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# S3 client (not referenced by the visible code of this script).
s3_client = boto3.client('s3')

def main():
    """Build the ``vehicle_fuel_economy_data`` Parquet dataset.

    Reads ``s3://{BUCKET_NAME}/{INPUT_FILE_KEY}`` as CSV (header row, inferred
    schema), keeps eight columns, renames five of them, fills nulls in
    ``Fuel`` with ``'No'``, relabels ``'Electricity'`` as ``'Electric'``,
    removes duplicate rows, keeps only the listed vehicle categories and
    overwrites ``s3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}``.
    """
    source_uri = f's3://{BUCKET_NAME}/{INPUT_FILE_KEY}'
    target_uri = f's3://{BUCKET_NAME}/{OUTPUT_FILE_KEY}'

    # Source column -> output column, for the columns that are renamed.
    renames = {
        'VClass': 'Category',
        'fuelType1': 'Fuel',
        'fuelType2': 'Alternative Fuel',
        'city08': 'Consumption (mpg)',
        'co2': 'CO2 (g/mile)',
    }

    # Vehicle classes that are kept in the output.
    kept_categories = [
        'Small Sport Utility Vehicle 4WD',
        'Small Sport Utility Vehicle 2WD',
        'Compact Cars',
        'Midsize Cars',
        'Large Cars',
        'Standard Sport Utility Vehicle 4WD',
        'Standard Sport Utility Vehicle 2WD',
        'Minivan - 2WD',
        'Vans',
        'Minivan - 4WD',
    ]

    raw = spark.read.csv(source_uri, header=True, inferSchema=True)

    vehicles = raw.select('Model', 'Year', 'Manufacturer', *renames)
    for old_name, new_name in renames.items():
        vehicles = vehicles.withColumnRenamed(old_name, new_name)

    # NOTE(review): nulls are filled in 'Fuel' (from fuelType1), not in
    # 'Alternative Fuel' (from fuelType2) - confirm which column was intended.
    vehicles = vehicles.fillna({'Fuel': 'No'})

    # NOTE(review): the 'Gasoline' entry maps the value to itself, so only
    # 'Electricity' is actually changed - confirm no other label was meant.
    fuel_labels = {'Gasoline': 'Gasoline', 'Electricity': 'Electric'}
    vehicles = vehicles.replace(fuel_labels, subset=['Fuel'])

    # Rows count as duplicates only when every column matches.
    vehicles = vehicles.dropDuplicates()

    vehicles = vehicles.where(vehicles['Category'].isin(kept_categories))

    vehicles.write.mode('overwrite').parquet(target_uri)


if __name__ == "__main__":
    main()