# **Implementación de servicios de Amazon Web Services**

### En esta primer estapa, creamos una función `Lambda` que se encarga de realizar web scrapping a la página [Taxi & Limousine Comission](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) para extraer los datos de los viajes realizados en la ciudad de New York durante el año 2023 y finalmente almacenarlos en un bucket de [AWS S3](https://aws.amazon.com/es/s3/).

In [None]:
import os
import re
import requests
import boto3
from botocore.exceptions import NoCredentialsError
from bs4 import BeautifulSoup

# Definimos la función de manejo de la solicitud
def lambda_handler(event, context):
    # Configuramos las credenciales de AWS
    AWS_ACCESS_KEY_ID = 'tu_access_key'
    AWS_SECRET_ACCESS_KEY = 'tu_secret_key'
    AWS_REGION = 'tu_region'

    # Creamos un cliente de S3
    s3_client = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, region_name=AWS_REGION)

    # Guardamos la URL en una variable
    url = 'https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page'

    # Realizamos la solicitud HTTP
    response = requests.get(url)

    # Verificamos si la solicitud fue exitosa
    if response.status_code == 200:
        # Analizamos el contenido HTML con BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')

        # Buscamos el elemento <div> con la clase "faq-answers" y el id "faq2023"
        target_div = soup.find('div', {'class': 'faq-answers', 'id': 'faq2023'})

        if target_div:
            # Buscamos los enlaces <a> dentro de target_div que tienen title="Yellow Taxi Trip Records" o "Green Taxi Trip Records"
            taxi_links = target_div.find_all('a', title=['Yellow Taxi Trip Records', 'Green Taxi Trip Records'])

            if taxi_links:
                print("Enlaces con el título 'Yellow Taxi Trip Records' o 'Green Taxi Trip Records':")
                links = []
                for link in taxi_links:
                    links.append(link.get('href'))
                    print(link.get('href'))  # Imprime el enlace URL
            else:
                print("No se encontraron enlaces con los títulos 'Yellow Taxi Trip Records' o 'Green Taxi Trip Records' dentro de 'faq-answers'.")
        else:
            print("No se encontró el elemento con la clase 'faq-answers' y el id 'faq2023'.")
    else:
        print("La solicitud no fue exitosa. Código de estado:", response.status_code)

    # Generamos dinámicamente los enlaces para los meses del 01 al 10
    meses = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']

    # Guardamos la URL en una variable
    url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

    # Realizamos la solicitud HTTP a la página
    response = requests.get(url)

    # Verificamos si la solicitud fue exitosa (código de estado 200)
    if response.status_code == 200:
        # Parseamos el contenido HTML de la página
        soup = BeautifulSoup(response.text, 'html.parser')

        # Encontramos todos los enlaces que contienen 'yellow' o 'green' y tienen el formato esperado para 2023
        links_pagina = {a['href'] for a in soup.find_all('a', href=True) if re.match(r'^.*(yellow|green)_tripdata_2023-\d{2}\.parquet$', a['href'])}

        # Combinamos enlaces generados dinámicamente con los obtenidos de la página
        links_totales = links_pagina.copy()  # Creamos una copia para mantener los enlaces originales
        for mes in meses:
            enlace_yellow = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-{mes}.parquet"
            enlace_green = f"https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-{mes}.parquet"
            links_totales.add(enlace_yellow)
            links_totales.add(enlace_green)

        # Realizamos la descarga de los archivos y los subimos a S3
        for link in sorted(links_totales):
            nombre_archivo = link.split("/")[-1]  # Obtenemos el nombre del archivo desde la URL

            try:
                # Verificamos si el archivo ya existe en el bucket
                s3_client.head_object(Bucket='tu_bucket', Key=nombre_archivo)
                print(f"El archivo {nombre_archivo} ya existe en S3.")
            except:
                response = requests.get(link)

                if response.status_code == 200:
                    # Subimos el archivo directamente a S3
                    try:
                        s3_client.put_object(Body=response.content, Bucket='tu_bucket', Key=nombre_archivo)
                        print(f"Archivo subido a S3: {nombre_archivo}")
                    except NoCredentialsError:
                        print('Las credenciales de AWS no están disponibles.')
                else:
                    print(f"No se pudo descargar el archivo: {nombre_archivo}. Código de estado: {response.status_code}")
    else:
        print(f"No se pudo acceder a la página. Código de estado: {response.status_code}")

if __name__ == "__main__":
    lambda_handler(None, None)  # Ejecutamos la función lambda_handler localmente para pruebas

