# Tarea 2 - ETL

## 1. Enunciado	
Enunciado
Ahora que sabe cómo realizar un proceso ETL, dado el modelo multidimensional del proceso de negocio de movimientos de inventario realice las siguientes actividades:

Entregable 1 - Diseño del ETL: diseñe el ETL para las dimensiones proveedor, tipoTransaccion y para la tabla de hechos. A nivel de la tabla de proveedores incluya la tabla categoriasProveedores donde encuentra información de las categorías. El diseño del ETL es un diagrama como lo encuentra en la infografía de proceso ETL.

Entregable 2 - Implementación del ETL: implementación del proceso ETL para las dimensiones Proveedor, TipoTransaccion y para la tabla de hechos


Note que para este proceso de negocio, las dimensiones Producto, Cliente y Fecha son iguales a las del hecho Orden, este caso se conoce como dimensiones compartidas. Usted debe concentrarse en las dimensiones Proveedor, TipoTransaccion y la tabla de hechos que no existen en la bodega de datos actualmente.

## 2. Modelo dimensional

En este proceso de ETL, se extraen los datos de los **movimientos** de una base de datos transaccional y se almacenan en otra base de datos que corresponde a la bodega de datos, siguiendo una aproximación ROLAP. A continuación, se presenta el modelo multidimensional que es el modelo conceptual que representa el proceso de registro de movimientos. Este modelo se utilizó para crear las tablas en la bodega de datos que representan el proceso de negocio y que serán cargadas como resultado del proceso ETL.

![Modelo ordenes](./Modelo_movimiento.PNG)

## 2. Diseño del ETL

![Modelo ordenes](./Diseno-ETL-T2.JPEG)

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = 'Estudiante_37'
db_psswd = 'QV8UJT16LT'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_37'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1
Empezamos con el bloque 1: la dimensión <i>TipoTransaccion</i>, su fuente de datos viene de la tabla transaccional <i>TiposTransaccion</i>.

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, es decir en memoria. Note que aquí se pueden renombrar los atributos con la estructura <i>nombreActual AS nuevoNombre</i>. De la tabla de personas, En este paso, solo nos interesan los empleados, por lo cual se hace un filtro por medio del WHERE, buscando las personas cuyo atributo EsVendedor sea igual a 1.

In [5]:
sql_tipo_transaccion = '''(SELECT TipoTransaccionID AS ID_Tipo_transaccion, TipoTransaccionNombre as Tipo FROM WWImportersTransactional.TiposTransaccion) AS Temp_tipo_transaccion'''
tipos_transaccion = obtener_dataframe_de_bd(source_db_connection_string, sql_tipo_transaccion, db_user, db_psswd)
tipos_transaccion.show(5)

+-------------------+--------------------+
|ID_Tipo_transaccion|                Tipo|
+-------------------+--------------------+
|                  2|Customer Credit Note|
|                  3|Customer Payment ...|
|                  4|     Customer Refund|
|                  5|    Supplier Invoice|
|                  6|Supplier Credit Note|
+-------------------+--------------------+
only showing top 5 rows



#### Transformación
En este caso no se hará ninguna transaformación.

In [6]:
# TRANSFORMACION
tipos_transaccion = tipos_transaccion.selectExpr('ID_Tipo_transaccion','Tipo')
tipos_transaccion.show(5)

+-------------------+--------------------+
|ID_Tipo_transaccion|                Tipo|
+-------------------+--------------------+
|                  2|Customer Credit Note|
|                  3|Customer Payment ...|
|                  4|     Customer Refund|
|                  5|    Supplier Invoice|
|                  6|Supplier Credit Note|
+-------------------+--------------------+
only showing top 5 rows



#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

