# Tutorial: creación de ETLs incrementales con PySpark

## 1. Introducción	
	¿Qué aprenderá?
    En este tutorial aprenderá cómo puede usar PySpark para crear ETLs con historia en las dimensiones y con datos cargados previamente en la bodega de datos. 
	
	¿Qué construirá? 
	Construirá una nueva versión del ETL, a partir del ETL del taller anterior. Esta vez incluyendo el manejo de historia para algunas de las dimensiones del modelo multidimensional.
    
	¿Para qué?
	Dentro  de  procesos  de  ETL,  es común  que se  presenten  dimensiones que puedan  presentar cambios a través del tiempo para las cuales es necesario tener un plan de manejo de historia. Por lo tanto, es esencial saber cómo realizar este manejo en las distintas herramientas de ETLs y por supuesto cargas cuando la bodega de datos tiene datos cargados.
    
    ¿Qué necesita?
    1. Python 3 con pip instalado
    2. Jupyter Lab
    3. Paquetes: Pyspark (3.0.1) y pandas (1.2.1)
    4. Controlador J de MySQL
    5. Servidor SQL con base de datos "WWImporters" que contenga la dimension "Producto"
    6. Archivo con la información histórica de productos:
[ReporteNum2](./ReporteNum2_Productos.csv)

    7. Archivo con la información de fechas.

[Fecha](./Fecha.csv)

En  este tutorial,  utilizará PySpark para crear y ejecutar un ETL incremental con manejo de historia. Para completar esta actividad, es necesario que haya completado la actividad previa de creación de un ETL, pues se hará una extensión de dicho proceso. Se presenta a continuación el modelo multidimensional sin historia, cuyos datos conforman la base que se tiene actualmente 

![Modelo ordenes](./WWI_modelo_ordenes.png)

El objetivo de este tutorial es transformar el modelo anterior de forma que incorpore nuevos datos y en particular el manejo de historia de la dimensión Producto, para lo anterior se propone manterner la dimensión original y crear una nueva para el manejo de historia (<i>ProductoHistoria</i>), se espera que despues de este proceso el modelo multidimensional sea el siguiente

![Modelo ETL](./Img/ModeloHistoriaTutorial.png)

En el caso de la primera carga los valores por defecto para los atributos version, FechaInicial, FechaFinal y ID_Producto serán la fecha en la que se hace la carga, 2199-12-31 y el id de la dimension producto.

## 2. ETL Incremental para Productos
WideWorldImporters ha determinado que existe la posibilidad de que un <i>Producto</i> cambie su precio unitario. En vista de esto, se debe modificar el proceso ETL para manejar los cambios. Se ha decidido que el manejo que se le debe dar a la historia es **tipo 2**. Este tipo implica que la tabla tendrá 3 columnas nuevas: 1. fecha de inicio, 2. fin de vigencia del registro y 3. version, la última es un indicador de cuántos registros hay por cada producto. Un nuevo registro es ingresado cuando se genera un cambio en alguno de los atributos de los que se lleva historia. Esto lleva adicionalmente, a actualizar la columna que maneja la vigencia del Producto (fecha fin).

De acuerdo con esto, se debe añadir una serie de  transformaciones en el ETL al final para la dimensión de Producto, es decir, en el bloque 4 del diseño del ETL. Estas transformaciones incluyen las siguientes operaciones: 
<ol>
<li>Leer los datos ya existentes en la bodega o DataWarehouse (Base de datos Estudiante_i) y traer la última versión de cada <i>Producto</i>.</li>
<li>Identificar de los Productos reportados por en el negocio en un archivo csv (<i>ReporteNum2_Productos.csv</i>) con corte 2015-01-01, cuáles son nuevos e insertarlos en la bodega o DataWarehouse (Base de datos Estudiante_i) en la tabla ProductoHistoria.</li>
<li>Identificar los Productos que cambiaron de precio, para insertar un nuevo registro con el nuevo precio y modificar el registro existente, cambiando el valor del atributo <i>Fecha final</i>.</li>
</ol>

Este tutorial utiliza el ETL básico y se cambia el código del bloque 4.

In [1]:
# Configuración servidor base de datos transaccional
# Utilizar el usuario asignado en el curso
db_user = ''
db_psswd = ''

In [2]:
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_i_XXXXXX'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [3]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark.sql.functions import lit
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
import mysql.connector
from pyspark.sql.window import Window
from datetime import datetime

In [4]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [5]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1: Dimensión <i>Empleado</i>

#### Extracción

In [6]:
sql_empleados = '''(SELECT ID_persona AS ID_Empleado, NombreCompleto AS Nombre, EsVendedor FROM WWImportersTransactional.Personas WHERE EsVendedor=1) AS Temp_empleados'''
empleados = obtener_dataframe_de_bd(source_db_connection_string, sql_empleados, db_user, db_psswd)
empleados.show()

