### En este notebook encontrará los siguientes puntos:

Etapas del ETL:

1. Recepción de archivos CSV (AWS Lambda + AWS S3 -or- Databricks Job + AWS S3)
2. Carga de archivos CSV (Spark)
3. Manejo de errores y filtrado de registros incompletos (Spark + AWS SES)
4. Consulta de la API postcodes.io (Python)
5. Almacenamiento de códigos postales (MongoDB)
6. Respaldo de archivos CSV en Data Lake (AWS S3)

Adicional:

- Pruebas unitarias
- Control de Versiones

In [0]:
import json
import boto3
import smtplib
import requests
import urllib.parse
from math import ceil
from pathlib import Path
from datetime import date
from pymongo import MongoClient
from pyspark.sql import SparkSession
from email.mime.text import MIMEText
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from email.mime.multipart import MIMEMultipart
from pyspark.sql.functions import lit, row_number, col

### 1. Recepción de archivos

Para la recepción de archivos se me ocurren dos soluciones distintas, las cuales comentaré a continuación:

**Opción 1 (AWS S3 + AWS Lambda):**

Esta opción es la indicada si requieren cargarse archivos de forma continua al sistema. La idea consiste en, configurar un trigger que ejecute la función lambda, cada vez que se suba un CSV a un bucket S3.

Las funciones lambda escalan automáticamente recursos, por lo que, la disponibilidad del sistema no se vería comprometida ante un aumento en el número de archivos a procesar. Además, tanto las funciones lambda como los buckets S3 son recursos de muy bajo costo monetario.

**Opción 2 (AWS S3 + Job Databricks):**

Esta sería la opción indicada si requieren cargarse archivos con muchos registros (mayores al millón), y si la carga continua de datos al warehouse no es una necesidad.

Databricks ejecuta Spark de forma nativa, además de poseer otras características que lo vuelven una gran herramienta para procesamientos de grandes volúmenes de datos. Sin embargo, si se requiriera una solución que esté ejecutándose de forma continua, sería necesario tener un cluster específico para dicha solución, lo cual elevaría los constos. 

Otro detalle a tomar en cuenta, es que no vale la pena contratar Databricks para la ejecución de un sólo servicio. Databricks es una plataforma para la integración de todos los procesos de un área de Analítica. Sin embargo, otra gran ventaja es que, si ya se cuenta con la herramienta, no hace falta realizar configuraciones adicionales, a diferencia de la opción 1 con la función lambda.

----------------------------------------------------------------------------------------------

Para este ejercicio, utilizaré Databricks por comodidad, sin embargo, para elegir la infraestructura adecuada, es necesario un análisis detallado de las necesidades de un proyecto.

A continuación se presentan dos diagramas, correspondientes a las opciones de recepción de archivos explicadas arriba.

Opción 1:

<img src="./Imagenes/Arquitectura/opcion1.png" width="500" height="500">

<br>

1. Se sube un CSV al bucket de S3.
2. Se “dispara” una función lambda, la cuál realiza limpieza sobre los registros del CSV, además de nutrir la data con información adicional de postcodes.io.
3. Se respalda la data cruda (CSVs) en un bucket distinto de S3.
4. Se carga la información corregida y aumentada al Warehouse en MongoDB.

Opción 2:

<img src="./Imagenes/Arquitectura/opcion2.png" width="500" height="500">

<br>

1. Databricks consulta el bucket de S3 al que se suben los CSVs crudos.
2. Se activa una función lambda, la cuál realiza limpieza sobre los registros del CSV, además de nutrir la data con información adicional de postcodes.io.
3. Se respalda la data cruda (CSVs) en un bucket distinto de S3.
4. Se carga la información corregida y aumentada al Warehouse en MongoDB.

**Nota 1:** Ya que esta etapa de la solución no requiere escribir código, y está más relacionada con el levantamiento de infraestructura, más que adjuntar código redacto una breve explicación respecto a las herramientas seleccionadas.

