# Tutorial: creación de ETLs con PySpark

## 1. Introducción	
	¿Qué aprenderá?
    En este tutorial aprenderá cómo puede usar PySpark para crear ETLs con historia en las dimensiones. 
	
	¿Qué construirá? 
	Construirá sobre el ETL del taller anterior, esta vez incluyendo el manejo de historia para algunas de las dimensiones del modelo multidimensional.
    
	¿Para qué?
	Dentro  de  procesos  de  ETL,  es común  que se  presenten  dimensiones que puedan  presentar cambios a través del tiempo para las cuales es necesario tener un plan de manejo de historia. Por lo tanto, es esencial saber cómo realizar este manejo en las distintas herramientas de ETLs.
    
    ¿Qué necesita?
    1. Python 3 con pip instalado
    2. Jupyter notebook
    3. Paquetes: Pyspark (3.0.1) y pandas (1.2.1)
    4. Controlador J de MySQL
    5. Servidor SQL con base de datos multidimensional "WWImportersDWH" que contenga la dimension "stockItem_Historia"

En  este tutorial,  utilizará PySpark para crear y ejecutar un ETL con historia. Para completar este taller, es necesario que haya completado el taller anterior, creación de un ETL, pues se construirá sobre el mismo. Se presenta a continuación el modelo multidimensional sin historia, cuyos datos conforman la base que se tiene actualmente 

![Modelo ordenes](./WWI_modelo_ordenes.png)

El objetivo de este tutorial es transformar el modelo anterior de forma que incorpore manejo de historia de la dimensión Producto, para lo anterior se propone manterner la dimensión original como una fuente para la primera carga de datos (momento en el cual deja de ser útil) y crear una nueva para el manejo de historia (<i>ProductoHistoria</i>), se espera que despues de este proceso el modelo multidimensional sea el siguiente

![ETL](./ModeloHistoria.PNG)

En el caso de la primera carga los valores por defecto para los atributos version, FechaInicial, FechaFinal y ID_Producto serán la fecha en la que se hace la carga, 2199-12-31 y el id de la dimension producto.

## 2. ETL Incremental para Productos
WideWorldImporters ha determinado que existe la posibilidad de que un <i>Producto</i> cambie su precio unitario. En vista de esto, se debe modificar el proceso ETL para manejar los cambios. Se ha decidido que el manejo que se le debe dar a la historia es **tipo 2**. Este tipo implica que la tabla tendrá 3 columnas nuevas 1. fecha de inicio y 2. fin de vigencia del registro y 3. version, la última es un indicador de cuántos registros hay por cada producto. Un nuevo registro es ingresado cuando se genera un cambio en alguno de los atributos, así como es necesario actualizar las 3 columnas que manejan la vigencia del Producto.

De acuerdo con esto, se debe añadir una serie de  transformaciones en el ETL al final para la dimensión de Producto, es decir, en el bloque 4 del diseño del ETL, estas transformaciones incluyen las siguientes operaciones: 
<ol>
<li>Leer los datos ya existentes en la bodega o DataWarehouse (Base de datos multidimensional Estudiante_43) y traer la última versión de cada <i>Producto</i>.</li>
<li>Identificar de los Productos registrados en un archivo csv (<i>ProductosActualizados.csv</i>), cuáles son nuevos e insertarlos en la bodega o DataWarehouse (Base de datos multidimensional Estudiante_43) en la tabla ProductoHistoria.</li>
<li>Para los Productos que cambiaron de precio, se debe insertar un nuevo registro con el nuevo precio, modificar el registro existente, cambiando el valor del atributo <i>Fecha final</i> y los valores de vigencia a S o N para indicar que el registro existente se ha vencido.</li>
</ol>

