## *AWS Amazon Web Services*

*En esta primera etapa, hemos decidio utilizar una función Lambda en Amazon Web Services (AWS) encargada de realizar web scraping en la página de la Comisión de Taxis y Limusinas para extraer datos sobre los viajes realizados en la ciudad de Nueva York durante el año 2023. Posteriormente, estos datos son almacenados en un bucket de AWS S3*

In [None]:
import os
import requests
import boto3
from bs4 import BeautifulSoup
import re

# Definimos la función de manejo de la solicitud
def lambda_handler(event, context):
    # Configuramos las credenciales de AWS
    AWS_ACCESS_KEY_ID = 'AWS_ACCESS_KEY_ID'
    AWS_SECRET_ACCESS_KEY = 'AWS_SECRET_ACCESS_KEY'
    AWS_REGION = 'AWS_REGION'

    # Creamos un cliente de S3
    s3_client = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, region_name=AWS_REGION)

    # Crear carpeta de destino si no existe
    carpeta_destino = "/tmp/0-DataSets/1-BBDDscrapeados"
    if not os.path.exists(carpeta_destino):
        os.makedirs(carpeta_destino)

    # Obtener la lista de archivos en la carpeta de destino
    archivos_locales = os.listdir(carpeta_destino)

    # URL de la página web
    url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

    # Realizar la solicitud HTTP a la página
    response = requests.get(url)

    # Verificar si la solicitud fue exitosa
    if response.status_code == 200:
        # Parsear el contenido HTML de la página
        soup = BeautifulSoup(response.text, 'html.parser')

        # Encontrar todos los enlaces que contienen 'yellow' o 'green' y tienen el formato esperado para 2023
        links_pagina = {a['href'] for a in soup.find_all('a', href=True) if re.match(r'^.*(yellow|green)tripdata(2023|2024)-\d{2}\.parquet$', a['href'])}

        # Combinar enlaces generados dinámicamente con los obtenidos de la página
        links_totales = links_pagina.copy()  # Crear una copia para mantener los enlaces originales

        meses = [str(num).zfill(2) for num in range(1, 13)]  # Lista de meses como cadenas de dos dígitos

        for tipo in ['yellow', 'green']:
            links_totales.update({
                f"https://d37ci6vzurychx.cloudfront.net/trip-data/{tipo}_tripdata_{año}-{mes}.parquet" 
                for año in ['2023', '2024']  # Iterar sobre los años
                for mes in meses
            })

        # Realizar la descarga de los archivos
        for link in sorted(links_totales):  # Iterar sobre los enlaces ordenados alfabéticamente
            nombre_archivo = link.split("/")[-1]  # Obtener el nombre del archivo desde la URL
            archivo_destino = os.path.join(carpeta_destino, nombre_archivo)  # Construir la ruta completa del archivo destino

            if nombre_archivo not in archivos_locales:  # Verificar si el archivo no existe localmente
                response = requests.get(link)
                if response.status_code == 200:
                    with open(archivo_destino, 'wb') as file:
                        file.write(response.content)
                    print(f"Archivo descargado: {nombre_archivo}")

                    # Subir el archivo a S3
                    s3_client.upload_file(archivo_destino, 'proceso-webscraping', nombre_archivo)
                    print(f"Archivo subido a S3: {nombre_archivo}")
                else:
                    print(f"No se pudo descargar el archivo desde {link}. Código de estado: {response.status_code}")
            else:
                print(f"El archivo {nombre_archivo} ya existe localmente.")  # Imprimir un mensaje indicando que el archivo ya existe
    else:
        print(f"No se pudo acceder a la página. Código de estado: {response.status_code}")  # Imprimir un mensaje de error si la solicitud no fue exitosa


#### *Una vez que hemos adquirido los archivos en formato Parquet, recurrimos al empleo de AWS Glue para automatizar el proceso de Extracción, Transformación y Carga (ETL) de los datos utilizando pySpark.*

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, to_date, to_timestamp, hour, round
from pyspark.sql.types import StringType, IntegerType
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
import os
# Inicializar el contexto de Glue
glueContext = GlueContext(SparkContext.getOrCreate())
# Crear un objeto Job
job = Job(glueContext)
job.init("my_job_name")  # Reemplaza "my_job_name" con el nombre deseado para tu job
spark = SparkSession.builder.appName("Parquet").getOrCreate()
df_yellow_container = []
# Iteramos sobre la dirección del contenedor para extraer todos los archivos del 2023
for month in range(1, 13):
    
    # Creamos la variable que contenga el path a cada parquet de yellow taxis
    yellow_path = "s3://webscraping-proceso/yellow_tripdata_2023-{0}.parquet".format(str(month).zfill(2))
    
    # Leemos el parquet
    df_yellow = spark.read.parquet(yellow_path)
    
    # Lo agregamos a la lista df_yellow_container
    df_yellow_container.append(df_yellow)