### Una vez obtenidos los archivos parquet, recurrimos al uso de [AWS Glue](https://aws.amazon.com/es/glue/) para automatizar el proceso de normalización de los datos.

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, udf
from pyspark.sql.functions import expr, when


# Creamos un SparkSession
spark = SparkSession.builder.appName("Concatenate Parquet Files").getOrCreate()


# TAXIS VERDES


# Creamos la lista para almacenar los DataFrames (green)
df_green_list = []

# Iteramos sobre los meses
for month in range(1, 11):

    # Obtenemos la ruta al archivo Parquet (green)
    parquet_file_green_path = "s3://tu_bucket_de_entrada/green_tripdata_2023-{0}.parquet".format(str(month).zfill(2))

    # Leemos el archivo Parquet (green)
    df_green = spark.read.parquet(parquet_file_green_path)

    # Agregamos el DataFrame a la lista (green)
    df_green_list.append(df_green)

# Concatenamos los DataFrames de la lista (green)
concatenated_df_green = df_green_list[0]  # Iniciamos la concatenación con el primer DataFrame
for i in range(1, len(df_green_list)):
    concatenated_df_green = concatenated_df_green.union(df_green_list[i])  # Usamos el método union de DataFrames

# Eliminamos columnas
columns_to_drop_green = ['store_and_fwd_flag', 'fare_amount', 'extra', 'mta_tax', 'tolls_amount', 'improvement_surcharge',
                         'ehail_fee', 'congestion_surcharge', 'trip_type', 'VendorID', 'RatecodeID', 'payment_type']
concatenated_df_green = concatenated_df_green.drop(*columns_to_drop_green)

# Renombramos columnas
column_rename_green = {
    'lpep_pickup_datetime': 'pickup_datetime',
    'lpep_dropoff_datetime': 'dropoff_datetime',
    'total_amount': 'amount'
}
for old_col, new_col in column_rename_green.items():
    concatenated_df_green = concatenated_df_green.withColumnRenamed(old_col, new_col)
    
# Sumamos las columnas amount y tip_amount para crear la columna total_amount
concatenated_df_green = concatenated_df_green.withColumn('total_amount', concatenated_df_green['amount'] + concatenated_df_green['tip_amount'])

# Eliminamos las columnas amount y tip_amount al no ser necesarias
columns_to_drop_green = ['amount', 'tip_amount']
concatenated_df_green = concatenated_df_green.drop(*columns_to_drop_green)

# Agregamos la columna 'service_type'
concatenated_df_green = concatenated_df_green.withColumn('service_type', F.lit('green'))

# Reducimos a una sola partición antes de escribir (green)
concatenated_df_green = concatenated_df_green.coalesce(1)


# TAXIS AMARILLOS


# Creamos la lista para almacenar los DataFrames (yellow)
df_yellow_list = []

# Iteramos sobre los meses
for month in range(1, 11):

    # Obtenemos la ruta al archivo Parquet (yellow)
    parquet_file_yellow_path = "s3://tu_bucket_de_entrada/yellow_tripdata_2023-{0}.parquet".format(str(month).zfill(2))

    # Leemos el archivo Parquet (yellow)
    df_yellow = spark.read.parquet(parquet_file_yellow_path)

    # Agregamos el DataFrame a la lista (yellow)
    df_yellow_list.append(df_yellow)

# Concatenamos los DataFrames de la lista (yellow)
concatenated_df_yellow = df_yellow_list[0]  # Iniciamos la concatenación con el primer DataFrame
for i in range(1, len(df_yellow_list)):
    concatenated_df_yellow = concatenated_df_yellow.union(df_yellow_list[i])  # Usamos el método union de DataFrames

# Eliminamos columnas
columns_to_drop_yellow = ['store_and_fwd_flag', 'fare_amount', 'extra', 'mta_tax', 'tolls_amount', 'improvement_surcharge',
                           'congestion_surcharge', 'airport_fee', 'Airport_fee', 'VendorID', 'RatecodeID', 'payment_type']
concatenated_df_yellow = concatenated_df_yellow.drop(*columns_to_drop_yellow)

# Renombramos columnas
column_rename_yellow = {
    'tpep_pickup_datetime': 'pickup_datetime',
    'tpep_dropoff_datetime': 'dropoff_datetime',
    'total_amount': 'amount'
}
for old_col, new_col in column_rename_yellow.items():
    concatenated_df_yellow = concatenated_df_yellow.withColumnRenamed(old_col, new_col)
    
# Sumamos las columnas amount y tip_amount para crear la columna total_amount
concatenated_df_yellow = concatenated_df_yellow.withColumn('total_amount', concatenated_df_yellow['amount'] + concatenated_df_yellow['tip_amount'])