**Nota 2:** Ya que pueden surgir problemas al tratar de desplegar imágenes en los notebook, también anexo un PDF al repositorio en GitHub, en el que muestro los diagramas de las 2 arquitecturas propuestas.

### 2. Carga de archivos

Para la carga de datos utilizaría Spark, ya que el archivo de prueba tiene más de 2M de registros, así que asumíré que se estarán cargando grandes volúmenes de datos al sistema.

- **Nota 1:** Para este ejercicio me encuentro utilizando la plataforma Databricks, la cual ejecuta Spark de de forma nativa, por esa razón, puedo utilizar el comando spark.read() sin necesidad de crear una nueva sesión de Spark.

- **Nota 2:** Databricks se encuentra asociado a una cuenta AWS, en la cual se encuentra el bucket al que estoy cargando los CSVs. Debido a esto, puedo hacer la lectura de los archivos simplemente indicando la dirección de dicho bucket.

In [0]:
# ------------------------------------------------------------------------------------- #
#                         Función para el envío de correos                              #
# ------------------------------------------------------------------------------------- #

# -- Llaves de acceso a SES

USERNAME_SMTP = '...'
PASSWORD_SMTP = '...'
host = '...'
port = '...'

# -- Variables genéricas opara el envío de los correos de error

SENDERNAME = "..."
RECIPIENT = "..."
COPY= ""
SUBJECT = "Error en CSV"

# -- Función para el envío de correos

def send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML):

    # Create message container - the correct MIME type is multipart/alternative.
    msg = MIMEMultipart('alternative')
    msg['Subject'] = SUBJECT
    msg['From'] = SENDERNAME
    msg['To'] = RECIPIENT
    msg["Cc"] = COPY

    # Record the MIME types of both parts - text/plain and text/html.
    text = MIMEText(BODY_HTML, 'html')

    # Attach parts into message container.
    # According to RFC 2046, the last part of a multipart message, in this case
    # the HTML message, is best and preferred.
    msg.attach(text)
 
    server = smtplib.SMTP(host, port)
    server.ehlo()
    server.starttls()
    #stmplib docs recommend calling ehlo() before & after starttls()
    server.ehlo()
    server.login(USERNAME_SMTP, PASSWORD_SMTP)
    server.sendmail(msg["From"], msg["To"].split(",") + msg["Cc"].split(","), msg.as_string())
    #https://www.tfzx.net/article/20923.html
    server.close()

In [0]:
# ------------------------------------------------------------------------------------- #
#                Se cargan los CSVs en AWS S3 en un Dataframe de Spark                  #
# ------------------------------------------------------------------------------------- #

# spark = SparkSession.builder.appName("post_codes").getOrCreate()

try:
    df = spark.read.format('csv').options(header='true', inferSchema='true').load('s3:/.../pruebas_locas/*.csv')
except:
    BODY_HTML = """Ocurrió un error en la ejecución del ETL, esto puede deberse a los siguientes problemas:
                 <br>     - No se han subido nuevos archivos al sistema.
                 <br>     - Los archivos subidos al sistema están corruptos.
                 <br>     - Los archivos subidos al sistema no son archivos csv.
                 <br>Fecha: {}""".format(date.today().strftime('%Y-%m-%d'))
    send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML)
    dbutils.notebook.exit("Ocurrió un error en la ejecución del ETL.")

### 3. Manejo de errores y filtrado de registros incompletos

Se manejan 3 tipos de errores distintos: 

  - CSVs vacíos
  - CSVs con columnas incorrectas
  - CSV con valores no numéricos
  
Cualquiera de esos errores desencadena la finalización del job, y la notificación del archivo "defectuoso" a los correos seleccionados.

Para esta etapa, opté por enviar una notificación por correo, ya que he notado que resulta más efectivo notificar de errores de esta forma. Para esta notificación, utilicé el Simple Email Service (SES) de AWS. En caso de utilizar lambdas en vez de Databricks, podría utilizarse el Simple Notification Service (SNS), lo cual facilitaría el envío a múltiples destinatarios.