Primero, se cargan los datos de la base de datos transaccional esto se realiza con el fin de incorporar los nuevos datos que pudieron ser ingresados desde la última carga a la bodega multidimensional. También, se deben realizar las transformaciones regulares de la dimensión <i>Producto</i>. Para esto se mantienen los bloques diseñados en el anterior tutorial y se incorpora nuevo código en el bloque 6

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar su usuario uniandes sin caracteres especiales como usuario y su codigo de estudiante como constraseña
db_user = ''
db_psswd = ''
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_43'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark.sql.functions import lit
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
import mysql.connector

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1: Dimensión <i>Empleado</i>

#### Extracción

In [5]:
sql_empleados = '''(SELECT ID_persona AS ID_Empleado, NombreCompleto AS Nombre, EsVendedor FROM WWImportersTransactional.Personas WHERE EsVendedor=1) AS Temp_empleados'''
empleados = obtener_dataframe_de_bd(source_db_connection_string, sql_empleados, db_user, db_psswd)
empleados.show()

+-----------+------------------+----------+
|ID_Empleado|            Nombre|EsVendedor|
+-----------+------------------+----------+
|          2|    Kayla Woodcock|      true|
|          3|     Hudson Onslow|      true|
|          6|     Sophia Hinton|      true|
|          7|         Amy Trefl|      true|
|          8|    Anthony Grosse|      true|
|         13|Hudson Hollinworth|      true|
|         14|         Lily Code|      true|
|         15|         Taj Shand|      true|
|         16|     Archer Lamble|      true|
|         20|       Jack Potter|      true|
+-----------+------------------+----------+



#### Transformación

In [7]:
# TRANSFORMACION
empleados = empleados.selectExpr('ID_Empleado as ID_Empleado_T','Nombre')
empleados = empleados.withColumn('ID_Empleado_DWH', f.monotonically_increasing_id() + 1)
empleados.show(5)

+-----------+------------------+
|ID_Empleado|            Nombre|
+-----------+------------------+
|          2|    Kayla Woodcock|
|          3|     Hudson Onslow|
|          6|     Sophia Hinton|
|          7|         Amy Trefl|
|          8|    Anthony Grosse|
|         13|Hudson Hollinworth|
|         14|         Lily Code|
|         15|         Taj Shand|
|         16|     Archer Lamble|
|         20|       Jack Potter|
+-----------+------------------+



#### Carga