# Iteramos sobre la dirección del contenedor para extraer todos los archivos del 2024
for month in range(1, 2):
    
    # Creamos la variable que contenga el path a cada parquet de yellow taxis
    yellow_path = "s3://webscraping-proceso/yellow_tripdata_2024-{0}.parquet".format(str(month).zfill(2))
    
    # Leemos el parquet
    df_yellow = spark.read.parquet(yellow_path)
    
    # Lo agregamos a la lista df_yellow_container
    df_yellow_container.append(df_yellow)

# Unir todos los DataFrames en uno solo
yellow_concatenado = None
for df in df_yellow_container:
    if yellow_concatenado is None:
        yellow_concatenado = df
    else:
        yellow_concatenado = yellow_concatenado.union(df)
# Agregar una nueva columna 'total_pay' al DataFrame yellow_concatenado que contiene la suma de 'tip_amount' y 'total_amount', limitando 'amount' a 2 decimales
yellow_concatenado = yellow_concatenado.withColumn('amount', round(col('tip_amount') + col('total_amount'), 2))
# Crear un nuevo DataFrame 'yellowdf' con las columnas seleccionadas
yellowdf = yellow_concatenado.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'RatecodeID', 'passenger_count', 'trip_distance', 'amount')
# Renombrar las columnas en un nuevo DataFrame
yellowdf = yellowdf \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
# Eliminamos solo el 7% de las filas donde contiene valores NaN
yellowdf = yellowdf.na.drop()
# Eliminar filas con valores de pasajeros diferentes de 99 y 0
yellowdf = yellowdf.filter((col("passenger_count") != 99) & (col("passenger_count") != 0))

# Agregar una columna 'service_type' con el valor 'yellow'
yellowdf = yellowdf.withColumn('service_type', lit('yellow'))

# Calcular el número total de filas
filas_totales_yellow = yellowdf.count()

# Calcular el número de filas a tomar para el muestreo
porcentaje = 0.20
filas_a_tomar_yellow = int(filas_totales_yellow * porcentaje)
# Tomar una muestra aleatoria de las filas seleccionadas
filas_yellow_seleccionadas = yellowdf.sample(False, porcentaje)
df_green_container = []
# Iteramos sobre la dirección del contenedor para extraer todos los archivos del 2023
for month in range(1, 13):
    
    # Creamos la variable que contenga el path a cada parquet de green taxis
    green_path = "s3://webscraping-proceso/green_tripdata_2023-{0}.parquet".format(str(month).zfill(2))
    
    # Leemos el parquet
    df_green = spark.read.parquet(green_path)
    
    # Lo agregamos a la lista df_green_container
    df_green_container.append(df_green)
    
# Iteramos sobre la dirección del contenedor para extraer todos los archivos del 2024
for month in range(1, 2):
    
    # Creamos la variable que contenga el path a cada parquet de green taxis
    green_path = "s3://webscraping-proceso/green_tripdata_2024-{0}.parquet".format(str(month).zfill(2))
    
    # Leemos el parquet
    df_green = spark.read.parquet(green_path)
    
    # Lo agregamos a la lista df_green_container
    df_green_container.append(df_green)
# Unir todos los DataFrames en uno solo
green_concatenado = None
for df in df_green_container:
    if green_concatenado is None:
        green_concatenado = df
    else:
        green_concatenado = green_concatenado.union(df)