**Nota:** Tuve que definir la función de envío de correos en el paso anterior, ya que puede haber errores en la carga de CSVs con Spark.

In [0]:
# ------------------------------------------------------------------------------------- #
# Si no se han cargado nuevos archivos al bucket (o si el CSV está vacío), el job se    #
# detiene para evitar errores                                                           #
# ------------------------------------------------------------------------------------- #

if (df.rdd.isEmpty()):
    BODY_HTML = """Los archivos subidos al sistema se encuentran vacíos
                 <br>Fecha: {}""".format(date.today().strftime('%Y-%m-%d'))
    send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML)
    dbutils.notebook.exit("Los archivos subidos al sistema se encuentran vacíos")

In [0]:
# ------------------------------------------------------------------------------------- #
#   Si se han agregado CSVs con columnas distintas a las esperadas, el job se detiene   #
# ------------------------------------------------------------------------------------- #

if (not all(elem in ['lat', 'lon'] for elem in df.columns)):
    BODY_HTML = """Los archivos subidos al sistema tienen columnas incorrectas
                 <br>Fecha: {}""".format(date.today().strftime('%Y-%m-%d'))
    send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML)
    dbutils.notebook.exit("Los archivos subidos al sistema tienen columnas incorrectas")

In [0]:
# ------------------------------------------------------------------------------------- #
#        Si el CSV contiene valores no numéricos (o vacíos), el job se detiene          #
# ------------------------------------------------------------------------------------- #

tmp = df.select(
  F.col("lat").cast("int").isNotNull().alias("lat"),
  F.col("lon").cast("int").isNotNull().alias("lon")
)

if(tmp.filter(tmp.lat == False).count() > 0 or tmp.filter(tmp.lon == False).count() > 0):
    BODY_HTML = """Los archivos subidos al sistema contienen registros con valores no numéricos (o vacíos)
                 <br>Fecha: {}""".format(date.today().strftime('%Y-%m-%d'))
    send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML)
    dbutils.notebook.exit("Los archivos subidos al sistema contienen registros con valores no numéricos (o vacíos)")

# https://www.py4u.net/discuss/192652

**Nota:** El manejo de errores podría realizarse de una forma mucho más sofisticada, indicando la línea exacta del csv en la que se presenta el error. Por cuestiones de tiempo no implementaré dicha estrategia.

### 4. Consulta de la API

Ya que las llamadas a la API son limitadas, estas se haran por "batches" de coordenadas. El número de llamadas está definido en el código en la variable "llamadas_api". Desconozco el límite de solicitudes que se pueden realizar a la API, así que, para este ejercicio, asigné dicho número de forma arbitraria.

**Nota:** Tras estar jugando con la API, descubrí que esta tiene un límite de 100 coordenadas por petición, debido a esto, las pruebas realizadas a partir de aquí conllevan batches no mayores a ese número.

In [0]:
# -------------------------------------------------------------------------------------- #
# Se renombran las columnas "lat" y "lon" de acuerdo a los requerimientos de la API      #
# (postcodes.ip), además, se agrega una columna "limit", para que las peticiones a la    #
# API sólo devuelvan 1 resultado por coordenada geográfica.                              #
# -------------------------------------------------------------------------------------- #

df = df.withColumnRenamed("lat","latitude") \
        .withColumnRenamed("lon","longitude") \
        .withColumn('limit', lit(1))

In [0]:
# -------------------------------------------------------------------------------------- #
# Se agrega un "índice" al dataframe de Spark, para poder tomar "batches" de coordenadas #
# -------------------------------------------------------------------------------------- #

w = Window().partitionBy(lit('a')).orderBy(lit('a'))
df_indexed = df.withColumn("index", row_number().over(w))