+-----------+------------------+----------+
|ID_Empleado|            Nombre|EsVendedor|
+-----------+------------------+----------+
|          2|    Kayla Woodcock|      true|
|          3|     Hudson Onslow|      true|
|          6|     Sophia Hinton|      true|
|          7|         Amy Trefl|      true|
|          8|    Anthony Grosse|      true|
|         13|Hudson Hollinworth|      true|
|         14|         Lily Code|      true|
|         15|         Taj Shand|      true|
|         16|     Archer Lamble|      true|
|         20|       Jack Potter|      true|
+-----------+------------------+----------+



#### Transformación

In [7]:
# TRANSFORMACION
empleados = empleados.selectExpr('ID_Empleado as ID_Empleado_T','Nombre')
empleados = empleados.withColumn('ID_Empleado_DWH', f.monotonically_increasing_id() + 1)
empleados.show(5)

+-------------+--------------+---------------+
|ID_Empleado_T|        Nombre|ID_Empleado_DWH|
+-------------+--------------+---------------+
|            2|Kayla Woodcock|              1|
|            3| Hudson Onslow|              2|
|            6| Sophia Hinton|              3|
|            7|     Amy Trefl|              4|
|            8|Anthony Grosse|              5|
+-------------+--------------+---------------+
only showing top 5 rows



#### Carga