In [13]:
# CARGUE
guardar_db(dest_db_connection_string, tipos_transaccion,'Estudiante_37.TipoTransaccion', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 2
Empezamos el bloque 2: dimensión Proveedor. Su fuente de datos es una combinación de las tablas transaccionales <i>CategoriasProveedores, Proveedores y  Personas</i>

#### Extracción

In [25]:
#EXTRACCION
sql_categorias_proveedores = '''(SELECT CategoriaProveedorID, CategoriaProveedor as Categoria FROM WWImportersTransactional.CategoriasProveedores) AS Temp_CategoriasProveedores'''
sql_proveedores = '''(SELECT ProveedorID as ID_Proveedor, NombreProveedor as Nombre, PersonaContactoPrincipalID as Contacto_Principal, DiasPago as Dias_pago, CodigoPostal as Codigo_postal, CategoriaProveedorID FROM WWImportersTransactional.Proveedores) AS Temp_Proveedores'''
categorias_proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_categorias_proveedores, db_user, db_psswd)
proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores, db_user, db_psswd)

print(categorias_proveedores.columns, proveedores.columns, proveedores.count())

['CategoriaProveedorID', 'Categoria'] ['ID_Proveedor', 'Nombre', 'Contacto_Principal', 'Dias_pago', 'Codigo_postal', 'CategoriaProveedorID'] 13


#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- Transf 1. Los valores de  día de pago negativos se deben multiplicar por -1.
- Transf 2. Unificar Proveedores, los nombres debe ser únicos. Tener en cuenta que hay nombres con el mismo nombre más un "Inc" o "Ltd".
- Transf 3. Validar código postal no es el mismo para todos los proveedores
- Transf 4. Generar códigos propios de la bodega de datos.

Se hace la correción a los valores de Dias_pago negativos

In [26]:
# TRANSFORMACION
proveedoresDiaPagoNegativo = proveedores.filter(proveedores["Dias_pago"] < 0)
proveedoresDiaPagoPositivo = proveedores.filter(~(proveedores["Dias_pago"] < 0))
print(proveedoresDiaPagoNegativo.count(), proveedoresDiaPagoPositivo.count())
print(proveedoresDiaPagoNegativo.show(5))
proveedoresDiaPagoNegativo = proveedoresDiaPagoNegativo.withColumn('Dias_pago', f.udf(lambda dp: dp * -1, t.IntegerType())(f.col('Dias_pago')))
proveedores = proveedoresDiaPagoNegativo.union(proveedoresDiaPagoPositivo)
proveedoresDiaPagoNegativo.count(), proveedores.count()
proveedores.show()

6 7
+------------+-------------------+------------------+---------+-------------+--------------------+
|ID_Proveedor|             Nombre|Contacto_Principal|Dias_pago|Codigo_postal|CategoriaProveedorID|
+------------+-------------------+------------------+---------+-------------+--------------------+
|           2|      Contoso, Ltd.|                23|       -7|        98253|                   2|
|           6|Humongous Insurance|                31|      -14|        37770|                   9|
|           8| Lucerne Publishing|                35|      -30|        37659|                   2|
|           1|A Datum Corporation|                21|      -14|        46077|                   2|
|          11|      Trey Research|                41|       -7|        57543|                   8|
+------------+-------------------+------------------+---------+-------------+--------------------+
only showing top 5 rows

None
+------------+--------------------+------------------+---------+-----------

Se hace la unificación de nombres

In [27]:
# TRANSFORMACION
import re
# Se probo la transformación con proveedores copia.
# sql_proveedores_copia = '''(SELECT ProveedorID as ID_Proveedor, NombreProveedor as Nombre, PersonaContactoPrincipalID as Contacto_Principal, DiasPago as Dias_pago, CodigoPostal as Codigo_postal, CategoriaProveedorID FROM WWImportersTransactional.provedoresCopia) AS Temp_Proveedores_copia'''
# proveedores_copia = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores_copia, db_user, db_psswd)