# Para poder seleccionar un rango de filas del dataframe, se requiere crear una columna que funja de indice: 
# https://stackoverflow.com/questions/55690705/how-to-select-a-range-of-rows-from-a-dataframe-in-pyspark
# https://stackoverflow.com/questions/53042432/creating-a-row-number-of-each-row-in-pyspark-dataframe-using-row-number-functi

In [0]:
# -------------------------------------------------------------------------------------- #
#  Se calcula el número de pares de coordenadas que tendrá cada batch en las peticiones  #
# -------------------------------------------------------------------------------------- #

llamadas_api = 5
batch_size = ceil(df.count()/llamadas_api)

if (batch_size > 100):
    BODY_HTML = """Los batches no pueden ser mayores a 100 coordenadas, debido a restricciones de la API
                 <br>Fecha: {}""".format(date.today().strftime('%Y-%m-%d'))
    send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML)
    dbutils.notebook.exit("Los batches no pueden ser mayores a 100 coordenadas, debido a restricciones de la API")

In [0]:
# -------------------------------------------------------------------------------------- #
# Se van obteniendo los batches de coordenadas mediante un ciclo for() y, a la vez, se   #
# castean a formato JSON, para poder realizar peticiones de batches a la API.            #
#                                                                                        #
# Adicional, se agrega cada batch como elemento de la key "geolocations", que es el      #
# campo que utiliza la API para recibir batches de coordenadas.                          #
#                                                                                        #
# Al final de esta celda, se obtiene un arreglo de JSONS: [{"geolocations" : [batch_1]}, #
#                                                          {"geolocations" : [batch_2]}, #
#                                                          .                             #
#                                                          .                             #
#                                                          .                             #
#                                                          {"geolocations" : [batch_n]}] #
# -------------------------------------------------------------------------------------- #

batches = []

for i in range(llamadas_api):
    down_index = batch_size*i+1
    up_index = batch_size*(i+1)
    batches = batches + [{"geolocations": [json.loads(row) for row in df_indexed.filter(col("index") \
                                                                                .between(down_index, up_index)) \
                                                                                .select("latitude", "longitude", "limit") \
                                                                                .toJSON().collect()]}]

In [0]:
# -------------------------------------------------------------------------------------- #
#                 Se manda llamar a la API por cada batch de coordenadas                 #
# -------------------------------------------------------------------------------------- #

responses = []

for batch in batches:
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    response = requests.post('https://api.postcodes.io/postcodes/', json=batch, headers=headers).json()
    
    if (response['status'] != 200):
        BODY_HTML = """Ocurrió un error desconocido al tratar de envíar un batch de coordenadas a la API.
                   Favor de contactar con el desarrollador.
                   <br>Fecha: {}""".format(date.today().strftime('%Y-%m-%d'))
        send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML)
        dbutils.notebook.exit("Ocurrió un error desconocido al tratar de envíar un batch de coordenadas a la API.")
        
        responses = responses + response['result']

In [0]:
# -------------------------------------------------------------------------------------- #
# Se recorre la lista con todas las respuestas devueltas por la API, si el resultado es  #
# nulo (null), las coordenadas se agrega a una lista que serán reportadas por correo, en #
# caso contrario, el resultado se agrega a la lista de respuestas que serán agregadas a  #
# la BD.                                                                                 #
# -------------------------------------------------------------------------------------- #

to_report = []
to_bd = []

for response in responses:
    if(response['result'] == None):
        to_report = to_report + [[response['query']['latitude'], response['query']['longitude']]]
    else:
        to_bd = to_bd + response['result']

In [0]:
# -------------------------------------------------------------------------------------- #
# Los pares (latitude, longitude) sin un código postal asociado son enviadas por correo. #
# -------------------------------------------------------------------------------------- #

to_report = "<br>".join(', '.join(str(y) for y in x) for x in to_report)

BODY_HTML = """Se han subido coordenadas (latitude, longitude) que no empatan con ningún código postal.
               <br>
               <br>A continuación la lista:
               <br>
               <br>{1}
               <br>
               <br>
               <br>Fecha: {0}""".format(date.today().strftime('%Y-%m-%d'), to_report)
