# Tarea de la Semana 4:<br>Construyendo Pipelines de Lotes y de Streaming de principio a fin<br>Basado en los Requerimientos de las Partes Interesadas

En este laboratorio, experimentarás un paso más con el marco de "Pensar como un Ingeniero de Datos". Implementarás las arquitecturas de lotes y de streaming que has explorado en los cuestionarios de esta semana para cumplir con los requerimientos de las partes interesadas. Primero comenzarás implementando el pipeline de lotes que sirve los datos de entrenamiento para el sistema de recomendación. Luego almacenarás los embeddings de salida del modelo en una base de datos vectorial, y finalmente implementarás el pipeline de streaming que utiliza el modelo entrenado y la base de datos vectorial para proporcionar recomendaciones de productos en tiempo real.

# Tabla de Contenidos
- [1 - Implementando el Pipeline de Lotes](#1)
- [2 - Creando y Configurando la Base de Datos Vectorial](#2)
- [3 - Conectando el Modelo Implementado a la Base de Datos Vectorial](#3)
- [4 - Implementando el Pipeline de Streaming](#4)

---


<div id='1'/>

## 1 - Implementación del Pipeline por Lotes

Aquí está el diagrama arquitectónico del pipeline por lotes:

![batch_process](./images/de-c1w4-diagram-batch.drawio.png)

El pipeline por lotes ingiere datos de productos y usuarios desde la base de datos fuente (base de datos MySQL de Amazon RDS), los transforma utilizando AWS Glue ETL y almacena los datos transformados en un bucket de Amazon S3. Los datos se transforman en datos de entrenamiento que serán utilizados por el científico de datos para entrenar el sistema de recomendación.

Antes de implementar este pipeline, explorarás una tabla adicional que se agregó a la base de datos de muestra de MySQL `classicmodels` utilizada en la Semana 2. La tabla adicional está etiquetada como `ratings` y consiste en las calificaciones asignadas por los usuarios a los productos que han comprado. Las calificaciones están en una escala del 1 al 5 y fueron generadas para este laboratorio.

**Explorando la tabla `ratings`**

La base de datos fuente (base de datos MySQL de Amazon RDS) ya está instanciada y provisionada para que la uses en este laboratorio. Conéctate a ella y verifica la tabla `ratings`.


1.1. Obtenga el punto final (es decir, la dirección) de la instancia de base de datos ejecutando el siguiente comando (reemplace `<MySQL-DB-name>` con `de-c1w4-rds`) en la terminal de VSCode. Si es necesario, abra una nueva terminal seleccionando Terminal > Nueva terminal en el menú.

```bash
aws rds describe-db-instances --db-instance-identifier <MySQL-DB-name> --output text --query "DBInstances[].Endpoint.Address"
```


1.2. Conéctese a la base de datos ejecutando el siguiente comando, reemplazando
   - `<MySQLEndpoint>` con la salida del paso anterior,

   - `<DatabaseUserName>` con `admin` ,

   - `<Password>` con `adminpwrd` :

```bash
mysql --host=<MySQLEndpoint> --user=<DatabaseUserName> --password=<Password> --port=3306
```


1.3. Asegúrate de usar la base de datos `classicmodels` y listar las tablas existentes:

```bash
use classicmodels;
show tables;
```

Deberías ver la nueva tabla: `ratings`.


1.4. Para verificar las primeras 20 líneas de la tabla, ejecuta la siguiente consulta:

```sql
SELECT * 
FROM ratings
LIMIT 20;
```

La cláusula `LIMIT` limita la cantidad de filas mostradas en el resultado de la consulta, 
puedes ajustar este número para ver más o menos datos.
    
Intenta comprender la estructura de la tabla `ratings` y dónde debería colocarse en el esquema original 
[esquema](https://www.mysqltutorial.org/getting-started-with-mysql/mysql-sample-database/).


1.5. Cerrar la conexión a la base de datos:

```bash
exit
```

**Ejecución del trabajo de ETL de AWS Glue**

Ahora creará los recursos necesarios para el pipeline por lotes (AWS Glue ETL y
depósito de Amazon S3) utilizando [Terraform](https://www.terraform.io/). Recuerde que Terraform es una herramienta de Infraestructura como Código (IaC)
que le permite configurar y aprovisionar recursos para su flujo de trabajo.
Una vez que haya creado los recursos, ejecutará el trabajo de AWS Glue que ingesta
los datos de RDS y los transforma en los datos de entrenamiento solicitados por el científico de datos.


1.6. Configurar el entorno ejecutando el script `scripts/setup.sh`:

```bash
source ./scripts/setup.sh
```

Este script configura algunas variables de entorno necesarias para pasar parámetros a la configuración de Terraform.


1.7. Ir a la carpeta `terraform`.

```bash
cd terraform
```


1.8. Abre el script de Terraform `terraform/main.tf`. Para la parte del pipeline por lotes, solo tienes que trabajar en `module "etl"`. Descomenta la sección correspondiente del archivo (líneas 1 a 15), manteniendo el resto del archivo comentado (los comentarios en Terraform comienzan con `#`).

*Nota*: Recuerda guardar tus cambios presionando `Ctrl+S` o `Cmd+S`.


1.9. Abre el script `terraform/outputs.tf` y descomenta las líneas correspondientes a la sección `ETL` (líneas 2 a 8).

*Nota*: Recuerda guardar tus cambios presionando `Ctrl+S` o `Cmd+S`.


1.10. Inicializa la configuración de Terraform:

```bash
terraform init
```

1.11. Para implementar los recursos, ejecuta los siguientes comandos:

```bash
terraform plan
terraform apply
```

*Nota*: El comando `terraform apply` te pedirá que respondas `yes` aquí:

![terraform_plan](images/terraform_plan.png)

*Nota*: Si hay errores en los comandos o en los archivos de configuración de Terraform, la terminal puede colapsar. 
Cuando esto suceda, verás el siguiente mensaje:

![etl_diagram](images/terminal_crash.png)

Puedes volver a abrir la terminal presionando <code>Ctrl + \`</code> (o <code>Cmd + \`</code>) o navegando a Ver > Terminal. 
En la terminal, vuelve a la carpeta de Terraform (`cd terraform`) y luego intenta 
ejecutar nuevamente los comandos requeridos. El error debería aparecer ahora en la terminal.
Si la terminal sigue colapsando, ejecuta en su lugar el siguiente comando:
`terraform apply -no-color  2> errors.txt`
Esto creará un archivo de texto que contiene el mensaje de error sin hacer que la terminal colapse.


1.12. Para acceder a la consola de AWS, ejecuta el siguiente comando en la terminal.

```bash
cat ~/.aws/aws_console_url
```
Abre el enlace en una nueva ventana del navegador.

*Nota*: Por razones de seguridad, la URL para acceder a la consola de AWS caducará cada 15 minutos, 
pero cualquier recurso de AWS que hayas creado seguirá estando disponible durante un período de 2 horas. 
Si necesitas acceder a la consola después de 15 minutos, vuelve a ejecutar el comando para obtener un nuevo enlace activo.

*Nota:* Si ves la ventana como en la siguiente captura de pantalla, haz clic en el enlace **logout**, 
cierra la ventana y vuelve a hacer clic en el enlace de la consola.

![AWSLogout](images/AWSLogout.png)


1.13. En la consola de AWS, busca **AWS Glue** e ingresa al enlace de **trabajos ETL** en el panel izquierdo. Deberías ver un trabajo creado con el nombre `de-c1w4-etl-job`. Para iniciar el trabajo de AWS Glue, ejecuta el siguiente comando:

```bash
aws glue start-job-run --job-name de-c1w4-etl-job | jq -r '.JobRunId'
```

Deberías obtener `JobRunID` en la salida.


1.14. Verifique el estado del trabajo de AWS Glue intercambiando `<JobRunID>` con la salida del paso anterior:

```bash
aws glue get-job-run --job-name de-c1w4-etl-job --run-id <JobRunID> --output text --query "JobRun.JobRunState"
```

También puede ver el estado del trabajo en la consola abriendo `de-c1w4-etl-job` y yendo a la pestaña `Runs`. Espere hasta que el estado del trabajo cambie a `SUCCEEDED` (tomará 2-3 minutos).

Los datos transformados tienen el siguiente esquema:

![schema_after_ETL](./images/schema_after_ETL.png)


1.15. El trabajo de AWS Glue debe transformar los datos y almacenarlos en el bucket del lago de datos S3 que se creó con IaC. Puedes ir a la consola de AWS, buscar **S3** y luego buscar un bucket llamado `de-c1w4-<PLACEHOLDER>-datalake`, donde `<PLACEHOLDER>` es el número de tu cuenta de AWS. Bajo la carpeta `ratings-ml-training` puedes ver algunas carpetas adicionales con la convención de nomenclatura: `customerNumber=<NUMBER>`. Esta convención indica cómo se particionaron los datos durante el paso de almacenamiento. El concepto de particionamiento se abordará más adelante en la especialización.

Una vez que los datos estén allí, el equipo de ML tomará estos datos y entrenará su modelo.


<div id='2'/>

## 2 - Creación y configuración de la base de datos de vectores

2.1. Ahora, supongamos que el Científico de Datos ha entrenado el modelo. Crearon el bucket S3
llamado `de-c1w4-<AWS-ACCOUNT-ID>-us-east-1-ml-artifacts`. En ese bucket, el Científico de Datos
publicó los resultados del modelo. Puedes explorar el bucket y encontrar la siguiente
estructura de carpetas:

```bash
.
├── embeddings/
|   ├── item_embeddings.csv
|   └── user_embeddings.csv
├── model/
|   └── best_model.pth   
└── scalers/
    ├── item_ohe.pkl
    ├── item_std_scaler.pkl
    ├── user_ohe.pkl
    └── user_std_scaler.pkl   
```

La carpeta `embeddings` contiene los embeddings de los usuarios y elementos (o productos) que
fueron generados en el modelo.

La carpeta `model` contiene el modelo entrenado que se utilizará para inferencia.

La carpeta `scalers` contiene los objetos utilizados en la parte de preprocesamiento para
el entrenamiento, como 
[One Hot Encoders](https://hackernoon.com/what-is-one-hot-encoding-why-and-when-do-you-have-to-use-it-e3c6186d008f) 
y [Standard Scalers](https://en.wikipedia.org/wiki/Feature_scaling).

El Científico de Datos pidió subir los archivos `item_embeddings.csv` y `user_embeddings.csv`
a una base de datos de vectores (Vector DB). Esos embeddings serán utilizados por el
sistema de recomendación en el pipeline de streaming para proporcionar recomendaciones de productos.
El uso de una base de datos de vectores acelera la recuperación de elementos que son similares a un
elemento dado. Entonces, por ejemplo, si un usuario coloca un artículo en el carrito, el modelo de recomendación
primero calculará el vector de embedding de este artículo y luego usará la base de datos de vectores
para recuperar elementos similares a él.

**Creación de la base de datos de vectores**

Para la base de datos de vectores, crearás con Terraform una base de datos RDS PostgreSQL
con la [extensión](https://github.com/pgvector/pgvector) `pgvector`. La base de datos PostgreSQL
es típicamente más rápida para consultas complejas y análisis de datos, mientras que MySQL,
que utilizaste anteriormente, es más eficiente para consultas más simples. Trabajar con Vector
DB PostgreSQL proporciona más capacidades y flexibilidad.


2.2. En el explorador de VSCode, abre nuevamente el script `/terraform/main.tf`. Descomenta la sección `module "vector_db"` (líneas 17 a 27).

*Nota*: Recuerda guardar tus cambios presionando `Ctrl+S` o `Cmd+S`.

2.3. En el archivo `/terraform/outputs.tf`, descomenta los bloques asociados con la base de datos Vector (líneas 11 a 27).

*Nota*: Recuerda guardar tus cambios presionando `Ctrl+S` o `Cmd+S`.

2.4. Reinicializa el módulo de terraform, planifica y aplica los cambios con los siguientes comandos (asegúrate de ejecutarlos desde la carpeta `terraform` en la terminal):

```bash
terraform init
terraform plan
terraform apply
```

*Nota*: El comando `terraform apply` te pedirá que respondas `yes` aquí:

![terraform_plan](images/terraform_plan_2.png)

*La creación de la base de datos tomará alrededor de 7 minutos*.


2.5. Verás que se muestra cierta información después del comando `terraform apply`, pero algunos campos (nombre de usuario y contraseña de la base de datos) aparecen como sensibles. Para ver esa información, que se utilizará para conectarse a la base de datos Vector, necesitas usar los siguientes comandos:

```bash
terraform output vector_db_master_username
terraform output vector_db_master_password
```

Guarda las salidas en algún lugar localmente, las usarás más tarde.

*Nota*: Las salidas se imprimen entre comillas dobles, las cuales no forman parte del nombre de usuario o la contraseña.

**Agregando los embeddings a la base de datos vector**

Ahora que se ha creado la base de datos vector, te conectarás a ella para importar los embeddings desde S3. El archivo `sql/embeddings.sql` contiene las declaraciones SQL para esta tarea.


2.6. Abre el archivo `sql/embeddings.sql` y cambia los marcadores de posición del bucket `<BUCKET_NAME>` con el nombre del bucket `de-c1w4-<AWS-ACCOUNT-ID>-us-east-1-ml-artifacts` (donde `<AWS-ACCOUNT-ID>` es tu ID de cuenta de AWS). Puedes ir a la consola de AWS, buscar S3 y luego buscar un bucket llamado `de-c1w4-<AWS-ACCOUNT-ID>-us-east-1-ml-artifacts`. Otra opción es ejecutar el comando `aws s3 ls` en la terminal.

Puedes regresar al paso 2.1 para obtener el nombre del bucket que contiene los embeddings.

*Nota*: ¡El cambio de los marcadores de posición del bucket `<BUCKET_NAME>` debe hacerse en dos lugares!

*Nota*: Recuerda guardar tus cambios presionando `Ctrl+S` o `Cmd+S`.


2.7. Obtén la salida de `vector_db_host` de Terraform. Puedes ver la salida que se devolvió al final del paso 2.4 o ejecutar el comando `terraform output vector_db_host`.

Más adelante usarás esta salida, así que guárdala en algún lugar local.

2.8. Ahora conecta a la base de datos ejecutando el siguiente comando, reemplazando `<VectorDBHost>` con la salida del paso anterior. Se te pedirá una contraseña, puedes usar la obtenida en el paso 2.5.

```bash
psql --host=<VectorDBHost> --username=postgres --password --port=5432
```

2.9. Luego, para trabajar en la base de datos de postgres usa este comando con la misma contraseña:

```bash
\c postgres;
```

2.10. Una vez conectado a la base de datos de postgres, ahora puedes ejecutar las declaraciones SQL en el script `sql/embeddings.sql`. Usa el siguiente comando:

```bash
\i '../sql/embeddings.sql'
```


2.11. Para verificar las tablas disponibles, usa el siguiente comando:

```bash
\dt *.*
```
Presiona la tecla `Q` para salir del prompt de `psql`.

2.12. Sal del prompt de `psql` con el comando `\q` (o `exit`).

*Opcional*: Si estás interesado en aprender más sobre las banderas de postgres, puedes consultar [link](https://hasura.io/blog/top-psql-commands-and-flags-you-need-to-know-postgresql/).


<div id='3'/>

## 3 - Conectando el Modelo Implementado a la Base de Datos de Vectores

Ahora que tienes la base de datos de vectores creada y los embeddings importados en ella. Veamos dónde encaja en el flujo de trabajo de transmisión. Aquí está el diagrama arquitectónico del flujo de trabajo de transmisión:

![stream_process](./images/de-c1w4-diagram-stream.drawio.png)

En el lado izquierdo, ves una función lambda etiquetada como "inferencia de modelo". Esta función lambda, que es un recurso informático sin servidor, utilizará el modelo entrenado almacenado en S3 y los embeddings de la base de datos de vectores para proporcionar la recomendación basada en la actividad en línea del usuario transmitida por los flujos de datos de Kinesis.

En esta sección, configurarás las variables en la función lambda para prepararla para el flujo de trabajo de transmisión.

3.1. En la Consola de AWS, busca **RDS**, haz clic en **Bases de datos** en el panel izquierdo y encuentra la base de datos llamada `de-c1w4-vector-db`. Haz clic en ella.

3.2. En la pestaña *Conectividad y seguridad*, busca el endpoint, que debería tener la siguiente estructura: `de-c1w4-vector-db.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com`. Copia este valor y guárdalo en un lugar seguro, ya que lo usarás en un momento (este es el mismo endpoint que obtuviste con el comando `terraform output vector_db_host`).

3.3. De regreso a la Consola de AWS, busca *Lambda*. Luego encuentra la función lambda `de-c1w4-model-inference`. Haz clic en el nombre de la lambda y luego desplázate hacia abajo y busca la pestaña `Configuración`, haz clic en ella. Luego, abre Variables de entorno desde el panel izquierdo y haz clic en `Editar`.

Coloca los valores para las siguientes variables:
   - `VECTOR_DB_HOST`:  Endpoint de VectorDB que copiaste en el paso anterior,

   - `VECTOR_DB_PASSWORD`: valor de salida del comando `terraform output vector_db_master_password`,
   
   - `VECTOR_DB_USER`: valor de salida del comando `terraform output vector_db_master_username`.

Haz clic en `Guardar`.


## 4 - Implementando el Pipeline de Streaming

Ahora implementarás el pipeline de streaming que consta de Kinesis Data Streams, Kinesis Firehose y S3 (bucket de recomendaciones). Supongamos que AWS Kinesis Data Streams recibe la actividad en línea de los usuarios desde el registro de la plataforma de ventas. Luego transmite estos datos a Kinesis Data Firehose, que actúa como un servicio de entrega que se encarga de cargar los flujos de datos en S3 (bucket de recomendaciones). Antes de cargar los datos en S3, invoca la función lambda (transformación de streaming) que extrae las características de usuario y producto de los flujos de datos y utiliza el modelo recomendador entrenado para encontrar las recomendaciones y finalmente carga las recomendaciones en el bucket de recomendaciones de S3.

Ahora usa Terraform para crear Kinesis Firehose, el bucket de recomendaciones de S3 y la función lambda (transformación de streaming).


4.1. En el explorador de VSCode, abre nuevamente el script `/terraform/main.tf`, descomenta la sección `module "streaming_inference"` (líneas 29 a 39).

*Nota*: Recuerda guardar tus cambios presionando `Ctrl+S` o `Cmd+S`.

4.2. En el archivo `/terraform/outputs.tf`, descomenta los bloques asociados con la inferencia de streaming (líneas 30 a 32).

*Nota*: Recuerda guardar tus cambios presionando `Ctrl+S` o `Cmd+S`.

4.3. Reinicializa el módulo de terraform, planifica y aplica los cambios (asegúrate de ejecutar los comandos desde la carpeta `terraform` en la terminal):

```bash
terraform init
terraform plan
terraform apply
```

*Nota*: El comando `terraform apply` te pedirá que respondas `yes`.


4.4.  Ahora, de regreso en la Consola de AWS, busca **S3** y haz clic en el bucket de recomendaciones que se acaba de crear usando Terraform, llamado `de-c1w4-<PLACEHOLDER>-recommendations`. Después de un tiempo, el flujo de entrega de Kinesis que creaste con Firehose comenzará a consumir datos del flujo de datos de Kinesis `de-c1w4-kinesis-data-stream`. Luego, realizará las transformaciones que configuraste en la función Lambda y el bucket de S3 se llenará con algunos archivos. También puedes ir a Lambda y buscar `de-c1w4-transformation-lambda` que fue creado con Terraform. Haz clic en él y busca la pestaña `Monitor`. Haz clic en `Ver registros de CloudWatch`.

*Nota*: Puede que veas un mensaje de error en la parte superior de la página *El grupo de registros no existe.* *El grupo de registros específico: /aws/lambda/de-c1w4-transformation-lambda no existe en esta cuenta o región.* Si esperas un minuto aproximadamente y luego actualizas, ese mensaje de error desaparecerá y deberías comenzar a ver las actividades de registro.

Allí podrás ver los registros de la función mientras realiza algunas transformaciones. A medida que `de-c1w4-kinesis-data-stream` recibe datos dinámicamente (con un tiempo promedio entre eventos de 10 segundos), el bucket de S3 o los registros de Lambda pueden tardar algunos minutos en comenzar a mostrar datos o eventos. Puedes esperar alrededor de 5 minutos y actualizar la página de S3/CloudWatch Logs para ver los datos entrantes.
En el bucket de S3, los datos se particionarán por fecha, por lo que verás una jerarquía de directorios similar a la siguiente:

```bash
.
├── año/
    └── mes/
        └── día/
            └── hora/
                └── de-c1w4-delivery-stream-<PLACEHOLDER>
```

En este laboratorio, pasaste por un pipeline de extremo a extremo tanto para casos de lotes como de streaming. Construiste un pipeline de datos por lotes con AWS Glue y S3 para proporcionar los datos de entrenamiento para el sistema de recomendación, luego creaste una base de datos de vectores para almacenar los embeddings y finalmente construiste un pipeline de streaming en tiempo real utilizando AWS Kinesis Data Streams y Firehose. Y con eso, has visto cómo traducir las necesidades de las partes interesadas en requisitos funcionales y no funcionales, elegir las herramientas apropiadas y luego construir el pipeline de datos.