In [8]:
# CARGUE
guardar_db(dest_db_connection_string, empleados,'Estudiante_i_XXXXXX.Empleado', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 2: Dimensión ciudad. 

#### Extracción

In [9]:
#EXTRACCION
sql_paises = '''(SELECT ID_Pais, Nombre, Continente, Region, Subregion FROM WWImportersTransactional.Paises) AS Temp_paises'''
sql_provincias_estados = '''(SELECT ID_EstadosProvincias AS ID_EstadoProvincia, NombreEstadoProvincia, TerritorioVentas, ID_Pais FROM WWImportersTransactional.EstadosProvincias) AS Temp_estados_provincias'''
sql_ciudades = '''(SELECT ID_ciudad as ID_ciudad_T, NombreCiudad, ID_EstadoProvincia, Poblacion FROM WWImportersTransactional.Ciudades) AS Temp_ciudades'''

paises = obtener_dataframe_de_bd(source_db_connection_string, sql_paises, db_user, db_psswd)
provincias_estados = obtener_dataframe_de_bd(source_db_connection_string, sql_provincias_estados, db_user, db_psswd)
ciudades = obtener_dataframe_de_bd(source_db_connection_string, sql_ciudades, db_user, db_psswd)

print(ciudades.columns, paises.columns, provincias_estados.columns)

['ID_ciudad_T', 'NombreCiudad', 'ID_EstadoProvincia', 'Poblacion'] ['ID_Pais', 'Nombre', 'Continente', 'Region', 'Subregion'] ['ID_EstadoProvincia', 'NombreEstadoProvincia', 'TerritorioVentas', 'ID_Pais']


#### Transformación

In [10]:
# TRANSFORMACION
ciudades = ciudades.join(provincias_estados, how = 'inner', on = 'ID_EstadoProvincia')
ciudades = ciudades.join(paises, how = 'inner', on = 'ID_Pais')
ciudades = ciudades.withColumn('ID_Ciudad_DWH', f.monotonically_increasing_id() + 1)
ciudades.show(5)

+-------+------------------+-----------+------------+---------+---------------------+----------------+-------------+-------------+--------+----------------+-------------+
|ID_Pais|ID_EstadoProvincia|ID_ciudad_T|NombreCiudad|Poblacion|NombreEstadoProvincia|TerritorioVentas|       Nombre|   Continente|  Region|       Subregion|ID_Ciudad_DWH|
+-------+------------------+-----------+------------+---------+---------------------+----------------+-------------+-------------+--------+----------------+-------------+
|    230|                31|         49|     Absecon|     8411|           New Jersey|         Mideast|United States|North America|Americas|Northern America|            1|
|    230|                31|        150|    Adelphia|     null|           New Jersey|         Mideast|United States|North America|Americas|Northern America|            2|
|    230|                31|        336|      Albion|     null|           New Jersey|         Mideast|United States|North America|Americas|Northe

#### Carga

In [11]:
# CARGUE
guardar_db(dest_db_connection_string, ciudades,'Estudiante_i_XXXXXX.Ciudad', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 3: Dimensión paquete

#### Extracción

In [12]:
#EXTRACCION
sql_paquetes = '''(SELECT ID_TipoPaquete AS ID_TipoPaquete_T, TipoPaquete AS Nombre FROM WWImportersTransactional.Paquetes) AS Temp_Paquetes'''

paquetes = obtener_dataframe_de_bd(source_db_connection_string, sql_paquetes, db_user, db_psswd)

#### Transformación

In [13]:
paquetes = paquetes.withColumn('ID_TipoPaquete_DWH', f.monotonically_increasing_id() + 1)
paquetes.show(5)

+----------------+------+------------------+
|ID_TipoPaquete_T|Nombre|ID_TipoPaquete_DWH|
+----------------+------+------------------+
|               1|   Bag|                 1|
|               2| Block|                 2|
|               3|Bottle|                 3|
|               4|   Box|                 4|
|               5|   Can|                 5|
+----------------+------+------------------+
only showing top 5 rows



#### Carga

In [14]:
# CARGUE
guardar_db(dest_db_connection_string, paquetes,'Estudiante_i_XXXXXX.TipoPaquete', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 4: Dimensión producto
En términos generales para no afectar la carga realizada en el proceso ETL anterior, vamos a crear una copia de la dimensión Producto y la llamaremos ProductoHistoria. Es esta última en la que realizaremos la nueva carga de datos. En un contexto real lo que se hace es modificar la misma dimensión para manejar la historia y realizar el cargue incremental.

En detalle lo que vamos a hacer es:
1. Paso 1: Crear y cargar la dimensión ProductoHistoria a partir de los datos de la dimensión Producto agregando las columnas para el manejo de historia tipo 2.
2. Paso 2: Ingresar y actualizar los registros que lo requieran en la dimensión ProductoHistoria a partir de la información del  archivo ReporteNum2_Productos.csv. Este reporte lo entrega el negocio a corte 2015-01-01

Nota: El paso 1 incluye la creación de la tabla y la carga a partir de la información existente en la dimensión Producto. Este proceso si se hace en PySpark se puede realizar en un solo paso, al guardar un dataframe ya que este proceso incluye la creación de  las tablas en la base de datos.

Para las restricciones de llaves primarias le recomendamos usar MySQL Workbench, dando clic derecho sobre el nombre de la tabla, clic en <i>Alter table ...</i>, seleccionando el(los) atributo(s) que es(son) la llave primaria y dando clic en <i>Apply</i>

### Paso 1: Crear y cargar la Dimensión ProductoHistoria

#### Extracción

In [15]:
sql_productos = '''(SELECT * FROM WWImportersTransactional.Producto) AS Temp_productos'''
productos = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)

#### Transformación

In [16]:
productosHistoria = productos.withColumn('FechaInicial',lit('2013-01-01'))
productosHistoria = productosHistoria.withColumn('FechaFinal',lit('2199-12-31'))
productosHistoria = productosHistoria.withColumn('Version', lit(1))
productosHistoria = productosHistoria.withColumnRenamed("ID_Producto","ID_Producto_T")
productosHistoria = productosHistoria.withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
productosHistoria.show(5)

+-------------+--------------------+-----+--------+----------------------+-------------------+--------+--------------+-----------------+------------+----------+-------+---------------+
|ID_Producto_T|      NombreProducto|Marca|ID_Color|Necesita_refrigeracion|Dias_tiempo_entrega|Impuesto|PrecioUnitario|PrecioRecomendado|FechaInicial|FechaFinal|Version|ID_Producto_DWH|
+-------------+--------------------+-----+--------+----------------------+-------------------+--------+--------------+-----------------+------------+----------+-------+---------------+
|            1|USB missile launc...| null|    null|                     0|                 14|      15|            25|               37|  2013-01-01|2199-12-31|      1|              1|
|            2|USB rocket launch...| null|      12|                     0|                 14|      15|            25|               37|  2013-01-01|2199-12-31|      1|              2|
|            3|Office cube peris...| null|       3|                     0| 

#### Carga inicial de la dimensión ProductoHistoria
El cargue inicial solo se realiza una vez, mientras que el incremental se realiza en cada reporte de negocio de datos en un periodo de tiempo

In [17]:
# CARGUE
guardar_db(dest_db_connection_string, productosHistoria,'Estudiante_i_XXXXXX.ProductoHistoria', db_user, db_psswd)

### Paso 2: Manejo de historia
Los reportes del negocio pueden ser varios a lo largo del tiempo, por ende el cargue incremental se hace con cada reporte que nos entrega el negocio. En cada reporte las versiones de los registros aumentan, por lo que podemos encontrar registros con versión 2, versión 3, versión 4 hasta versión N, siendo N el número de reporte. A continuación vamos a simular el primer reporte dado por el negocio despues del primer cargue, entonces todos los productos que existen en la tabla ProductoHistoria son la primera (y única) versión de los datos.

#### Extracción

In [18]:
productosReporte = obterner_dataframe_desde_csv('.\ReporteNum2_Productos.csv', ',')
sql_productosHistoria ='''(SELECT * FROM Estudiante_i_XXXXXX.ProductoHistoria) AS Temp_productos'''

productosHistoria = obtener_dataframe_de_bd(source_db_connection_string, sql_productosHistoria, db_user, db_psswd)
productosReporte.count(), productosHistoria.count()

(70, 227)

En la bodega de datos (base de datos Estudiante_i), se tienen varias versiones para un mismo Producto en un proceso incremental (N reportes). Sin embargo, para el manejo de historia tipo 2 sólo se necesita la última versión, se utiliza una función Window de PySpark para extraer únicamente la última versión por cada llave primaria de un Producto:

In [19]:
#Se utiliza un Window para extraer del dataframe de Productos, por cada ID_Producto, la versión mayor.
#Es decir, por cada llave natural, extrae la última versión.
window = Window.partitionBy(productosHistoria['ID_producto_T']).orderBy(productosHistoria['Version'].desc())
productosHistoria = productosHistoria.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') == 1) 
productosHistoria = productosHistoria.drop('rank')

#### Transformación
hacemos un LEFT JOIN para unir los productos del reporte y los productos de la tabla ProductoHistoria, de manera que se mantienen todos los registros del DataFrame con historia (La primera vez tiene la información cargada en el primer cargue y por ende todas las versiones tienen el valor de 1, FechaInicial 2013-01-01 y FechaFinal 2199-12-31).

In [20]:
#Renombra columnas del DataWarehouse, poniendo sujifo _DWH para evitar confusiones
productosHistoria = productosHistoria.selectExpr('ID_Producto_T','ID_Producto_DWH','Version as Version_DWH', 'PrecioUnitario as PrecioUnitario_DWH', 'FechaInicial as FechaInicial_DWH', 'FechaFinal as FechaFinal_DWH')

#Realiza left join entre productosReporte que se están procesando y productosHistoria en el DW.
productosHistoria = productosReporte.withColumnRenamed('ID_Producto', 'ID_Producto_T').join(productosHistoria, how = 'left', on ='ID_Producto_T')

Vamos a usar una nueva variable "ETIQUETA" que nos indica el tipo de registro para los productos del reporte:
- **-1** que representa "cambios": un registro que ya existe en ProductoHistoria y que cambio el valor de precio unitario
- **0** que representa "sin cambios": un registro que ya existe en ProductoHistoria y que no cambio su precio unitario
- **1** que representa "nuevo": un registro que no existia en ProductoHistoria

In [21]:
productosHistoria = productosHistoria.withColumn('ETIQUETA', 
    f.when(productosHistoria['ID_Producto_DWH'].isNull(), 1)\
    .when((productosHistoria['ID_Producto_DWH'].isNotNull()) 
          & ((productosHistoria['PrecioUnitario'] == productosHistoria['PrecioUnitario_DWH']) 
              | (productosHistoria['PrecioUnitario'].isNull() & productosHistoria['PrecioUnitario_DWH'].isNull())
          ), 0)\
    .otherwise(-1))

Los registros que no han cambiado (ETIQUETA = 0), no deben mantenerse en el DataFrame, por lo que se excluyen:

In [22]:
productosHistoria = productosHistoria.where(productosHistoria['ETIQUETA'] != 0)

Es necesario insertar nuevos registros al Data Warehouse y, por lo tanto, estos necesitan un nuevo id (ID_Producto_DWH). Para crear el id, primero se encuentra el id máximo de los productos existentes y, paso seguido, se utiliza el máximo junto con la función de PySpark monotonically_increasing_id para crear un nuevo id único para cada uno de los registros del reporte. Este paso también se puede realizar usando secuencias en MYSQL

In [23]:
#Encontar llave máxima
max_key = productosHistoria.agg({"ID_Producto_DWH": "max"}).collect()[0][0]
    
if max_key is None:
    max_key = 1
    
#Nuevos ids
productosHistoria = productosHistoria.withColumn('new_id', f.monotonically_increasing_id() + max_key)

Ahora, se procede a asignar/cambiar la versión, fecha de inicio y fecha de vencimiento o final a los registros de acuerdo con las categorías previamente determinadas. En este punto ETIQUETA solo puede tomar los valores de 1 y -1. Asignamos a una variable <i>fecha_reporte</i> la fecha del reporte, pues es la fecha en la que las versiones hasta ahora vigentes dejan de estarlo y entran en vigencia las nuevas versiones con los últimos cambios

In [24]:
fecha_reporte = '2015-01-01'

##### Caso de ETIQUETA=1: Productos nuevos

- **Nota 1**: Se asigna Version igual a 1, fecha inicial igual a la fecha de reporte (2015-01-01), fecha final igual a 2199-12-31

- **Nota 2**: Note que en la tercera línea de código (FechaFinal_DWH) si la ETIQUETA no es 1, entra al <i>otherwise</i>, en donde actualizamos la fecha final de vigencia de los registros que cambiaron (**caso ETIQUETA=-1**) como la fecha de reporte menos un día, es decir que dejan de ser vigentes hasta el día anterior al reporte. Note que los registros que cambiaron se van a agregar (**En Caso de ETIQUETA=-1: Productos que cambiaron**) como nuevos registros a la tabla con una versión posterior a la que existia, la fecha inicial como la del reporte y la final como 2199-12-31, es decir los productos que cambiaron se van a insertar como una nueva versión de los productos que ya existian.

- **Nota 3**: En la última línea de código estamos creando una segunda variable indicadora A_INSERTAR que toma el valor de 1 para registros que se van a INSERTAR directamente en la tabla ProductoHistoria y 0 para los que se van a actualizar (los productos que dejan de estar en vigencia). En el caso ETIQUETA=1, estamos diciendo que todos los productos nuevos se van a insertar directamente, mientras que, por ahora, los que cambiaron se van a actualizar

In [25]:
# Caso 1: Registros nuevos se asigna version igual a 1, fecha inicial la actual y fecha final 2199-12-31
# Note que aqui no cambiamos los valores en el otherwise dado que perderiamos la historia al sobreescribir la información con la de los registros que cambiaron
productosHistoria = productosHistoria.withColumn('Version_DWH', f.when(productosHistoria['ETIQUETA'] == 1, 1).otherwise(productosHistoria['Version_DWH']))
productosHistoria = productosHistoria.withColumn('FechaInicial_DWH', f.when(productosHistoria['ETIQUETA'] == 1, f.to_date(f.lit(fecha_reporte), 'yyyy-MM-dd')).otherwise(productosHistoria['FechaInicial_DWH']))
# En el otherwise estamos actualizando la fecha final del Caso -1: si son registros que ya existian y cambiaron, su fecha final será la fecha actual menos un dia
productosHistoria = productosHistoria.withColumn('FechaFinal_DWH', f.when(productosHistoria['ETIQUETA'] == 1, f.to_date(f.lit('2199-12-31'), 'yyyy-MM-dd')).otherwise(f.to_date(f.lit(fecha_reporte), 'yyyy-MM-dd')-1))
productosHistoria = productosHistoria.withColumn('A_INSERTAR', f.when(productosHistoria['ETIQUETA'] == 1, 1).otherwise(0))

##### Caso de ETIQUETA=-1: Productos que cambiaron

- **Nota 4**: Para los productos que cambiaron tenemos dos subcasos: el registro con el cambio que va a ser una nueva versión y que no existe en la base de datos y los registros de esos productos que cambiaron que si existian en la base de datos que dejan de estar vigentes, en cuyo caso hay que actualizar la FechaFinal

- **Nota 5**: Dada la nota 4, lo primero que hacemos es separar los registros que cambiaron y almacenarlos en un dataframe diferente, es decir aquellos con ETIQUETA=-1. Luego asignamos como versión nueva el valor de la versión anterior más 1; como fecha inicial la fecha de reporte de los datos, como fecha final 2199-12-31 

- **Nota 6**: En este caso la variable A_INSERTAR toma el valor de 1, es decir todos estos registros que son las nuevas versiones de productos que ya existian, se van a insertar directamente en la tabla ProductoHistoria


In [26]:
#caso -1: Los registros ya existian pero cambiaron: no se van a insertar, se van a actualizar
# Separamos los productos que cambiaron, creando una copia de los productos cuya ETIQUETA=-1
productos_nuevas_versiones = productosHistoria.where(productosHistoria['ETIQUETA'] == -1)
# Asignamos a estos registros una versión igual a la última+1, fecha inicial de vigencia del registro: la fecha actual, y como fecha final 2199-12-31
productos_nuevas_versiones = productos_nuevas_versiones.withColumn('Version_DWH', productos_nuevas_versiones['Version_DWH'] + 1)
productos_nuevas_versiones = productos_nuevas_versiones.withColumn('FechaInicial_DWH', f.to_date(f.lit(fecha_reporte), 'yyyy-MM-dd'))
productos_nuevas_versiones = productos_nuevas_versiones.withColumn('FechaFinal_DWH', f.to_date(f.lit('2199-12-31'), 'yyyy-MM-dd'))
productos_nuevas_versiones = productos_nuevas_versiones.withColumn('A_INSERTAR',f.lit(1))

Finalmente, unimos los dos dataframes, de manera que la variable A_INSERTAR toma dos valores: 1 o 0. 1 son los registros que se van a insertar directamente, estos son los productos nuevos y las versiones nuevas de los productos que ya existian pero cambiaron. 0 son los registros de las últimas versiones que habian en la base de datos de los productos que cambiaron cuya FechaFinal se va a actualizar pues dejan de estar en vigencia

In [27]:
# Unir los DataFrames con los registros nuevos, los registros de versiones anteriores y los registros que son nuevas versiones de registros existentes
productosHistoria = productosHistoria.union(productos_nuevas_versiones)

Ahora, para hacer la actualización y posteriormente la inserción se divide el DataFrame en dos, una parte conteniendo los registros a actualizarse y otra los registros a insertar:

In [28]:
# Persiste DF para obligar ejecución de grafo de computación. 
# Este persist es independiente de la base de datos, es más sobre pasar de memoria a disco el dataframe
productosHistoria.persist()
productosHistoria = productosHistoria.selectExpr('new_id as ID_Producto_DWH',  'NombreProducto', 'Necesita_refrigeracion', 'Dias_tiempo_entrega', 'Impuesto', \
                                 'PrecioUnitario', 'PrecioRecomendado','ID_Color', 'Marca', 'ID_PRODUCTO_T', 'Version_DWH as Version',\
                                 'FechaInicial_DWH as FechaInicial', 'FechaFinal_DWH as FechaFinal', 'A_INSERTAR')

#Separar en registros que se deben insertar y registros que se deben actualizar.
productos_inserts = productosHistoria.where(productosHistoria['A_INSERTAR'] == 1)
productos_updates = productosHistoria.where(productosHistoria['A_INSERTAR'] == 0)

productos_inserts = productos_inserts.drop('A_INSERTAR')
productos_updates = productos_updates.drop('A_INSERTAR')

#### Carga incremental de la dimensión ProductoHistoria
Primero hacemos la **actualización**, dado que hay menos registros en la tabla, la búsqueda y actualización es menor comparado a si lo hicieramos después de insertar los productos nuevos. La actualización es un poco más compleja que la inserción directa, puesto que Spark no lo soporta. Por lo tanto, el proceso se realiza en la base de datos, para lo cual se procede a insertar los registros en una tabla auxiliar (ProductoHistoria_Update), se ejecuta una sentencia SQL UPDATE para actualizar la tabla de la dimensión ProductoHistoria a partir de esta tabla auxiliar, cuyos registros son borrados al final con una sentencia TRUNCATE.

In [29]:
#Insertar en tabla auxiliar (ProductoHistoria_Update)
guardar_db(dest_db_connection_string, productos_updates,'Estudiante_i_XXXXXX.ProductoHistoria_Update', db_user, db_psswd)

In [30]:
#Ejecutar sentencia SQL UPDATE
import mysql.connector

mydb = mysql.connector.connect(
  host="157.253.236.116",
  port="8080",
  user="",
  passwd="",
  database="Estudiante_i_2022XX",
)
cur = mydb.cursor()

print(mydb)
res = cur.execute("""
    UPDATE ProductoHistoria 
    INNER JOIN ProductoHistoria_Update AS Table_Updates
        ON ProductoHistoria.ID_Producto_T = Table_Updates.ID_Producto_T
    SET ProductoHistoria.FechaFinal = Table_Updates.FechaFinal;
""")
res = cur.execute("""TRUNCATE TABLE ProductoHistoria_Update;""")
mydb.commit()
cur.close()
mydb.close()

<mysql.connector.connection_cext.CMySQLConnection object at 0x000002699D4A1C48>


Finalmente, hacemos la **inserción** de productos nuevos y de las nuevas versiones

In [31]:
guardar_db(dest_db_connection_string, productos_inserts,'Estudiante_i_XXXXXX.ProductoHistoria', db_user, db_psswd)

### BLOQUE 5: Dimensión cliente

#### Extracción

In [32]:
sql_categoriasCliente = '''(SELECT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
sql_gruposCompra = '''(SELECT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
sql_clientes = '''(SELECT ID_Cliente AS ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

categoriasCliente = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
gruposCompra = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
clientes = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

#### Transformación

In [33]:
# TRANSFORMACION
clientes = clientes.join(gruposCompra, how = 'inner', on = 'ID_GrupoCompra')
clientes = clientes.join(categoriasCliente, how = 'inner', on = 'ID_Categoria')
clientes = clientes.withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
clientes.show()

+------------+--------------+------------+--------------------+--------------+----------------+-------------+-------------------+--------+-----------------+---------------+--------------+
|ID_Categoria|ID_GrupoCompra|ID_Cliente_T|              Nombre|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|NombreGrupoCompra|NombreCategoria|ID_Cliente_DWH|
+------------+--------------+------------+--------------------+--------------+----------------+-------------+-------------------+--------+-----------------+---------------+--------------+
|           3|             2|         601|Wingtip Toys (Rut...|           401|           29887|         null|2013-01-01 00:00:00|       7|     Wingtip Toys|   Novelty Shop|             1|
|           3|             2|         600|Wingtip Toys (Car...|           401|            5407|         null|2013-01-01 00:00:00|       7|     Wingtip Toys|   Novelty Shop|             2|
|           3|             2|         599|Wingtip Toys (Dic.

#### Carga

In [34]:
# CARGUE
guardar_db(dest_db_connection_string,clientes,'Estudiante_i_XXXXXX.Cliente', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 6: Hecho orden

#### Extracción

In [35]:
sql_ordenes_tmp = '''(SELECT * FROM WWImportersTransactional.ordenes) AS Temp_ordenes'''
sql_detallesOrdenes = '''(SELECT * FROM WWImportersTransactional.detallesordenes) AS Temp_detallesordenes'''
ordenes_tmp = obtener_dataframe_de_bd(source_db_connection_string, sql_ordenes_tmp, db_user, db_psswd)
detallesOrdenes = obtener_dataframe_de_bd(source_db_connection_string, sql_detallesOrdenes, db_user, db_psswd)

#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- La regla de negocio "La tasa de impuesto es de 10% o 15%" es correcta, pero habian errores en la tabla original, que fueron corregidos. 
- Para la segunda regla de negocio: "Son 73.595 órdenes detalladas en 231.412 lineas de detalle de órdenes realizadas desde 2013", si faltaban datos, los cuales fueron completados, y nos dicen que en cuanto a consistencia ellos revisaron las tablas e hicieron correcciones, pero que los duplicados completos de ordenes los eliminemos
- "El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD": En cuanto a formatos de fechas estan de acuerdo con que los estandarizemos y el formato sea el especificado en la regla
- Para las descripciones de productos que eran "a", se actualizaron a los valores reales. 
- Se pueden eliminar las columnas Comenarios, Instrucciones_de_entrega y comentarios_internos porque estan vacias. 
- A pesar de estar en un proceso de mejorar la calidad de los datos y mantener los nulos nos ayudaría a reflejar esa calidad, de la mano con el grupo de analitica de WWI se decide imputar por la media el valor extremo de la variable Cantidad
- Para las ordenes las columnas Seleccionado_por_ID_de_persona, ID_de_pedido_pendiente, Seleccion_completada_cuando, y para las columnas Seleccion_completada_cuando de la tabla detalles de ordenes, se decide mantener los valores vacíos, sin embargo para la variable Precio_unitario el negocio reviso y complemento los valores faltantes

Las tablas usadas en el tutorial de entendimiento de datos estaran disponibles para su revision con los siguientes nombres: OrdenesCopia y DetallesOrdenesCopia. 

Para este tutorial vamos a trabajar con unas tablas que dadas las conclusiones del tutorial de entendimiento, WWImporters revisó los datos originales, creo tablas y las llamo "Ordenes" y "DetallesOrdenes"

Se hace una verificación de los valores de la tasa de impuesto

In [36]:
detallesOrdenes.select("Tasa_de_impuesto").distinct().show()

+----------------+
|Tasa_de_impuesto|
+----------------+
|              10|
|              15|
+----------------+



Se hace una verificación del rango de fechas disponible en los datos

In [37]:
ordenes_tmp.agg({"Fecha_de_pedido": "min"}).show()

+--------------------+
|min(Fecha_de_pedido)|
+--------------------+
|          2013-01-01|
+--------------------+



Se elimina columnas Comenarios, Instrucciones_de_entrega y comentarios_internos

In [38]:
ordenes_tmp = ordenes_tmp.drop(*["Comentarios", "Instrucciones_de_entrega","comentarios_internos"])

Se eliminan duplicados totales de ordenes

In [39]:
print((ordenes_tmp.count(),ordenes_tmp.distinct().count()))

(107707, 93629)


In [40]:
ordenes_tmp = ordenes_tmp.drop_duplicates()

In [41]:
print((ordenes_tmp.count(),ordenes_tmp.distinct().count()))

(93629, 93629)


Se hace verificación de consistencia

In [42]:
#consistencia: revisar genially: definicion de consistencia
ids_ordenes = set([x.ID_de_pedido for x in ordenes_tmp.select('ID_de_pedido').collect()])
ids_detalles = set([x.ID_de_pedido for x in detallesOrdenes.select('ID_de_pedido').collect()])

len(ids_ordenes-ids_detalles), len(ids_detalles-ids_ordenes)

(0, 0)

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [43]:
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = ordenes_tmp.filter(ordenes_tmp["Fecha_de_pedido"].rlike(regex))
noCumplenFormato = ordenes_tmp.filter(~ordenes_tmp["Fecha_de_pedido"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))

20034 73595
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|    Jan 28,2014|               201

In [44]:
# TRANSFORMACION
# En este punto tomamos los registros que tiene los formatos de fecha Feb 20,2014 y lo transformamos al formato estandar dado por WWImporters.
noCumplenFormato = noCumplenFormato.withColumn('Fecha_de_pedido', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('Fecha_de_pedido')))
ordenes_tmp = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes_tmp.count()

(20034, 93629)

In [45]:
# Verifricar que las fechas de no cumplen formato ya quedaron en el formato estandar.
noCumplenFormato.show(5)

+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|     2014-01-28|               2014-01-29|    

Descripciones

In [46]:
detallesOrdenes.where(length(col("Descripcion")) <= 10).show()

+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+



Imputar valor maximo de cantidad

In [47]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[1]

Row(Cantidad=360)

In [48]:
detallesOrdenes = detallesOrdenes.replace( 10000000, 360, 'Cantidad')

In [49]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[0]

Row(Cantidad=360)

Se unen los dos dataframes, se verifica que no haya duplicados y si los hay se eliminan

In [50]:
ordenes_tmp = ordenes_tmp.join(detallesOrdenes, how = 'inner', on = 'ID_de_pedido')
ordenes_tmp = ordenes_tmp.withColumn('Valor_total',col('Precio_unitario')*col('Cantidad'))
ordenes_tmp = ordenes_tmp.withColumn('Impuestos',col('Valor_total')*col('Tasa_de_impuesto'))
ordenes_tmp = ordenes_tmp.selectExpr('ID_de_pedido as ID_de_pedido_T','ID_Producto','Fecha_de_pedido','ID_de_cliente','ID_de_vendedor','ID_Tipo_Paquete','Cantidad','Valor_total', 'Impuestos')

print((ordenes_tmp.count(),ordenes_tmp.distinct().count()))

ordenes_tmp = ordenes_tmp.drop_duplicates()
ordenes_tmp.show(5)

(294314, 231412)
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+
|ID_de_pedido_T|ID_Producto|Fecha_de_pedido|ID_de_cliente|ID_de_vendedor|ID_Tipo_Paquete|Cantidad|Valor_total|Impuestos|
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+
|           148|        203|     2013-01-02|          812|            13|              7|      40|       1280|    19200|
|           463|         64|     2013-01-09|          555|             3|              7|       1|         30|      450|
|           463|         10|     2013-01-09|          555|             3|              7|       8|        256|     3840|
|           463|         16|     2013-01-09|          555|             3|              7|      10|        130|     1950|
|           463|         57|     2013-01-09|          555|             3|              7|       3|         39|      585|
+--------------

#### Carga la tabla temporal relacionada con el hecho

In [51]:
guardar_db(dest_db_connection_string, ordenes_tmp,'Estudiante_i_XXXXXX.Hecho_Orden_Tmp', db_user, db_psswd)

En este punto la única dimensión que falta por crear es la fecha. Aquí se crea utilizando como base un archvio .csv compartido por el grupo consultor de WWImporters, y se sube al date warehouse.

In [52]:
fecha = obterner_dataframe_desde_csv('.\Fecha.csv', ',')
guardar_db(dest_db_connection_string, fecha,'Estudiante_i_XXXXXX.Fecha', db_user, db_psswd)

In [53]:
sql_producto_historia = '''(SELECT * FROM Estudiante_i_XXXXXX.ProductoHistoria) AS Tempo_producto_historia'''
producto_historia = obtener_dataframe_de_bd(source_db_connection_string, sql_producto_historia, db_user, db_psswd)
producto_historia.count()

261

En este punto con toda la información transformada se va acrear la tabla de hechos.

In [54]:
# El idPedido representa la dimensión degenerada Pedido
ordenes = ordenes_tmp.alias('o').join(clientes.alias('cl'), ordenes_tmp.ID_de_cliente == clientes.ID_Cliente_T,'left_outer')\
                    .join(ciudades.alias('ciu'), clientes.ID_CiudadEntrega == ciudades.ID_ciudad_T,'left_outer') \
                    .join(empleados.alias('e'), ordenes_tmp.ID_de_vendedor == empleados.ID_Empleado_T,'left_outer') \
                    .join(paquetes.alias('p'), ordenes_tmp.ID_Tipo_Paquete == paquetes.ID_TipoPaquete_T,'left_outer') \
                    .join(producto_historia.alias('ph'), (ordenes_tmp.ID_Producto == producto_historia.ID_Producto_T) & (ordenes_tmp.Fecha_de_pedido.between(producto_historia.FechaInicial,producto_historia.FechaFinal)),'left_outer') \
                    .join(fecha.alias('f'), ordenes_tmp.Fecha_de_pedido == fecha.Fecha,'left_outer') \
                    .select([col('o.ID_de_pedido_T'),col('cl.ID_Cliente_DWH'),col('ciu.ID_Ciudad_DWH'),
                             col('e.ID_Empleado_DWH'),col('ph.ID_Producto_DWH'),col('p.ID_TipoPaquete_DWH'),
                             col('f.ID_Fecha'),col('o.Cantidad'),col('o.Valor_total'),col('o.Impuestos')]) \
                    .fillna({'ID_Cliente_DWH': 0, 'ID_Ciudad_DWH': 0, 'ID_Empleado_DWH': 0, 'ID_Producto_DWH': 0,
                             'ID_TipoPaquete_DWH': 0})
ordenes.show(5)

+--------------+--------------+-------------+---------------+---------------+------------------+--------+--------+-----------+---------+
|ID_de_pedido_T|ID_Cliente_DWH|ID_Ciudad_DWH|ID_Empleado_DWH|ID_Producto_DWH|ID_TipoPaquete_DWH|ID_Fecha|Cantidad|Valor_total|Impuestos|
+--------------+--------------+-------------+---------------+---------------+------------------+--------+--------+-----------+---------+
|           463|            47|        29160|              2|             64|                 7|20130109|       1|         30|      450|
|           463|            47|        29160|              2|             10|                 7|20130109|       8|        256|     3840|
|           463|            47|        29160|              2|             16|                 7|20130109|      10|        130|     1950|
|           463|            47|        29160|              2|             57|                 7|20130109|       3|         39|      585|
|           148|             0|          

In [55]:
ordenes.count()

231412

In [56]:
# Esta instrucción puede tomar un par de minutos.
# Guarda la tabla de hechos
guardar_db(dest_db_connection_string, ordenes,'Estudiante_i_XXXXXX.Hecho_Orden', db_user, db_psswd)

In [57]:
# En caso de tener problema de memoria al momento de cargar una de las tablas, puede intentar un cargue por partes utilizando una idea como la que se presenta a continuación.
# Revise con detalle el manejo de las variables por si requiere ajuste
# inferior = 0
# superior = 99999
# j=0
# total = ordenes.count()/100000
# print(total)
# collected = ordenes.collect()
#while j<total:
    #if j%50==0:
    #    print(j)
    #j += 1
#    aux = spark.createDataFrame(collected[inferior:superior],ordenes.columns)
#    guardar_db(dest_db_connection_string, aux,'Estudiante_i_2022XX.Hecho_Orden_tmp', db_user, db_psswd)
#    inferior+=100000
#    superior+=100000

Verifique los resultados usando MySQL Workbench

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

## 4. Cierre
Completado este tutorial, sabe cómo configurar y realizar ETLs con historia en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

Para saber más sobre las técnicas de manejo de historia, consulte el libro <i>The  Data Warehouse Toolkit</i> de Ralph Kimball y Margy Ross,que podrá encontrar en la biblioteca de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.