send(SENDERNAME, RECIPIENT, COPY, SUBJECT, BODY_HTML)

**Nota:** Se podría desarrollar un proceso más sofisticado, el cual identifique las coordenadas que no empataron con un código postal y, posteriormente, indique dichas coordenadas al usuario final. Sin embargo, por cuestiones de tiempo no implementaré dicha lógica.

### 5. Almacenamiento de códigos postales en BD

El resultado devuelto por la API poscodes.io es un arreglo de JSONs, debido a esto, dicha respuesta puede ser almacenada en MongoDB sin necesidad de realizar procesamiento adicional.

Por comodidad, elegí MongoDB para el almacenamiento de CPs, sin embargo, dependiendo del uso que se le fuera a dar a la data, podría optar por una base optimizada para análisis (p.e. Redshift).

In [0]:
user = '...'
password = '...'
db = '...'
host = '...'

mongo_uri = "mongodb+srv://" + urllib.parse.quote_plus(user) + ":" + password + "@" + host + "/" + db + "?retryWrites=true&w=majority&ssl=true&ssl_cert_reqs=CERT_NONE"

client = MongoClient(mongo_uri)
mydb = client[db]

# Se obtiene la colección

mycol = mydb['postal_codes']

# Se inserta la data 

mycol.insert_many(to_bd)

**Nota:** Haría falta una validación, para verificar que los pares "longitud, latitud" no hayan sido previamente ingresados a la BD, sin emabrgo, por razones de tiempo y que no es un requerimiento de este ejercicio, no realizaré dicha validación.

### 6. Respaldo de archivos en Data Lake

Para conservar el archivo original (.csv), opté por crear un nuevo directorio en el bucket S3. Además, agrego la fecha al nombre del archivo, para poder diferenciar archivos en caso de que se suban dos o más con el mismo nombre.

**Nota 1:** Esta parte del ETL podría realizarse sin asumir un role mediante el cliente STS. Tuve que realizarlo de esta forma debido a configuración de mi usuario de AWS.

**Nota 2:** Esta etapa de la ejecución debe dejarse al final del ETL, esto debido a la "lazy evaluation" de Spark. Spark no procesa la data hasta que se le es indicado mediante instrucciones, por consiguiente, si la data cruda es borrada o actualizada antes de enviar dicha orden a Spark, se generará un error en la ejecución.

In [0]:
# ------------------------------------------------------------------------------------- #
#        Se asume un role iam para poder utilizar ciertas funcionalidades de S3         #
# ------------------------------------------------------------------------------------- #

rol_s3 = "arn:aws:iam::...:role/databricks-s3"

sts_client = boto3.client('sts')

assumed_role_object = sts_client.assume_role (RoleArn = rol_s3,
                                              RoleSessionName = "AssumeRoleSession1")
    
credentials = assumed_role_object['Credentials']
s3_resource = boto3.resource('s3',
                            aws_access_key_id=credentials['AccessKeyId'],
                            aws_secret_access_key= credentials ['SecretAccessKey'],
                            aws_session_token = credentials['SessionToken'],)

# ------------------------------------------------------------------------------------- #
#                              Se accede al bucket deseado                              #
# ------------------------------------------------------------------------------------- #

bucket = "..."

my_bucket = s3_resource.Bucket(bucket)

# ------------------------------------------------------------------------------------- #
# Se copian todos los CSV (archivos crudos) a otra dirección en bucket S3, y se         #
# eliminan del bucket actual.                                                           #
# ------------------------------------------------------------------------------------- #

fecha = date.today().strftime('%Y-%m-%d')

for obj in my_bucket.objects.filter(Delimiter='/', Prefix='pruebas_locas/'):
    if obj.key.endswith('.csv'):
        copy_source = {'Bucket': bucket,
                   'Key': obj.key}
        new_file = obj.key.split('/')[0] + "/data_lake/" + obj.key.split('/')[1].split('.')[0] + "_" + fecha + '.csv'
        s3_resource.meta.client.copy(copy_source, bucket, new_file)
        s3_resource.Object(bucket, obj.key).delete()

