# ETL básico con `pygrametl`

### Introducción

La siguiente es una descripción básica para implementar flujos ETL (_Extract, Transform and Load_) en Python, utilizando la librería [`pygrametl`](https://chrthomsen.github.io/pygrametl/doc/index.html).

__NOTA__: Se requiere de la instalación de las siguientes tecnologías:

- Python v3.9.18
- [MySQL Connector/Python](https://dev.mysql.com/doc/connector-python/en/) v8.0.18
- Libreria [pygrametl](http://chrthomsen.github.io/pygrametl/) v2.8 

### Contexto

El siguiente caso consiste en un ejemplo simulado de un almacén de datos.

<img src="./img/erd-ventas_dm.png" width="700px">

- El almacén de datos tiene una tabla de hechos y tres dimensiones organizadas en un __esquema estrella__. 
- La __tabla de hechos__ almacena datos de la cantidad de libros que se venden cada día. 
- La __dimensión libro__ almacena el nombre y género de cada libro vendido.
- La __dimensión localizacion__ almacena la comuna y región de los clientes.
- La __dimensión tiempo__ almacena la fecha de cada venta. 

Para efectos de baja complejidad, ninguna de las dimensiones tiene forma de copo de nieve, las dimensiones tampoco contienen atributos que cambien lentamente. Sin embargo, la librería `pygrametl` es compatible con dichas características, si este fuera el caso ([más información](https://chrthomsen.github.io/pygrametl/doc/api/pygrametl.html)).

Los datos utilizados provienen desde __dos fuentes__: 
- La __base de datos libreria__ (sistema OLTP) que contiene los registros de ventas de libros.
- Un __archivo CSV__ que contiene información de las comunas por regiones de Chile (obtenido desde [https://www.subdere.gov.cl/](https://www.subdere.gov.cl/documentacion/regiones-provincias-y-comunas-de-chile-2011)). Esta información es usada para complementar la dimensión __localizacion__ del almacen de datos.

### Extracción desde archivos

Para la extracción de filas de un archivo CSV solo se requiere de un controlador de archivo abierto.
- `pygrametl` usa `DictReader` de Python para archivos CSV.
- Asume que el encabezado del archivo CSV contiene el nombre de cada columna.
- Al usar `CSVSource`, es importante convertir los valores al tipo correcto antes de insertarlos en una tabla de la base de datos.


In [None]:
from pygrametl.datasources import CSVSource

region_file_handle = open('./data/cut_2010_v02.csv', 'r', encoding="utf-8")
region_source = CSVSource(f=region_file_handle, delimiter=',')

### Extracción desde RDBMS

Para extraer datos desde un RDBMS, es necesario crear conexiones a la base de datos. Esta conexión deben ser conexiones [PEP 249](https://www.python.org/dev/peps/pep-0249/).
- [`MySQL connector`](https://dev.mysql.com/doc/connector-python/en/) es una API que cumple con especificación Python Database API (PEP 249).

In [None]:
from mysql.connector import connect, Error, errorcode

def conexion(host, dbname, user, pwd):
    """Retorna un objeto de tipo conexión.
    """
    try:
        conn = connect(user=user, password=pwd, host=host, database=dbname)
        print('Conexión a {} establecida con exito!'.format(dbname))
        return conn
    except Error as e:
        if e.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print('Algo anda mal con tu usuario o contraseña.')
        elif e.errno == errorcode.ER_BAD_DB_ERROR:
            print('La base de datos no existe.')
        else:
            print(e)
    return None

- La mayoría de las abstracciones de la librería `pigrametl` __producen, consumen y operan datos en forma de _registro___.
- Considere que un _registro_ es un diccionario cuyos nombres de las columnas son las claves, y los valores son los datos que contiene cada _registro_.

A continuación se presenta un ejemplo de extracción de datos desde la base de datos __libreria__.

In [None]:
from pygrametl.datasources import SQLSource

cnx_libreria = conexion('localhost', 'libreria', 'root', 'mysqlroot')
query = """
    SELECT date(pedido.fecha) as 'fecha',
        libro.titulo AS 'libro',
        genero.nombre AS 'genero',
        comuna.nombre AS 'comuna',
        sum(pedido.cantidad) AS 'cantidad'
    FROM pedido
    INNER JOIN libro ON libro.id_libro = pedido.libro
    INNER JOIN genero ON genero.id_genero = libro.genero
    INNER JOIN cliente ON cliente.id_cliente = pedido.cliente
    INNER JOIN comuna ON comuna.id_comuna = cliente.comuna
    GROUP BY fecha, libro, genero, comuna;
    """

name_mapping = 'fecha', 'titulo', 'genero', 'comuna', 'cantidad'
ventas_source = SQLSource(connection=cnx_libreria, query=query, names=name_mapping)

### Transformación

A partir de la fecha obtenida desde la base de datos __libreria__ se obtienen nuevos atributos para poblar la dimensión __tiempo__. Para esto, se diseñan dos funciones, una para dividir la fecha y obtener los atributos __dia__, __mes__ y __año__, y la otra, para obtener el __trimestre__ al que corresponde dicha fecha.

In [None]:
def set_dmY(row):
    """Agrega los componentes de la fecha al registro (row)

    Args:
        row (dict): Registros que se carga en el almacen
    """

    fecha = row['fecha']
    row['dia'] = fecha.day
    row['mes'] = fecha.month
    row['anio'] = fecha.year

def set_trimestre(row):
    """Agrega el trimestre al registro (row) 

    Args:
        row (dict): Registros que se carga en el almacén
    """

    if row['mes'] < 4:
        trimestre = 1
    elif row['mes'] < 7:
        trimestre = 2
    elif row['mes'] < 10:
        trimestre = 3
    else:
        trimestre = 4

    row['trimestre'] = trimestre

### Conexion con el almacén de datos

- `ConnectionWrapper` comparte automáticamente entre abstracciones de `pygrametl`, se guarda en una variable para que la conexión se pueda cerrar.

In [None]:
from pygrametl import ConnectionWrapper

# notar que se utiliza la función conexion() creada anteriormente
cnx_dm = conexion('localhost', 'ventas_dm', 'root', 'mysqlroot')
cnx_dm_wrapper = ConnectionWrapper(connection=cnx_dm)

### Carga

- `pygrametl` proporciona diversos tipos de abstracciones para dimensiones y tablas de hechos. En este ejemplo, se usan las más simples.
- `CachedDimension` crea una instancia de para cada dimensión en el almacén de datos. 
- `CachedDimension` utiliza caché local para reducir significativamente la cantidad de solicitudes emitidas al RDBMS. 

Para cada dimensión, __se proporciona el nombre de la tabla de la base de datos, la clave principal de la tabla y las columnas sin clave (atributos) de la tabla__. 

Además, para la __dimensión de localicalizacion__, se proporciona el subconjunto de los atributos que se deben usar para buscar la clave principal.

In [None]:
from pygrametl.tables import CachedDimension, FactTable

libro_dim = CachedDimension(
        name='libro',
        key='id_libro',
        attributes=['titulo', 'genero'])

tiempo_dim = CachedDimension(
        name='tiempo',
        key='id_tiempo',
        attributes=['fecha', 'dia', 'mes', 'anio', 'trimestre'])

localizacion_dim = CachedDimension(
        name='localizacion',
        key='id_localizacion',
        attributes=['comuna', 'region'],
        lookupatts=['comuna'])

A continuación se crea una instancia de `FactTable` para la tabla de hechos del almacén de datos, a partir, del nombre de la tabla, una lista de columnas que constituyen la clave principal de la tabla de hechos y una lista de las medidas.

In [None]:
ventas_ft = FactTable(
        name='ventas',
        keyrefs=['id_libro', 'id_localizacion', 'id_tiempo'],
        measures=['cantidad'])

La dimensión __localizacion__ se completa con datos obtenidos desde el archivo CSV, ya que el archivo contiene toda la información necesaria para ambas columnas de la tabla. Para insertar las filas se usa el método `CachedDimension.insert()`

__NOTA__: Si la dimensión __localizacion__ se completara solo con los datos de la base de datos __libreria__, sería necesario actualizar el atributo de región con datos del archivo CSV cada vez que se actualicen los datos del almacen de datos.

In [None]:
for row in region_source:
    localizacion_dim.insert({'comuna': row['Nombre Comuna'], 'region': row['Nombre Región']})

region_file_handle.close()

A continuación se cargan los datos en las dimensiones __libro__, __tiempo__, y también en la __tabla de hechos__.

In [None]:
for row in ventas_source:
    # Se divide la fecha
    set_dmY(row)
    
    # Se obtiene el trimestre
    set_trimestre(row)

    # La fila se actualiza con las claves primarias correctas para 
    # cada dimensión y cualquier dato nuevo se inserta en cada una 
    # de las dimensiones al mismo tiempo.
    row['id_libro'] = libro_dim.ensure(row)
    row['id_tiempo'] = tiempo_dim.ensure(row)

    # CachedDimension.ensure() no se utiliza para la dimensión localizacion 
    # porque ya se ha rellenado. En su lugar, se utiliza el método 
    # CachedDimension.lookup() que no inserta ningún dato y devuelve el 
    # valor None si no está disponible una fila con las búsquedas correctas. 
    # En este caso, se genera un error si falta una ubicación en el archivo 
    # CSV, ya que la recuperación no es posible.
    row['id_localizacion'] = localizacion_dim.lookup(row)
    if not row['id_localizacion']:
        raise ValueError("La comuna no se encuentra en la dimension Localizacion")

    # Considerando que la cantidad de ordénes se presenta como un valor agregado 
    # en los registros de ventas, la fila se puede insertar en el almacén de datos. 
    # De lo contrario, se debería realizar la transformación antes de insertar.
    ventas_ft.insert(row)

# Después de que se hayan insertado todos los datos, se ordena a la conexión que se 
# confirme la inserción y luego se cierra. Esto garantiza que los datos se
# confirmen en la base de datos y que los recursos utilizados por la conexión se
# liberen.
cnx_dm_wrapper.commit()
cnx_dm_wrapper.close()

# Finalmente, se cierra la conexión a la base de datos libreria
cnx_libreria.close()