In [13]:
# CARGUE
guardar_db(dest_db_connection_string, empleados,'Estudiante_43.Empleado', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 2: Dimensión ciudad. 

#### Extracción

In [14]:
#EXTRACCION
sql_paises = '''(SELECT ID_Pais, Nombre, Continente, Region, Subregion FROM WWImportersTransactional.Paises) AS Temp_paises'''
sql_provincias_estados = '''(SELECT ID_EstadosProvincias AS ID_EstadoProvincia, NombreEstadoProvincia, TerritorioVentas, ID_Pais FROM WWImportersTransactional.EstadosProvincias) AS Temp_estados_provincias'''
sql_ciudades = '''(SELECT ID_ciudad as ID_ciudad_T, NombreCiudad, ID_EstadoProvincia, Poblacion FROM WWImportersTransactional.Ciudades) AS Temp_ciudades'''

paises = obtener_dataframe_de_bd(source_db_connection_string, sql_paises, db_user, db_psswd)
provincias_estados = obtener_dataframe_de_bd(source_db_connection_string, sql_provincias_estados, db_user, db_psswd)
ciudades = obtener_dataframe_de_bd(source_db_connection_string, sql_ciudades, db_user, db_psswd)

print(ciudades.columns, paises.columns, provincias_estados.columns)

['ID_ciudad', 'NombreCiudad', 'ID_EstadoProvincia', 'Poblacion'] ['ID_Pais', 'Nombre', 'Continente', 'Region', 'Subregion'] ['ID_EstadoProvincia', 'NombreEstadoProvincia', 'TerritorioVentas', 'ID_Pais']


#### Transformación

In [15]:
# TRANSFORMACION
ciudades = ciudades.join(provincias_estados, how = 'inner', on = 'ID_EstadoProvincia')
ciudades = ciudades.join(paises, how = 'inner', on = 'ID_Pais')
ciudades = ciudades.withColumn('ID_Ciudad_DWH', f.monotonically_increasing_id() + 1)
ciudades.show(5)

+-------+------------------+---------+------------------+---------+---------------------+----------------+-------------+-------------+--------+----------------+
|ID_Pais|ID_EstadoProvincia|ID_ciudad|      NombreCiudad|Poblacion|NombreEstadoProvincia|TerritorioVentas|       Nombre|   Continente|  Region|       Subregion|
+-------+------------------+---------+------------------+---------+---------------------+----------------+-------------+-------------+--------+----------------+
|    230|                31|       49|           Absecon|     8411|           New Jersey|         Mideast|United States|North America|Americas|Northern America|
|    230|                31|      150|          Adelphia|     null|           New Jersey|         Mideast|United States|North America|Americas|Northern America|
|    230|                31|      336|            Albion|     null|           New Jersey|         Mideast|United States|North America|Americas|Northern America|
|    230|                31|      

#### Carga

In [None]:
# CARGUE
guardar_db(dest_db_connection_string, ciudades,'Estudiante_43.Ciudad', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 3: Dimensión paquete

#### Extracción

In [18]:
#EXTRACCION
sql_paquetes = '''(SELECT ID_TipoPaquete AS ID_TipoPaquete_T, TipoPaquete AS Nombre FROM WWImportersTransactional.Paquetes) AS Temp_Paquetes'''

paquetes = obtener_dataframe_de_bd(source_db_connection_string, sql_paquetes, db_user, db_psswd)

#### Transformación

In [None]:
paquetes = paquetes.withColumn('ID_TipoPaquete_DWH', f.monotonically_increasing_id() + 1)
paquetes.show(5)

#### Carga

In [20]:
# CARGUE
guardar_db(dest_db_connection_string, paquetes,'Estudiante_43.TipoPaquete', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 4: Dimensión producto
En términos generales vamos a usar como primer carga de la nueva dimensión ProductoHistoria los datos de la dimensión Producto, simulando una fuente de datos. En un contexto real lo que se hace es modificar la misma dimensión para manejar la historia.

En detalle lo que vamos a hacer es:
1. Paso 1: Actualizar la información de la dimensión Producto
2. Paso 2: Crear la dimensión ProductoHistoria y hacer el primer cargue en ProductoHistoria con los datos de Producto
3. Paso 3: Guardar actualizaciones de Productos que vienen del archivo ReporteNum2_Productos.csv en ProductoHistoria según tipo 2

Nota: El paso 2 sin usar pyspark serían dos pasos, sin embargo vamos a aprovechar que al guardar un dataframe se crean las tablas en la base de datos 

### Paso 1: Actualizar Dimensión Producto

#### Extracción

In [6]:
sql_productos = '''(SELECT ID_Producto AS ID_Producto_T, ID_Color, NombreProducto, Marca, Necesita_refrigeracion, Dias_tiempo_entrega, Impuesto, PrecioUnitario, PrecioRecomendado FROM WWImportersTransactional.Producto) AS Temp_productos'''
sql_colores = '''(SELECT ID_Color, Color FROM WWImportersTransactional.Colores) AS Temp_colores'''

productos = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
colores = obtener_dataframe_de_bd(source_db_connection_string, sql_colores, db_user, db_psswd)

#### Transformación

In [7]:
# TRANSFORMACION
productos = productos.join(colores, how = 'inner', on = 'ID_Color')
productos = productos.withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
productos.show(5)

+--------+-----------+--------------------+-----+----------------------+-------------------+--------+--------------+-----------------+-----+
|ID_Color|ID_Producto|      NombreProducto|Marca|Necesita_refrigeracion|Dias_tiempo_entrega|Impuesto|PrecioUnitario|PrecioRecomendado|Color|
+--------+-----------+--------------------+-----+----------------------+-------------------+--------+--------------+-----------------+-----+
|       3|          3|Office cube peris...| null|                     0|                 14|      15|            19|               28|Black|
|       3|         17|DBA joke mug - mi...| null|                     0|                 12|      15|            13|               19|Black|
|       3|         19|DBA joke mug - da...| null|                     0|                 12|      15|            13|               19|Black|
|       3|         21|DBA joke mug - yo...| null|                     0|                 12|      15|            13|               19|Black|
|       3|   

#### Carga

In [None]:
# CARGUE
guardar_db(dest_db_connection_string, productos,'Estudiante_43.Producto', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### Paso 2: Crear y cargar la Dimensión ProductoHistoria

#### Transaformación

In [8]:
productos = productos.withColumn('FechaInicial',lit('2013-01-02'))
productos = productos.withColumn('FechaFinal',lit('2199-12-31'))
productos = productos.withColumn('Version', lit(1))
productos.show(5)

+--------+-----------+--------------------+-----+----------------------+-------------------+--------+--------------+-----------------+-----+------------+----------+-------+
|ID_Color|ID_Producto|      NombreProducto|Marca|Necesita_refrigeracion|Dias_tiempo_entrega|Impuesto|PrecioUnitario|PrecioRecomendado|Color|FechaInicial|FechaFinal|Vigente|
+--------+-----------+--------------------+-----+----------------------+-------------------+--------+--------------+-----------------+-----+------------+----------+-------+
|       3|          3|Office cube peris...| null|                     0|                 14|      15|            19|               28|Black|  2013-01-02|2199-12-31|      S|
|       3|         17|DBA joke mug - mi...| null|                     0|                 12|      15|            13|               19|Black|  2013-01-02|2199-12-31|      S|
|       3|         19|DBA joke mug - da...| null|                     0|                 12|      15|            13|               19|B

#### Carga (inicial)

In [9]:
# CARGUE
guardar_db(dest_db_connection_string, productos,'Estudiante_43.ProductoHistoria', db_user, db_psswd)

### Paso 3: Actualizaciones de productos

In [12]:
# Borrar
# Primero: sacamos productos a parte
import pandas as pd
import random
df = pd.read_csv('.\CSV modificados\CopiaProductosTransaccional.csv')
min_PU, max_PU = df['PrecioUnitario'].min(), df['PrecioUnitario'].max() 
nuevos.....
#nuevos = df.sample(frac = 0.23)
#df = df[~df.index.isin(nuevos.index)]

cambios = df.sample(frac = 0.23)
no_cambios = df[~df.index.isin(cambios.index)]
no_cambios = no_cambios.sample(frac = 0.36)
cambios['PrecioUnitario'] = cambios['PrecioUnitario'].apply(lambda x : random.randint(min_PU, max_PU))

print(cambios.shape, no_cambios.shape)
df = pd.concat([cambios, no_cambios])
df.to_csv('.\CSV modificados\ReporteNum2_Productos.csv')

(29, 10)

#### Extracción

In [None]:
productosReporte = obterner_dataframe_desde_csv('.\ReporteNum2_Productos.csv', ',')
sql_productos = '''(SELECT * FROM Estudiante_43.ProductoHistoria) AS Temp_productos'''

productos = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
productosReporte.count()

En la bodega de datos (base de datos multidimensional), se tienen varias versiones para un mismo Producto. Sin embargo, para el manejo de historia tipo 2 sólo se necesita la última versión, se utiliza una función Window de PySpark para extraer únicamente la última versión por cada llave natural de un Producto:

In [None]:
#Se utiliza un Window para extraer del dataframe de Productos, por cada ID_Producto, la versión mayor.
#Es decir, por cada llave natural, extrae la última versión.
window = Window.partitionBy(productos['ID_producto_T']).orderBy(productos['Version'].desc())
productos = productos.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') == 1) 
productos = productos.drop('rank')

#### Transformación
hacemos un LEFT JOIN para unir los productos de la tabla ProductoHistoria y los productos del reporte, de manera que se mantienen todos los registros del DataFrame con historia (La primera vez tiene la información cargada en el primer cargue y por ende todas las versiones tienen el valor de 1, FechaInicial 2013-01-02 y FechaFinal 2199-12-31).

TODO revisar lo de las llaves

In [None]:
#Renombra columnas del DataWarehouse, poniendo sujifo _DWH para evitar confusiones
productos = productos.selectExpr('Version as Version_DWH', 'PrecioUnitario as PrecioUnitario_DWH', 'FechaInicio as FechaInicio_DWH', 'FechaFinal as FechaFinal_DWH')

#Realiza left join entre stockitems que se están procesando y stockitems en el DW.
productos = productos.join(productosReporte.withColumnRenamed('ID_Producto', 'ID_Producto_T'), how = 'left', on ='ID_Producto_T')

Vamos a usar una nueva variable "ETIQUETA" que nos indica el tipo de registro para los productos del reporte:
- **-1** que representa "cambios": un registro que ya existe en ProductoHistoria y que cambio el valor de precio unitario
- **0** que representa "sin cambios": un registro que ya existe en ProductoHistoria y que no cambio su precio unitario
- **1** que representa "nuevo": un registro que no existia en ProductoHistoria

In [None]:
productos = productos.withColumn('ETIQUETA', 
    f.when(productos['ID_Producto_DWH'].isNull(), 1)\
    .when((productos['ID_Producto_DWH'].isNotNull()) 
          & ((productos['PrecioUnitario'] == productos['PrecioUnitario_DWH']) 
              | (productos['PrecioUnitario'].isNull() & productos['PrecioUnitario_DWH'].isNull())
          ), 0)\
    .otherwise(-1))

Los registros que no han cambiado (ETIQUETA = 0), no deben mantenerse en el DataFrame, por lo que se excluyen:

In [None]:
productos = productos.where(df['ETIQUETA'] != 0)

Es necesario insertar nuevos registros al Data Warehouse y, por lo tanto, estos necesitan un nuevo id. Para crear el id, primero se encuentra el id máximo y, paso seguido, se utiliza el máximo junto con la función de PySpark monotonically_increasing_id para crear un nuevo id único para cada uno de los registros.

In [None]:
#Encontar llave máxima
max_key = productos.agg({"ID_Producto_DWH": "max"}).collect()[0][0]
    
if max_key is None:
    max_key = 1
    
#Nuevos ids
productos = productos.withColumn('new_id', f.monotonically_increasing_id() + max_key)

Ahora, se procede a asignar la versión, fecha de inicio y fecha de vencimiento o final a los registros de acuerdo con las categorías previamente determinadas. Note que los registros que cambiaron deben duplicarse, esto se debe a que una copia del registro se debe usar para actualizar FechaInicial en el Data Warehouse y la otra se utiliza para insertar la última versión del registro.

In [None]:
#caso 1: Los registros son nuevos (i.e, los registros transaccionales nuevos)
productos = productos.withColumn('Version_DWH', f.when(productos['ETIQUETA'] == 1, 1).otherwise(productos['Version_DWH']))
productos = productos.withColumn('FechaInicio_DWH', f.when(productos['ETIQUETA'] == 1, f.current_date()).otherwise(productos['FechaInicio_DWH']))
productos = productos.withColumn('FechaFinal_DWH', f.when(productos['ETIQUETA'] == 1, f.to_date(f.lit('2199-12-31'), 'yyyy-MM-dd')).otherwise(f.current_date()))
productos = productos.withColumn('aInsertar', f.when(productos['ETIQUETA'] == 1, 1).otherwise(0))
    
#caso -1: Los registros ya existian pero tenian cambios
# 1.1 Es necesario editar existentes: poner FechaFinal_DWH como fecha actual. Después, actualizar en la base de datos (NO INSERTAR)
# 1.2 Es necesario crear una nueva fila, identica a anterior con los siguientes ajustes: version = version + 1, FechaInicio_DWH = hoy, FechaFinal_DWH = 2199-12-31
productos_dup = productos.where(df['ETIQUETA'] == 1)
#1.2
productos_dup = productos_dup.withColumn('Version_DWH', productos_dup['Version_DWH'] + 1)
productos_dup = productos_dup.withColumn('FechaInicio_DWH', f.current_date())
productos_dup = productos_dup.withColumn('FechaFinal_DWH', f.to_date(f.lit('2199-12-31'), 'yyyy-MM-dd'))
productos_dup = productos_dup.withColumn('aInsertar',f.lit(1))
    
    
# Unir los DataFrames con los registros nuevos (caso -1) y los duplicados (caso 1)
productos = productos.union(productos_dup)
    
#Eliminar la columna 'ETIQUETA' que no se persiste en la base de datos
productos = productos.drop('ETIQUETA')

Finalmente, se divide el DataFrame en dos, una parte conteniendo los registros a actualizarse y otra los registros a insertar:

In [None]:
#Persiste DF para obligar ejecución de grafo de computación. TODO: Revisar docs
productos.persist()
productos = productos.selectExpr('new_id as ID_Producto_DWH',  'NombreProducto', 'Necesita_refrigeracion', 'Dias_tiempo_entrega', 'Impuesto', \
                                 'PrecioUnitario', 'PrecioRecomendado','ID_Color', 'Color', 'Marca', 'ID_PRODUCTO_T', 'Version_DWH as Version',\
                                 'FechaInicio_DWH as FechaInicio', 'FechaFinal_DWH as FechaFinal')
#Separar en registros que se deben insertar y registros que se deben actualizar.
productos_inserts = productos.where(df['aInsertar'] == 1)
productos_updates = productos.where(df['aInsertar'] == 0)

#### Carga (Incremental)

INSERTAMOS REGISTROS: TODO...cambiar por funcion

In [None]:
guardar_db(dest_db_connection_string, productos_inserts,'Estudiante_43.ProductoHistoria', db_user, db_psswd)

La actualización de registros, es un poco más compleja, puesto que Spark no lo soporta. Por lo tanto, el proceso se realiza en la base de datos para lo cual se procede a insertar los registros en una tabla auxiliar (Stockitem_Historia_Update), que contiene las columnas ID_Producto_T y FechaFinal y después, se ejecuta una sentencia SQL update para actualizar la tabla de la dimensión Producto a partir de esta tabla auxiliar, cuyos registros son borrados al final.

In [None]:
#Insertar en tabla auxiliar (ProductoHistoria_Update)
guardar_db(dest_db_connection_string, productos_updates,'Estudiante_43.ProductoHistoria_Update', db_user, db_psswd)

In [None]:
#Ejecutar sentencia SQL UPDATE
import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="",
  database="WWImportersDWH",
)
cur = mydb.cursor()

print(mydb)
res = cur.execute("""
UPDATE stockItem_Historia 
INNER JOIN Stockitem_Historia_Update AS Table_B 
ON stockItem_Historia.Stock_Item_Key=Table_B.Stock_Item_Key
SET stockItem_Historia.Date_to=Table_B.date_to;
""")
res = cur.execute("""TRUNCATE TABLE Stockitem_Historia_Update;""")
mydb.commit()
cur.close()
mydb.close()

### BLOQUE 5: Dimensión cliente

#### Extracción

In [24]:
sql_categoriasCliente = '''(SELECT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
sql_gruposCompra = '''(SELECT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
sql_clientes = '''(SELECT ID_Cliente AS ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

categoriasCliente = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
gruposCompra = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
clientes = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

#### Transformación

In [25]:
# TRANSFORMACION
clientes = clientes.join(gruposCompra, how = 'inner', on = 'ID_GrupoCompra')
clientes = clientes.join(categoriasCliente, how = 'inner', on = 'ID_Categoria')
clientes = clientes.withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
clientes.show()

+------------+--------------+----------+--------------------+--------------+----------------+-------------+-------------------+--------+-----------------+---------------+
|ID_Categoria|ID_GrupoCompra|ID_Cliente|              Nombre|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|NombreGrupoCompra|NombreCategoria|
+------------+--------------+----------+--------------------+--------------+----------------+-------------+-------------------+--------+-----------------+---------------+
|           3|             2|       601|Wingtip Toys (Rut...|           401|           29887|         null|2013-01-01 00:00:00|       7|     Wingtip Toys|   Novelty Shop|
|           3|             2|       600|Wingtip Toys (Car...|           401|            5407|         null|2013-01-01 00:00:00|       7|     Wingtip Toys|   Novelty Shop|
|           3|             2|       599|Wingtip Toys (Dic...|           401|            9077|         null|2013-01-01 00:00:00|       7|     Wing

#### Carga

In [26]:
# CARGUE
guardar_db(dest_db_connection_string,clientes,'Estudiante_43.Cliente', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 6: Hecho orden

#### Extracción

In [None]:
sql_ordenes = '''(SELECT * FROM WWImportersTransactional.Ordenes) AS Temp_ordenes'''
sql_detallesOrdenes = '''(SELECT * FROM WWImportersTransactional.DetallesOrdenes) AS Temp_detallesordenes'''
ordenes = obtener_dataframe_de_bd(source_db_connection_string, sql_ordenes, db_user, db_psswd)
detallesOrdenes = obtener_dataframe_de_bd(source_db_connection_string, sql_detallesOrdenes, db_user, db_psswd)

#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- Diferencia entre cantidad y cantidad seleccionada es X
- La regla de negocio "La tasa de impuesto es de 10% o 15%" es correcta, pero habian errores en la tabla original, que fueron corregidos. 
- Para la segunda regla de negocio: "Son 73.595 órdenes detalladas en 231.412 lineas de detalle de órdenes realizadas desde 2013", si faltaban datos, los cuales fueron completados, y nos dicen que en cuanto a consistencia ellos revisaron las tablas e hicieron correcciones, pero que los duplicados completos de ordenes los eliminemos
- "El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD": En cuanto a formatos de fechas estan de acuerdo con que los estandarizemos y el formato sea el especificado en la regla
- Para las descripciones de productos que eran "a", se actualizaron a los valores reales. 
- Se pueden eliminar las columnas Comenarios, Instrucciones_de_entrega y comentarios_internos porque estan vacias. 
- A pesar de estar en un proceso de mejorar la calidad de los datos y mantener los nulos nos ayudaría a reflejar esa calidad, de la mano con el grupo de analitica de WWI se decide imputar por la media el valor extremo de la variable Cantidad
- Para las ordenes las columnas Seleccionado_por_ID_de_persona, ID_de_pedido_pendiente, Seleccion_completada_cuando, y para las columnas Seleccion_completada_cuando de la tabla detalles de ordenes, se decide mantener los valores vacíos, sin embargo para la variable Precio_unitario el negocio reviso y complemento los valores faltantes

Las tablas usadas en el tutorial de entendimiento de datos estaran disponibles para su revision con los siguientes nombres: OrdenesCopia y DetallesOrdenesCopia. 

Para este tutorial vamos a trabajar con unas tablas que dadas las conclusiones del tutorial de entendimiento, WWImporters revisó los datos originales, creo tablas y las llamo "Ordenes" y "DetallesOrdenes"

Se hace una verificación de los valores de la tasa de impuesto

In [36]:
detallesOrdenes.select("Tasa_de_impuesto").distinct().show()

+----------------+
|Tasa_de_impuesto|
+----------------+
|              10|
|              15|
+----------------+



Se hace una verificación del rango de fechas disponible en los datos

In [37]:
ordenes.agg({"Fecha_de_pedido": "min"}).show()

+--------------------+
|min(Fecha_de_pedido)|
+--------------------+
|          2013-01-01|
+--------------------+



Se elimina columnas Comenarios, Instrucciones_de_entrega y comentarios_internos

In [38]:
ordenes = ordenes.drop(*["Comentarios", "Instrucciones_de_entrega","comentarios_internos"])

Se eliminan duplicados totales de ordenes

In [46]:
print((ordenes.count(),ordenes.distinct().count()))

(107707, 93629)


In [47]:
ordenes = ordenes.drop_duplicates()

In [48]:
print((ordenes.count(),ordenes.distinct().count()))

(93629, 93629)


Se hace verificación de consistencia

In [49]:
#consistencia: revisar genially: definicion de consistencia
ids_ordenes = set([x.ID_de_pedido for x in ordenes.select('ID_de_pedido').collect()])
ids_detalles = set([x.ID_de_pedido for x in detallesOrdenes.select('ID_de_pedido').collect()])

len(ids_ordenes-ids_detalles), len(ids_detalles-ids_ordenes)

(0, 0)

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [60]:
# TRANSFORMACION
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = ordenes.filter(ordenes["Fecha_de_pedido"].rlike(regex))
noCumplenFormato = ordenes.filter(~ordenes["Fecha_de_pedido"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('Fecha_de_pedido', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('Fecha_de_pedido')))
ordenes = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes.count()

93629


(93629, 93629)

Descripciones


In [58]:
detallesOrdenes.where(length(col("Descripcion")) <= 10).show()

+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+



Imputar valor maximo de cantidad

In [71]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[1]

Row(Cantidad=360)

In [80]:
detallesOrdenes = detallesOrdenes.replace( 10000000, 360, 'Cantidad')

In [81]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[0]

Row(Cantidad=360)

Se unen los dos dataframes, se verifica que no haya duplicados y si los hay se eliminan

In [None]:
ordenes = ordenes.join(detallesOrdenes, how = 'inner', on = 'ID_de_pedido')
ordenes = ordenes.withColumn('Valor_total',col('Precio_unitario')*col('Cantidad'))
ordenes = ordenes.withColumn('Impuestos',col('Valor_total')*col('Tasa_de_impuesto'))
ordenes = ordenes.selectExpr('ID_de_pedido as ID_de_pedido_T','ID_Producto','Fecha_de_pedido','ID_de_cliente','ID_de_vendedor','ID_Tipo_Paquete','Cantidad','Valor_total', 'Impuestos')

print((ordenes.count(),ordenes.distinct().count()))

ordenes = ordenes.drop_duplicates()
ordenes = ordenes.withColumnRename('ID_de_pedido_DWH', f.monotonically_increasing_id() + 1)
ordenes.show(5)

#### Carga

In [82]:
# CARGUE
inferior = 0
superior = 999
j=0
total = ordenes.count()/1000
print(total)
collected = ordenes.collect()
while j<total:
    if j%50==0:
        print(j)
    j += 1
    aux = spark.createDataFrame(collected[inferior:superior],ordenes.columns)
    guardar_db(dest_db_connection_string, aux,'Estudiante_43.HechoOrden', db_user, db_psswd)
    inferior+=1000
    superior+=1000

Verifique los resultados usando MySQL Workbench

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

## 4. Cierre
Completado este tutorial, sabe cómo configurar y realizar ETLs con historia en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

Para saber más sobre las técnicas de manejo de historia, consulte el libro <i>The  Data Warehouse Toolkit</i> de Ralph Kimball y Margy Ross,que podrá encontrar en la biblioteca de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.