regex = "([a-zA-Z]+\s(Inc. Ltd|Inc.|Inc|Ltd.|Ltd)+)"
proveedoresNombreConSufijo = proveedores.filter(proveedores["Nombre"].rlike(regex))
proveedoresNombreSinSufijo = proveedores.filter(~(proveedores["Nombre"].rlike(regex)))
print(proveedoresNombreConSufijo.count(), proveedoresNombreSinSufijo.count())
print(proveedoresNombreConSufijo.show(5))

proveedoresNombreConSufijo = proveedoresNombreConSufijo.withColumn('Nombre', f.udf(lambda n: n.replace(re.search(regex, n).groups()[1], "").strip(), t.StringType())(f.col('Nombre')))
proveedores = proveedoresNombreConSufijo.union(proveedoresNombreSinSufijo)
dict_of_unique_ids = {row[0]: row[1] for row in proveedores.groupBy("Nombre").min("ID_Proveedor").collect() }
print(dict_of_unique_ids)
if dict_of_unique_ids:
    proveedores = proveedores.withColumn('ID_Proveedor', f.udf(lambda n: dict_of_unique_ids[n], t.IntegerType())(f.col('Nombre')))

print(proveedores.count(), proveedores.distinct().count())
proveedores = proveedores.drop_duplicates()
proveedores.show()

0 13
+------------+------+------------------+---------+-------------+--------------------+
|ID_Proveedor|Nombre|Contacto_Principal|Dias_pago|Codigo_postal|CategoriaProveedorID|
+------------+------+------------------+---------+-------------+--------------------+
+------------+------+------------------+---------+-------------+--------------------+

None
{'Contoso, Ltd.': 2, 'Trey Research': 11, 'Humongous Insurance': 6, 'A Datum Corporation': 1, 'Lucerne Publishing': 8, 'Consolidated Messenger': 3, 'The Phone Company': 12, 'Woodgrove Bank': 13, 'Nod Publishers': 9, 'Graphic Design Institute': 5, 'Litware, Inc.': 7, 'Northwind Electric Cars': 10, 'Fabrikam, Inc.': 4}
13 13
+------------+--------------------+------------------+---------+-------------+--------------------+
|ID_Proveedor|              Nombre|Contacto_Principal|Dias_pago|Codigo_postal|CategoriaProveedorID|
+------------+--------------------+------------------+---------+-------------+--------------------+
|           3|Consol

Validar código postal no es el mismo para todos los proveedores, si bien hay dos proveedores con el mismo código postal es una situación normal porque más de un proveedor podrían pertenecer a la misma ciudad.

In [13]:
proveedores.count(), proveedores.select("Codigo_postal").distinct().count()

(13, 12)

In [28]:
# TRANSFORMACION
proveedores = proveedores.join(categorias_proveedores, how = 'inner', on = 'CategoriaProveedorID')
proveedores.show()

+--------------------+------------+--------------------+------------------+---------+-------------+--------------------+
|CategoriaProveedorID|ID_Proveedor|              Nombre|Contacto_Principal|Dias_pago|Codigo_postal|           Categoria|
+--------------------+------------+--------------------+------------------+---------+-------------+--------------------+
|                   6|           3|Consolidated Mess...|                25|       30|        94101|servicios de mens...|
|                   3|          10|Northwind Electri...|                39|       30|         7860|            juguetes|
|                   5|           7|       Litware, Inc.|                33|       30|        95245|            embalaje|
|                   9|           6| Humongous Insurance|                31|       14|        37770|servicios de seguros|
|                   4|           4|      Fabrikam, Inc.|                27|       30|        40351|                ropa|
|                   8|          

In [29]:
proveedores = proveedores.selectExpr('ID_Proveedor', 'Nombre', 'Contacto_Principal', 'Dias_pago', 'Categoria', 'Codigo_postal')
proveedores.show()