# Eliminamos las columnas amount y tip_amount al no ser necesarias
columns_to_drop_yellow = ['amount', 'tip_amount']
concatenated_df_yellow = concatenated_df_yellow.drop(*columns_to_drop_yellow)

# Agregamos la columna 'service_type'
concatenated_df_yellow = concatenated_df_yellow.withColumn('service_type', F.lit('yellow'))

# Reducimos a una sola partición antes de escribir (yellow)
concatenated_df_yellow = concatenated_df_yellow.coalesce(1)

# Reorganizamos columnas para taxis amarillos
column_order_yellow = concatenated_df_green.columns
concatenated_df_yellow = concatenated_df_yellow.select(*column_order_yellow)


# CONCATENAMOS TAXIS VERDES Y AMARILLOS


# Fusionamos los DataFrames green y yellow
concatenated_taxis = concatenated_df_green.union(concatenated_df_yellow)

# Reducimos a una sola partición antes de escribir (opcional)
concatenated_taxis = concatenated_taxis.coalesce(1)

# Cambiamos el tipo de dato de 'passenger_count' a IntegerType
concatenated_taxis = concatenated_taxis.withColumn('passenger_count', concatenated_taxis['passenger_count'].cast('int'))

# Reemplazamos los valores nulos de 'Passenger_count' por 1
concatenated_taxis = concatenated_taxis.na.fill({'passenger_count': 1})

# Reemplazamos los valores 0.00 en 'trip_distance' por 1.00
concatenated_taxis = concatenated_taxis.withColumn('trip_distance', F.when(concatenated_taxis['trip_distance'] == 0.00, 1.00).otherwise(concatenated_taxis['trip_distance']))

# Creamos columnas 'pickup_borough' y 'dropoff_borough' usando expresiones condicionales
pickup_borough_expr = expr("""
    CASE
        WHEN PULocationID IN (4, 12, 13, 24, 41, 42, 43, 45, 48, 50, 68, 74, 75, 79, 87, 88, 90, 100, 103, 103, 103, 107, 113, 114, 116, 120, 125,
                             127, 128, 137, 140, 141, 142, 143, 144, 148, 151, 152, 153, 158, 161, 162, 163, 164, 166, 170, 186, 194, 202, 209, 211,
                             224, 229, 230, 231, 232, 233, 234, 236, 237, 238, 239, 243, 244, 246, 249, 261, 262, 263) THEN 'Manhattan'
        WHEN PULocationID IN (11, 14, 17, 21, 22, 25, 26, 29, 33, 34, 35, 36, 37, 39, 40, 49, 52, 54, 55, 61, 62, 63, 65, 66, 67, 71, 72, 76, 77, 80,
                             85, 89, 91, 97, 106, 108, 111, 112, 123, 133, 149, 150, 154, 155, 165, 177, 178, 181, 188, 189, 190, 195, 210, 217, 222,
                             225, 227, 228, 255, 256, 257) THEN 'Brooklyn'
        WHEN PULocationID IN (2, 7, 8, 9, 10, 15, 16, 19, 27, 28, 30, 38, 53, 56, 56, 64, 70, 73, 82, 83, 86, 92, 93, 95, 96, 98, 101, 102, 117, 121,
                              122, 124, 129, 130, 131, 132, 134, 135, 138, 139, 145, 146, 157, 160, 171, 173, 175, 179, 180, 191, 192, 193, 196, 197,
                              198, 201, 203, 205, 207, 215, 216, 218, 219, 223, 226, 252, 253, 258, 260) THEN 'Queens'
        WHEN PULocationID IN (3, 18, 20, 31, 32, 46, 47, 51, 58, 59, 60, 69, 78, 81, 94, 119, 126, 136, 147, 159, 167, 168, 169, 174, 182, 183, 184,
                              185, 199, 200, 208, 212, 213, 220, 235, 240, 241, 242, 247, 248, 250, 254, 259) THEN 'Bronx'
        WHEN PULocationID IN (5, 6, 23, 44, 84, 99, 109, 110, 115, 118, 156, 172, 176, 187, 204, 206, 214, 221, 245, 251) THEN 'Staten Island'
        WHEN PULocationID IN (1) THEN 'EWR'
        ELSE 'Unknown'
    END AS pickup_borough
""")