### Pruebas unitarias

Para este ejercicio, más que realizar pruebas unitarias, realicé pruebas de "semi-integración"; llamaré a las pruebas de esta forma ya que, para validar el funcionamiento de alguna sección de la etapa 3 del ETL (p.e. la condición que valida que los csv sólo contengan valores numéricos), deben ejecutarse primero las etapas 1 y 2, para validar alguna sección de la etapa 4, deben ejecutarse primero las etapas 1, 2 y 3, y así sucesivamente.

Realicé pruebas de semi-integración porque las considero mucho más "ágiles" que las tradicionales pruebas unitarias. Además, son un tipo de pruebas que se vuelven fáciles de realizar gracias a los notebooks de código, a diferencia de los tradicionales entornos de desarrollo que no pueden ejecutar secciones de código de manera independiente.

A continuación, se enlistan las pruebas realizadas en cada etapa del ETL,

**Etapa 1:**

N/A

**Etapa 2:**
- Ejecución del ETL sin haber subido CSVs a AWS (revisar /Imagenes/Pruebas/prueba1.png)
- Ejecución del ETL tras haber subido TXTs a AWS (revisar /Imagenes/Pruebas/prueba2.png)

**Etapa 3:**

- Carga de 4 CSVs en un dataframe de Spark (revisar /Imagenes/Pruebas/prueba0.png)
- Carga de un CSV vacío (revisar /Imagenes/Pruebas/prueba3.png)
- Carga de un CSV sin encabezados (revisar /Imagenes/Pruebas/prueba4.png)
- Carga de un CSV con encabezados incorrectos (revisar /Imagenes/Pruebas/prueba5.png)
- Carga de un CSV con un registro incompleto (sin longitud o latitud) (revisar /Imagenes/Pruebas/prueba6.png)

**Etapa 4:**

- Asignar la variable "llamadas_api" de tal forma que el tamaño de los batches sea mayor a 100 (revisar /Imagenes/Pruebas/prueba7.png)
- Cargar de coordenadas "no asociables" a un código postal (revisar /Imagenes/Pruebas/prueba8.png)

**Etapa 5:**

No pude pensar en alguna prueba para esta etapa, sólo verifiqué la correcta carga de los registros a la BD (dicha carga se puede corroborar en la imagen anexa "prueba9.png").

**Etapa 6:**

- Carga de archivos no .csv al bucket S3 de AWS (revisar /Imagenes/Pruebas/prueba10.png)

**Nota:** Además de las pruebas mencionadas arriba, se estuvo validando el funcionamiento correcto de cada celda del notebook durante todo el desarrollo del ejercicio.

### Control de Versiones

Debido a que la plataforma que utilicé (Databricks) cuenta con su propio Control de Versiones, fue que no utilicé algún repositorio tipo GitLab o GitHub para llevar dicho control, sin embargo, conozco perfectamente dichas herramientas, y no tengo problema alguno con los conceptos e instrucciones de Push y Pull.

**Nota al pie 1:** Por cuestiones de privacidad, no comparto las llaves para acceder a mis cuentas de AWS ni de MongoDB, así como nombres de buckets S3. Para poder ejecutar este notebook, hace falta agregar llaves de una cuenta propia, así como el nombre de algún bucket relacionado a dicha cuenta.

**Nota al pie 2:** Lo mismo aplica para las variables definidas en la celda 12 para el envío de correos (SENDERNAME, RECIPIENT, COPY...). Además, tendría que darse de alta en AWS la dirección desde la que se envían los correos.

**Nota al pie 3:** En caso de no utilizarse Databricks, sería necesario descomentar una línea en la celda 10 de este notebook, la cuál crea una sesión de Spark. Además, deberían eliminarse todas las sentencias que hacen uso de la librería "dbutils".