+------------+--------------------+------------------+---------+--------------------+-------------+
|ID_Proveedor|              Nombre|Contacto_Principal|Dias_pago|           Categoria|Codigo_postal|
+------------+--------------------+------------------+---------+--------------------+-------------+
|           3|Consolidated Mess...|                25|       30|servicios de mens...|        94101|
|          10|Northwind Electri...|                39|       30|            juguetes|         7860|
|           7|       Litware, Inc.|                33|       30|            embalaje|        95245|
|           6| Humongous Insurance|                31|       14|servicios de seguros|        37770|
|           4|      Fabrikam, Inc.|                27|       30|                ropa|        40351|
|          11|       Trey Research|                41|        7|servicios de mark...|        57543|
|          13|      Woodgrove Bank|                45|        7|servicios financi...|        94101|


#### Carga

In [31]:
# CARGUE
guardar_db(dest_db_connection_string, proveedores,'Estudiante_37.Proveedor', db_user, db_psswd)

### BLOQUE 3
Bloque 6: Hecho movimiento. Su fuente de datos es la tabla movimientos

#### Extracción

In [73]:
sql_movimientos = '''(SELECT
TransactionOccurredWhen as Fecha_Movimiento,
StockItemID as ID_Producto,
SupplierID as ID_Proveedor,
CustomerID as ID_Cliente,
TransactionTypeID as ID_Tipo_Transaccion,
Quantity as Cantidad FROM WWImportersTransactional.movimientos) AS Temp_movimientos'''
movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)

#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- Transf 1. Eliminar los registros duplicados.
- Transf 2. Verificar que las cantidades negativas  equivalen a salidas de productos del inventario (Sería según el tipo de transacción).
- Transf 3. Verificar que existan movimientos con una antiguedad menor a 2014
- Transf 4. Transformar las fechas según esta regla: YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD


Se hace una verificación de los valores de la tasa de impuesto

In [74]:
movimientos.count(), movimientos.distinct().count()

(267300, 236656)

In [75]:
# se eliminan los duplicados
movimientos = movimientos.drop_duplicates()
print(movimientos.count(), movimientos.distinct().count())
movimientos.show(5)

236656 236656
+----------------+-----------+------------+----------+-------------------+--------+
|Fecha_Movimiento|ID_Producto|ID_Proveedor|ID_Cliente|ID_Tipo_Transaccion|Cantidad|
+----------------+-----------+------------+----------+-------------------+--------+
|     Jan 03,2015|         39|            |     512.0|                 10|   -10.0|
|     Aug 03,2015|         45|            |     862.0|                 10|   -10.0|
|     May 12,2015|          9|            |     478.0|                 10|   -10.0|
|     May 07,2014|        215|            |      14.0|                 10|   -10.0|
|     Dec 28,2015|         71|            |     870.0|                 10|   -10.0|
+----------------+-----------+------------+----------+-------------------+--------+
only showing top 5 rows



Verificar que las cantidades negativas  equivalen a salidas de productos del inventario y corregir de ser necesario.

In [76]:
# transaccion de salida ID = 10
movimientosConCantidadNegativa = movimientos.filter(movimientos["Cantidad"] < 0)
movimientosDeSalida = movimientos.filter(movimientos["ID_Tipo_Transaccion"] == 10)
movimientosConCantidadNegativa.count(), movimientosDeSalida.count()

(228275, 228254)

Se obtienen los registros que deben ser corregidos

In [77]:
movimientosPorCorregir = movimientos.filter(movimientos["ID_Tipo_Transaccion"] != 10).filter(movimientos["Cantidad"] < 0)
restoDeMovimientos = movimientos.filter(~(movimientos["Cantidad"] < 0))
print(movimientos.count(), restoDeMovimientos.count(), movimientosPorCorregir.count())
movimientosPorCorregir.show(5)

236656 8381 21
+--------------------+-----------+------------+----------+-------------------+--------+
|    Fecha_Movimiento|ID_Producto|ID_Proveedor|ID_Cliente|ID_Tipo_Transaccion|Cantidad|
+--------------------+-----------+------------+----------+-------------------+--------+
|         Jan 31,2015|        205|            |       0.0|                 12|    -5.0|
|2013-07-31 14:00:...|        158|            |       0.0|                 12|    -4.0|
|2016-04-30 14:00:...|         82|            |       0.0|                 12|    -4.0|
|2014-07-31 14:00:...|        121|            |       0.0|                 12|    -4.0|
|         Oct 31,2015|         24|            |       0.0|                 12|    -5.0|
+--------------------+-----------+------------+----------+-------------------+--------+
only showing top 5 rows