dropoff_borough_expr = expr("""
    CASE
        WHEN DOLocationID IN (4, 12, 13, 24, 41, 42, 43, 45, 48, 50, 68, 74, 75, 79, 87, 88, 90, 100, 103, 103, 103, 107, 113, 114, 116, 120, 125,
                             127, 128, 137, 140, 141, 142, 143, 144, 148, 151, 152, 153, 158, 161, 162, 163, 164, 166, 170, 186, 194, 202, 209, 211,
                             224, 229, 230, 231, 232, 233, 234, 236, 237, 238, 239, 243, 244, 246, 249, 261, 262, 263) THEN 'Manhattan'
        WHEN DOLocationID IN (11, 14, 17, 21, 22, 25, 26, 29, 33, 34, 35, 36, 37, 39, 40, 49, 52, 54, 55, 61, 62, 63, 65, 66, 67, 71, 72, 76, 77, 80,
                             85, 89, 91, 97, 106, 108, 111, 112, 123, 133, 149, 150, 154, 155, 165, 177, 178, 181, 188, 189, 190, 195, 210, 217, 222,
                             225, 227, 228, 255, 256, 257) THEN 'Brooklyn'
        WHEN DOLocationID IN (2, 7, 8, 9, 10, 15, 16, 19, 27, 28, 30, 38, 53, 56, 56, 64, 70, 73, 82, 83, 86, 92, 93, 95, 96, 98, 101, 102, 117, 121,
                              122, 124, 129, 130, 131, 132, 134, 135, 138, 139, 145, 146, 157, 160, 171, 173, 175, 179, 180, 191, 192, 193, 196, 197,
                              198, 201, 203, 205, 207, 215, 216, 218, 219, 223, 226, 252, 253, 258, 260) THEN 'Queens'
        WHEN DOLocationID IN (3, 18, 20, 31, 32, 46, 47, 51, 58, 59, 60, 69, 78, 81, 94, 119, 126, 136, 147, 159, 167, 168, 169, 174, 182, 183, 184,
                              185, 199, 200, 208, 212, 213, 220, 235, 240, 241, 242, 247, 248, 250, 254, 259) THEN 'Bronx'
        WHEN DOLocationID IN (5, 6, 23, 44, 84, 99, 109, 110, 115, 118, 156, 172, 176, 187, 204, 206, 214, 221, 245, 251) THEN 'Staten Island'
        WHEN DOLocationID IN (1) THEN 'EWR'
        ELSE 'Unknown'
    END AS dropoff_borough
""")

concatenated_taxis = concatenated_taxis.withColumn('pickup_borough', pickup_borough_expr)
concatenated_taxis = concatenated_taxis.withColumn('dropoff_borough', dropoff_borough_expr)

# Eliminamos las filas donde 'pickup_borough' o 'dropoff_borough' sean 'Unknown'
concatenated_taxis = concatenated_taxis.filter(concatenated_taxis.pickup_borough != 'Unknown')

# Guardamos el DataFrame concatenado
concatenated_taxis.write.parquet("s3://tu_bucket_de_salida/tu_carpeta")

# Detenemos la sesión de Spark
spark.stop()

### Completada la normalización de los datos, se guarda el archivo resultante en formato Parquet en un bucket de [AWS S3](https://aws.amazon.com/es/s3/). Sin embargo, el archivo se guarda con el nombre predeterminado `part-00000-8bbda55c-fad7-46b5-9f8d-b8d8e336c149-c000.snappy.parquet`. Este nombre se genera automáticamente cuando se guarda un archivo Parquet utilizando Apache Spark con el formato de compresión Snappy.
### Para solucionar este problema, utilizamos otra función `Lambda` para renombrar el archivo y almacenarlo en un bucket que alimenta a nuestro `Data Warehouse`.

In [None]:
import boto3

def lambda_handler(event, context):
    # Nombre del bucket y ruta del archivo original
    source_bucket_name = 'tu_bucket_de_entrada'
    destination_bucket_name = 'tu_bucket_de_salida'
    source_folder_name = 'tu_carpeta/'
    
    # Creamos una instancia del cliente de S3 con tus credenciales de acceso
    s3_client = boto3.client(
        's3',
        region_name='tu_region',
        aws_access_key_id='tu_access_key',
        aws_secret_access_key='tu_secret_key'
    )
    
    # Obtenemos la lista de objetos en la carpeta especificada
    response = s3_client.list_objects_v2(
        Bucket=source_bucket_name,
        Prefix=source_folder_name
    )
    
    # Renombramos y copiamos cada archivo que termine en ".parquet" al nuevo bucket
    for obj in response.get('Contents', []):
        file_key = obj['Key']
        if file_key.endswith('.parquet'):
            new_file_key = 'nuevo_nombre.parquet'
            s3_client.copy_object(
                Bucket=destination_bucket_name,
                CopySource={'Bucket': source_bucket_name, 'Key': file_key},
                Key=new_file_key
            )
            s3_client.delete_object(
                Bucket=source_bucket_name,
                Key=file_key
            )
    
    # Eliminamos todos los objetos dentro de la carpeta original
    for obj in response.get('Contents', []):
        s3_client.delete_object(
            Bucket=source_bucket_name,
            Key=obj['Key']
        )

    # Eliminamos la carpeta original después de copiar y eliminamos todos los archivos
    s3_client.delete_object(
        Bucket=source_bucket_name,
        Key=source_folder_name
    )
    
    return {
        'statusCode': 200,
        'body': 'Archivos renombrados, copiados al nuevo bucket y carpeta eliminada exitosamente'
    }