# Agregar una nueva columna 'total_pay' al DataFrame green_concatenado que contiene la suma de 'tip_amount' y 'total_amount', limitando 'amount' a 2 decimales
green_concatenado = green_concatenado.withColumn('amount', round(col('tip_amount') + col('total_amount'), 2))
# Crear un nuevo DataFrame 'greendf' con las columnas seleccionadas
greendf = green_concatenado.select('lpep_pickup_datetime', 'lpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'RatecodeID', 'passenger_count', 'trip_distance', 'amount')
# Renombrar las columnas en un nuevo DataFrame
greendf_renombrado = greendf \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
# Eliminar filas con valores NaN en el DataFrame greendf
greendf = greendf.na.drop()
# Eliminar filas con valores de pasajeros diferentes de 99 y 0, y trip_distance diferente de 0
greendf = greendf.filter((col("passenger_count") != 99) & (col("passenger_count") != 0) & (col("trip_distance") != 0))
# Añadir la columna 'service_type' con el valor 'green'
greendf = greendf.withColumn('service_type', lit('green'))
# Calcular el número de filas total
filas_totales = greendf.count()
# Calcular el número de filas a tomar para el muestreo
porcentaje = 0.20
filas_a_tomar = int(filas_totales * porcentaje)
# Tomar una muestra aleatoria de las filas seleccionadas
filas_green_seleccionadas = greendf.sample(False, porcentaje)
# Concatenar los DataFrames de yellow y green taxis
taxis = filas_yellow_seleccionadas.union(filas_green_seleccionadas)
# Definir el diccionario zones
zones = {
    'Manhattan': [4, 12, 13, 24, 41, 42, 43, 45, 48, 50, 68, 74, 75, 79, 87, 88, 90, 100, 103, 103, 103, 107, 113, 114, 116, 120, 125,
                  127, 128, 137, 140, 141, 142, 143, 144, 148, 151, 152, 153, 158, 161, 162, 163, 164, 166, 170, 186, 194, 202, 209, 211,
                  224, 229, 230, 231, 232, 233, 234, 236, 237, 238, 239, 243, 244, 246, 249, 261, 262, 263],
    'Brooklyn': [11, 14, 17, 21, 22, 25, 26, 29, 33, 34, 35, 36, 37, 39, 40, 49, 52, 54, 55, 61, 62, 63, 65, 66, 67, 71, 72, 76, 77, 80,
                 85, 89, 91, 97, 106, 108, 111, 112, 123, 133, 149, 150, 154, 155, 165, 177, 178, 181, 188, 189, 190, 195, 210, 217, 222,
                 225, 227, 228, 255, 256, 257],
    'Queens': [2, 7, 8, 9, 10, 15, 16, 19, 27, 28, 30, 38, 53, 56, 56, 64, 70, 73, 82, 83, 86, 92, 93, 95, 96, 98, 101, 102, 117, 121,
               122, 124, 129, 130, 131, 132, 134, 135, 138, 139, 145, 146, 157, 160, 171, 173, 175, 179, 180, 191, 192, 193, 196, 197,
               198, 201, 203, 205, 207, 215, 216, 218, 219, 223, 226, 252, 253, 258, 260],
    'Bronx': [3, 18, 20, 31, 32, 46, 47, 51, 58, 59, 60, 69, 78, 81, 94, 119, 126, 136, 147, 159, 167, 168, 169, 174, 182, 183, 184,
              185, 199, 200, 208, 212, 213, 220, 235, 240, 241, 242, 247, 248, 250, 254, 259],
    'Staten Island': [5, 6, 23, 44, 84, 99, 109, 110, 115, 118, 156, 172, 176, 187, 204, 206, 214, 221, 245, 251],
    'EWR': [1]
}
# Crear columnas 'pickup_borough' y 'dropoff_borough' utilizando el diccionario zones
taxis = taxis.withColumn('pickup_borough', 
                         when(col('PULocationID').isin(zones['Manhattan']), 'Manhattan')
                         .when(col('PULocationID').isin(zones['Brooklyn']), 'Brooklyn')
                         .when(col('PULocationID').isin(zones['Queens']), 'Queens')
                         .when(col('PULocationID').isin(zones['Bronx']), 'Bronx')
                         .when(col('PULocationID').isin(zones['Staten Island']), 'Staten Island')
                         .when(col('PULocationID').isin(zones['EWR']), 'EWR')
                         .otherwise('Unknown'))

taxis = taxis.withColumn('dropoff_borough', 
                         when(col('DOLocationID').isin(zones['Manhattan']), 'Manhattan')
                         .when(col('DOLocationID').isin(zones['Brooklyn']), 'Brooklyn')
                         .when(col('DOLocationID').isin(zones['Queens']), 'Queens')
                         .when(col('DOLocationID').isin(zones['Bronx']), 'Bronx')
                         .when(col('DOLocationID').isin(zones['Staten Island']), 'Staten Island')
                         .when(col('DOLocationID').isin(zones['EWR']), 'EWR')
                         .otherwise('Unknown'))
# Filtrar las filas donde 'pickup_borough' y 'dropoff_borough' no sean 'Unknown'
taxis = taxis.filter(~col('pickup_borough').isin(['Unknown']) & ~col('dropoff_borough').isin(['Unknown']))

# Cambiar el tipo de dato de 'passenger_count' a entero
taxis = taxis.withColumn('passenger_count', col('passenger_count').cast(IntegerType()))
# Seleccionar columnas específicas
taxisdf = taxis.select('pickup_datetime', 'dropoff_datetime', 'RatecodeID', 'passenger_count', 
                       'trip_distance', 'amount', 'service_type', 'pickup_borough', 'dropoff_borough')
# Separar pickup_datetime en fecha y hora
taxisdf = taxisdf.withColumn('pickup_date', to_date(col('pickup_datetime')))
taxisdf = taxisdf.withColumn('pickup_hour', hour(col('pickup_datetime')).cast('string'))

# Separar dropoff_datetime en fecha y hora
taxisdf = taxisdf.withColumn('dropoff_date', to_date(col('dropoff_datetime')))
taxisdf = taxisdf.withColumn('dropoff_hour', hour(col('dropoff_datetime')).cast('string'))

# Eliminar columnas originales pickup_datetime y dropoff_datetime
taxisdf = taxisdf.drop('pickup_datetime', 'dropoff_datetime')
# Especificar la ruta de salida en S3 donde se guardará el archivo Parquet
ruta_salida = "s3://taxisclean/taxis_NYC_2023.parquet"
# Especificamos que queremos un solo archivo parquet y lo guardamos en el bucket
taxisdf.coalesce(1).write.parquet(ruta_salida, mode='overwrite')
# Detener sesion de Spark
spark.stop()
# Finalizar el job
job.commit()

*Tras finalizar la procesamiento de los datos, se procede a guardar el archivo resultante en formato Parquet en un bucket de AWS S3. No obstante, se detecta que el archivo se almacena con un nombre predeterminado, generado automáticamente durante el proceso de guardado utilizando Apache Spark con compresión Snappy. Para solventar este inconveniente, se implementa una función Lambda adicional que se encarga de renombrar el archivo antes de almacenarlo en un bucket destinado a alimentar nuestro Data Warehouse.*

In [None]:
import boto3

def lambda_handler(event, context):
    # Nombre del bucket y ruta del archivo original
    source_bucket_name = 'tu_bucket_de_entrada'
    destination_bucket_name = 'tu_bucket_de_salida'
    source_folder_name = 'tu_carpeta/'
    
    # Creamos una instancia del cliente de S3 con tus credenciales de acceso
    s3_client = boto3.client(
        's3',
        region_name='tu_region',
        aws_access_key_id='tu_access_key',
        aws_secret_access_key='tu_secret_key'
    )
    
    # Obtenemos la lista de objetos en la carpeta especificada
    response = s3_client.list_objects_v2(
        Bucket=source_bucket_name,
        Prefix=source_folder_name
    )
    
    # Renombramos y copiamos cada archivo que termine en ".parquet" al nuevo bucket
    for obj in response.get('Contents', []):
        file_key = obj['Key']
        if file_key.endswith('.parquet'):
            new_file_key = 'nuevo_nombre.parquet'
            s3_client.copy_object(
                Bucket=destination_bucket_name,
                CopySource={'Bucket': source_bucket_name, 'Key': file_key},
                Key=new_file_key
            )
            s3_client.delete_object(
                Bucket=source_bucket_name,
                Key=file_key
            )
    
    # Eliminamos todos los objetos dentro de la carpeta original
    for obj in response.get('Contents', []):
        s3_client.delete_object(
            Bucket=source_bucket_name,
            Key=obj['Key']
        )

    # Eliminamos la carpeta original después de copiar y eliminamos todos los archivos
    s3_client.delete_object(
        Bucket=source_bucket_name,
        Key=source_folder_name
    )
    
    return {
        'statusCode': 200,
        'body': 'Archivos renombrados, copiados al nuevo bucket y carpeta eliminada exitosamente'
    }


*Una vez que el archivo está en el bucket que abastece a AWS Athena, podemos realizar consultas SQL para obtener información de los datos almacenados en el Data Warehouse.*
*A medida que la base de datos se renueva, nuestro Data Warehouse se actualiza automáticamente mediante una función Lambda. Esta función se encarga de tomar los nuevos registros de viajes y almacenarlos en el Data Warehouse.*