Corregir movimientos de salida

In [78]:
movimientosPorCorregir = movimientosPorCorregir.withColumn('ID_Tipo_Transaccion', f.udf(lambda n: 10, t.IntegerType())(f.col('ID_Tipo_Transaccion')))
movimientosPorCorregir.show()
movimientosDeSalida = movimientosDeSalida.union(movimientosPorCorregir)
movimientos = movimientosDeSalida.union(restoDeMovimientos)
movimientos.count(), movimientosDeSalida.count(), restoDeMovimientos.count()

+--------------------+-----------+------------+----------+-------------------+--------+
|    Fecha_Movimiento|ID_Producto|ID_Proveedor|ID_Cliente|ID_Tipo_Transaccion|Cantidad|
+--------------------+-----------+------------+----------+-------------------+--------+
|         Jan 31,2015|        205|            |       0.0|                 10|    -5.0|
|2013-07-31 14:00:...|        158|            |       0.0|                 10|    -4.0|
|2016-04-30 14:00:...|         82|            |       0.0|                 10|    -4.0|
|2014-07-31 14:00:...|        121|            |       0.0|                 10|    -4.0|
|         Oct 31,2015|         24|            |       0.0|                 10|    -5.0|
|2015-01-31 14:00:...|        144|            |       0.0|                 10|    -4.0|
|2015-04-30 14:00:...|         98|            |       0.0|                 10|    -3.0|
|         Oct 31,2014|         21|            |       0.0|                 10|    -5.0|
|2014-01-31 14:00:...|        18

(236656, 228275, 8381)

Verificar que existan movimientos con una antiguedad menor a 2014


In [79]:
movimientos.agg({"Fecha_Movimiento": "min"}).show()

+---------------------+
|min(Fecha_Movimiento)|
+---------------------+
| 2013-01-01 12:00:...|
+---------------------+



Transformar las fechas según esta regla: YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [80]:
# TRANSFORMACION
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = movimientos.filter(movimientos["Fecha_Movimiento"].rlike(regex))
noCumplenFormato = movimientos.filter(~movimientos["Fecha_Movimiento"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('Fecha_Movimiento', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('Fecha_Movimiento')))
movimientos = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), movimientos.count()

64254 172402
+----------------+-----------+------------+----------+-------------------+--------+
|Fecha_Movimiento|ID_Producto|ID_Proveedor|ID_Cliente|ID_Tipo_Transaccion|Cantidad|
+----------------+-----------+------------+----------+-------------------+--------+
|     Jan 03,2015|         39|            |     512.0|                 10|   -10.0|
|     Aug 03,2015|         45|            |     862.0|                 10|   -10.0|
|     May 12,2015|          9|            |     478.0|                 10|   -10.0|
|     May 07,2014|        215|            |      14.0|                 10|   -10.0|
|     Dec 28,2015|         71|            |     870.0|                 10|   -10.0|
+----------------+-----------+------------+----------+-------------------+--------+
only showing top 5 rows

None


(64254, 236656)

#### Carga

In [None]:
# CARGUE
inferior = 0
superior = 999
j=0
total = movimientos.count()/1000
print(total)
collected = movimientos.collect()
while j<total:
    if j%50==0:
        print(j)
    j += 1
    aux = spark.createDataFrame(collected[inferior:superior],movimientos.columns)
    guardar_db(dest_db_connection_string, aux,'Estudiante_37.Hecho_Movimiento', db_user, db_psswd)
    inferior+=1000
    superior+=1000

236.656
0
50
100
150
200


Fin de la carga