### Una vez que el archivo se encuentra en el bucket que nutre a [AWS Athena](https://aws.amazon.com/es/athena/), podemos realizar consultas SQL para obtener información de los datos almacenados en el `Data Warehouse` como así también utilizar la librería PyAthena para hacer consultas desde Python y así crear aplicaciones o sitios web que utilicen el database para arrojar resultados. 
### Conforme la base de datos de TLC se renueve, nuestro `Data Warehouse` se actualizará automáticamente mediante nuestra función `Lambda` que tomará los nuevos registros de viajes y los almacenará en el `Data Warehouse`.

## **Entrenamiento del Modelo de Machine Learning**

### Para realizar el entrenamiento se optó por adaptar el código creado en local, a las herramientas que nos brinda AWS. Para ello se creó un notebook en [Amazon SageMaker](https://aws.amazon.com/es/sagemaker/) y se utilizó el servicio de [Amazon S3](https://aws.amazon.com/es/s3/) para almacenar los datos de entrenamiento y los modelos generados.

In [None]:
import numpy as np
import pandas as pd
import joblib
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import boto3
import warnings

# Ignorar todas las advertencias
warnings.filterwarnings("ignore")

# Configuración de credenciales para acceder a S3
region_name = 'tu_region'
aws_access_key_id = 'tu_access_key'
aws_secret_access_key = 'tu_secret_key'

# Cambia la ruta de S3 para cargar el DataFrame
s3_path = 's3://tu_bucket/tu_archivo'
df = pd.read_parquet(s3_path)

# Función para agregar la columna 'pBoroughID'
def addBoroughID(df):
    df['pBoroughID'] = df['pickup_borough'].map({'Manhattan': 1, 'Brooklyn': 2, 'Queens': 3, 'Bronx': 4, 'Staten Island': 5}).fillna(6)
    return df

df = addBoroughID(df)

# Obtener mes, día de la semana, hora y día de la semana
df['month'] = df['pickup_datetime'].dt.month
df['dayofweek'] = df['pickup_datetime'].dt.dayofweek
df['hour'] = df['pickup_datetime'].dt.hour

# Agrupamos por localización, hora y día de la semana
df_ml = df.groupby(['hour', 'pBoroughID', 'dayofweek']).size().reset_index(name='demand')

# Creamos el porcentaje de Demanda
df_ml['demand'] = df_ml['demand'].apply(lambda x: (x / df_ml['demand'].max()))
df_ml['demand'] = round(df_ml['demand'] * 100, 2)

# Seleccionar características y variable objetivo
features = ['pBoroughID', 'dayofweek', 'hour']
target = 'demand'

# Plantear las variables dependientes e independientes
X = df_ml[features]
y = df_ml[target]

# Dividir los datos en conjuntos de entrenamiento y prueba
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Inicializar y entrenar el modelo XGBRegressor
xgb_model = XGBRegressor(objective='reg:squarederror', random_state=42)
xgb_model.fit(X_train, y_train)

# Realizar predicciones en el conjunto de prueba
y_pred = xgb_model.predict(X_test)

# Evaluar el rendimiento del modelo
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_pred)

print(f'Mean Squared Error (MSE): {mse}')
print(f'Mean Absolute Error (MAE): {mae}')
print(f'Root Mean Squared Error (RMSE): {rmse}')
print(f'R-squared (R2): {r2}')

# Guardar el modelo en un archivo pickle
joblib.dump(xgb_model, 'tu_modelo_entrenado')

# Guardar el modelo en S3
s3_model_path = 's3://tu_bucket/tu_carpeta/tu_modelo_entrenado'
boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key).upload_file('tu_modelo_entrenado', 'tu_bucket', 'tu_carpeta/tu_modelo_entrenado')
print('Modelo entrenado y guardado en la ruta s3://tu_bucket/tu_carpeta/tu_modelo_entrenado')