# Tutorial: creación de ETLs con PySpark

## 1. Introducción	
    ¿Qué aprenderá?
	En este tutorial aprenderá cómo puede utilizar PySpark para crear un proceso de ETL básico.

	¿Qué construirá?     
        Construirá un ETL que toma los datos desde la base de datos transacional de WideWorldImporters (WWImportersTransactional), los transforma  a una representación cercana al análisis y los  almacena en la base de datos relacional WWImportersDWH.
    
	¿Para qué?
	La construcción de ETLs que se ajusten a modelos multidimensionales es un paso necesario dentro de un proceso de analìtica 1.0 , pues permite tomar los datos crudos de una fuente, generalmente transaccional, para transformarlos en datos limpios que puedan utilizarse para la toma de decisiones.
    
    ¿Qué necesita?
    1. Python 3 con pip instalado
    2. Jupyter notebook
    3. Paquetes: Pyspark (3.0.1) y pandas (1.2.1)
    4. Controlador Connector J de MySQL (ya se encuentra instalado)
    5. Servidor SQL con base de datos relacional "WWImportersTransactional" y base de datos relacional que corresponde a la bodega de WWI "Estudiante_i"
	

## 2. Proceso de ETL para una dimensión.

En este proceso de ETL, se extraen los datos de las **órdenes de compra** de una base de datos transaccional y se almacenan en otra base de datos que corresponde a la bodega de datos, siguiendo una aproximación ROLAP. A continuación, se presenta el modelo multidimensional que es el modelo conceptual que representa el proceso de registro de órdenes de compra. Este modelo se utilizó para crear las tablas en la bodega de datos que representan el proceso de negocio y que serán cargadas como resultado del proceso ETL.

![Modelo ordenes](./WWI_modelo_ordenes.PNG)

El proceso de ETL debe ser diseñado antes de implementarse. A partir de las conclusiones del entendimiento de datos sabemos las fuentes que se van a  utilizar y la relación entre las fuentes. Adicionalmente, se cuenta con las respuestas de la organización a las preguntas, resultado del entendimiento de datos. De esa manera sabemos cómo se deben manipular los datos. 

Este proceso de ETL lo dividimos en seis bloques, uno para cada dimensión o <i>tabla de hechos</i> del modelo, con la única excepción de la dimensión de fecha que, por ser una dimensión especial que se genera de forma independiente, no se incluye aquí:

![ETL](./Disenio_ETL.PNG)

Recuerde que este es el diseño general. En el diseño completo se deben incluir las transformaciones realizadas a los datos a utilizarse en las dimensiones y tablas de hecho del modelo multidimensional, de acuerdo a lo que se muestra en la infografía de arquitectura de componentes (Componente proceso ETL) 

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = 'Estudiante_34'
db_psswd = 'KZZMEFHUSD'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_34'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1
Empezamos con el bloque 1: la dimensión <i>Empleado</i>, su fuente de datos viene de la tabla transaccional <i>Personas</i>. En la sentencia SQL filtramos usando WHERE para seleccionar solo las personas que sean vendedores y recuperamos únicamente los atributos que queremos, de acuerdo con  modelo dimensional. Recuerde que también puede usar el **.select()** de pyspark si no conoce los atributos de las tablas transaccionales. Sin embargo, es más eficiente aplicar el filtro en la consulta, ya que no trae a memoria más información de la necesaria.

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, es decir en memoria. Note que aquí se pueden renombrar los atributos con la estructura <i>nombreActual AS nuevoNombre</i>. De la tabla de personas, En este paso, solo nos interesan los empleados, por lo cual se hace un filtro por medio del WHERE, buscando las personas cuyo atributo EsVendedor sea igual a 1.

In [5]:
sql_empleados = '''(SELECT ID_persona AS ID_Empleado, NombreCompleto AS Nombre, EsVendedor FROM WWImportersTransactional.Personas WHERE EsVendedor=1) AS Temp_empleados'''
empleados = obtener_dataframe_de_bd(source_db_connection_string, sql_empleados, db_user, db_psswd)
empleados.show(5)

+-----------+--------------+----------+
|ID_Empleado|        Nombre|EsVendedor|
+-----------+--------------+----------+
|          2|Kayla Woodcock|      true|
|          3| Hudson Onslow|      true|
|          6| Sophia Hinton|      true|
|          7|     Amy Trefl|      true|
|          8|Anthony Grosse|      true|
+-----------+--------------+----------+
only showing top 5 rows



#### Transformación
Recuerde que, puede hacer uso de selectExpr, filter, where entre otras de PySpark para modificar los datos cargados. Por ejemplo, el siguiente código utiliza <i>selectExpr</i> para renombrar la columna ID_Empleado por ID_Empleado_T, esta es la convención que vamos a utilizar: "_T" para indicar que el ID es el que estaba en la base de datos transaccional y "_DWH" para indicar que son ID's propios de la bodega. Usamos withColumn y monotonicallu_increasing_id para crear un ID acumulativo para cada registro en el dataframe

In [6]:
# TRANSFORMACION
empleados = empleados.selectExpr('ID_Empleado as ID_Empleado_T','Nombre')
empleados = empleados.withColumn('ID_Empleado_DWH', f.monotonically_increasing_id() + 1)
empleados.show(5)

+-------------+--------------+---------------+
|ID_Empleado_T|        Nombre|ID_Empleado_DWH|
+-------------+--------------+---------------+
|            2|Kayla Woodcock|              1|
|            3| Hudson Onslow|              2|
|            6| Sophia Hinton|              3|
|            7|     Amy Trefl|              4|
|            8|Anthony Grosse|              5|
+-------------+--------------+---------------+
only showing top 5 rows



#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

In [8]:
# CARGUE
guardar_db(dest_db_connection_string, empleados,'Estudiante_34.Empleado', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 2
Empezamos el bloque 2: dimensión ciudad. Su fuente de datos es una combinación de las tablas transaccionales <i>paises, provinciasEstados y ciudades</i>

#### Extracción

In [9]:
#EXTRACCION
sql_paises = '''(SELECT ID_Pais, Nombre, Continente, Region, Subregion FROM WWImportersTransactional.Paises) AS Temp_paises'''
sql_provincias_estados = '''(SELECT ID_EstadosProvincias AS ID_EstadoProvincia, NombreEstadoProvincia, TerritorioVentas, ID_Pais FROM WWImportersTransactional.EstadosProvincias) AS Temp_estados_provincias'''
sql_ciudades = '''(SELECT ID_ciudad as ID_ciudad_T, NombreCiudad, ID_EstadoProvincia, Poblacion FROM WWImportersTransactional.Ciudades) AS Temp_ciudades'''

paises = obtener_dataframe_de_bd(source_db_connection_string, sql_paises, db_user, db_psswd)
provincias_estados = obtener_dataframe_de_bd(source_db_connection_string, sql_provincias_estados, db_user, db_psswd)
ciudades = obtener_dataframe_de_bd(source_db_connection_string, sql_ciudades, db_user, db_psswd)

print(ciudades.columns, paises.columns, provincias_estados.columns)

['ID_ciudad_T', 'NombreCiudad', 'ID_EstadoProvincia', 'Poblacion'] ['ID_Pais', 'Nombre', 'Continente', 'Region', 'Subregion'] ['ID_EstadoProvincia', 'NombreEstadoProvincia', 'TerritorioVentas', 'ID_Pais']


#### Transformación

In [10]:
# TRANSFORMACION
ciudades = ciudades.join(provincias_estados, how = 'inner', on = 'ID_EstadoProvincia')
ciudades = ciudades.join(paises, how = 'inner', on = 'ID_Pais')
ciudades = ciudades.withColumn('ID_Ciudad_DWH', f.monotonically_increasing_id() + 1)
ciudades.show(5)

+-------+------------------+-----------+------------+---------+---------------------+----------------+-------------+-------------+--------+----------------+-------------+
|ID_Pais|ID_EstadoProvincia|ID_ciudad_T|NombreCiudad|Poblacion|NombreEstadoProvincia|TerritorioVentas|       Nombre|   Continente|  Region|       Subregion|ID_Ciudad_DWH|
+-------+------------------+-----------+------------+---------+---------------------+----------------+-------------+-------------+--------+----------------+-------------+
|    230|                31|      38134|   Zarephath|       37|           New Jersey|         Mideast|United States|North America|Americas|Northern America|            1|
|    230|                31|      38003|   Yardville|     7186|           New Jersey|         Mideast|United States|North America|Americas|Northern America|            2|
|    230|                31|      37926|     Wyckoff|     null|           New Jersey|         Mideast|United States|North America|Americas|Northe

#### Carga

In [11]:
# CARGUE
guardar_db(dest_db_connection_string, ciudades,'Estudiante_34.Ciudad', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 3
Bloque 3: dimensión paquete. Su fuente de datos es la tabla transaccional <i>Paquetes</i>

#### Extracción

In [12]:
#EXTRACCION
sql_paquetes = '''(SELECT ID_TipoPaquete AS ID_TipoPaquete_T, TipoPaquete AS Nombre FROM WWImportersTransactional.Paquetes) AS Temp_Paquetes'''
paquetes = obtener_dataframe_de_bd(source_db_connection_string, sql_paquetes, db_user, db_psswd)

#### Transformación

In [13]:
# TRANSFORMACION
paquetes = paquetes.withColumn('ID_TipoPaquete_DWH', f.monotonically_increasing_id() + 1)
paquetes.show(5)

+----------------+------+------------------+
|ID_TipoPaquete_T|Nombre|ID_TipoPaquete_DWH|
+----------------+------+------------------+
|               1|   Bag|                 1|
|               2| Block|                 2|
|               3|Bottle|                 3|
|               4|   Box|                 4|
|               5|   Can|                 5|
+----------------+------+------------------+
only showing top 5 rows



#### Carga

In [14]:
# CARGUE
guardar_db(dest_db_connection_string, paquetes,'Estudiante_34.TipoPaquete', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 4
Bloque 4: dimensión producto, su fuente de datos es la combinación entre las tablas transaccionales Productos y Colores

#### Extracción

In [15]:
sql_productos = '''(SELECT ID_Producto as ID_Producto_T, ID_Color, NombreProducto, Marca, Necesita_refrigeracion, Dias_tiempo_entrega, Impuesto, PrecioUnitario, PrecioRecomendado FROM WWImportersTransactional.Producto) AS Temp_productos'''
sql_colores = '''(SELECT ID_Color, Color FROM WWImportersTransactional.Colores) AS Temp_colores'''

productos = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
colores = obtener_dataframe_de_bd(source_db_connection_string, sql_colores, db_user, db_psswd)

#### Transformación

In [16]:
# TRANSFORMACION
productos = productos.join(colores, how = 'inner', on = 'ID_Color')
productos = productos.withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
productos.show(5)

+--------+-------------+--------------------+-----+----------------------+-------------------+--------+--------------+-----------------+-----+---------------+
|ID_Color|ID_Producto_T|      NombreProducto|Marca|Necesita_refrigeracion|Dias_tiempo_entrega|Impuesto|PrecioUnitario|PrecioRecomendado|Color|ID_Producto_DWH|
+--------+-------------+--------------------+-----+----------------------+-------------------+--------+--------------+-----------------+-----+---------------+
|       3|            3|Office cube peris...| null|                     0|                 14|      15|            19|               28|Black|              1|
|       3|           17|DBA joke mug - mi...| null|                     0|                 12|      15|            13|               19|Black|              2|
|       3|           19|DBA joke mug - da...| null|                     0|                 12|      15|            13|               19|Black|              3|
|       3|           21|DBA joke mug - yo...| 

#### Carga

In [17]:
# CARGUE
guardar_db(dest_db_connection_string, productos,'Estudiante_34.Producto', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 5
Bloque 5: dimensión cliente. Su fuente de datos es la combinación entre las tablas transaccionales Categorias de cliente, Grupos de compra y Clientes

#### Extracción

In [18]:
sql_categoriasCliente = '''(SELECT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
sql_gruposCompra = '''(SELECT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
sql_clientes = '''(SELECT ID_Cliente as ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

categoriasCliente = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
gruposCompra = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
clientes = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

#### Transformación

In [19]:
# TRANSFORMACION
clientes = clientes.join(gruposCompra, how = 'inner', on = 'ID_GrupoCompra')
clientes = clientes.join(categoriasCliente, how = 'inner', on = 'ID_Categoria')
clientes = clientes.withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
clientes.show(5)

+------------+--------------+------------+--------------------+--------------+----------------+-------------+-------------------+--------+-----------------+---------------+--------------+
|ID_Categoria|ID_GrupoCompra|ID_Cliente_T|              Nombre|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|NombreGrupoCompra|NombreCategoria|ID_Cliente_DWH|
+------------+--------------+------------+--------------------+--------------+----------------+-------------+-------------------+--------+-----------------+---------------+--------------+
|           3|             2|         601|Wingtip Toys (Rut...|           401|           29887|         null|2013-01-01 00:00:00|       7|     Wingtip Toys|   Novelty Shop|             1|
|           3|             2|         600|Wingtip Toys (Car...|           401|            5407|         null|2013-01-01 00:00:00|       7|     Wingtip Toys|   Novelty Shop|             2|
|           3|             2|         599|Wingtip Toys (Dic.

#### Carga

In [20]:
# CARGUE
guardar_db(dest_db_connection_string,clientes,'Estudiante_34.Cliente', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 6
Bloque 6: Hecho orden. Su fuente de datos es la combinación entre las tablas transaccionales Ordenes y detalles de orden

#### Extracción

In [21]:
sql_ordenes = '''(SELECT * FROM WWImportersTransactional.Ordenes) AS Temp_ordenes'''
sql_detallesOrdenes = '''(SELECT * FROM WWImportersTransactional.DetallesOrdenes) AS Temp_detallesordenes'''
ordenes = obtener_dataframe_de_bd(source_db_connection_string, sql_ordenes, db_user, db_psswd)
detallesOrdenes = obtener_dataframe_de_bd(source_db_connection_string, sql_detallesOrdenes, db_user, db_psswd)

#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- La regla de negocio "La tasa de impuesto es de 10% o 15%" es correcta, pero habian errores en la tabla original, que fueron corregidos. 
- Para la segunda regla de negocio: "Son 73.595 órdenes detalladas en 231.412 lineas de detalle de órdenes realizadas desde 2013", si faltaban datos, los cuales fueron completados, y nos dicen que en cuanto a consistencia ellos revisaron las tablas e hicieron correcciones, pero que los duplicados completos de ordenes los eliminemos
- "El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD": En cuanto a formatos de fechas estan de acuerdo con que los estandarizemos y el formato sea el especificado en la regla
- Para las descripciones de productos que eran "a", se actualizaron a los valores reales. 
- Se pueden eliminar las columnas Comenarios, Instrucciones_de_entrega y comentarios_internos porque estan vacias. 
- A pesar de estar en un proceso de mejorar la calidad de los datos y mantener los nulos nos ayudaría a reflejar esa calidad, de la mano con el grupo de analitica de WWI se decide imputar por la media el valor extremo de la variable Cantidad
- Para las ordenes las columnas Seleccionado_por_ID_de_persona, ID_de_pedido_pendiente, Seleccion_completada_cuando, y para las columnas Seleccion_completada_cuando de la tabla detalles de ordenes, se decide mantener los valores vacíos, sin embargo para la variable Precio_unitario el negocio reviso y complemento los valores faltantes

Las tablas usadas en el tutorial de entendimiento de datos estaran disponibles para su revision con los siguientes nombres: OrdenesCopia y DetallesOrdenesCopia. 

Para este tutorial vamos a trabajar con unas tablas que dadas las conclusiones del tutorial de entendimiento, WWImporters revisó los datos originales, creo tablas y las llamo "Ordenes" y "DetallesOrdenes"

Se hace una verificación de los valores de la tasa de impuesto

In [22]:
detallesOrdenes.select("Tasa_de_impuesto").distinct().show()

+----------------+
|Tasa_de_impuesto|
+----------------+
|              10|
|              15|
+----------------+



Se hace una verificación del rango de fechas disponible en los datos

In [23]:
ordenes.agg({"Fecha_de_pedido": "min"}).show()

+--------------------+
|min(Fecha_de_pedido)|
+--------------------+
|          2013-01-01|
+--------------------+



Se elimina columnas Comenarios, Instrucciones_de_entrega y comentarios_internos

In [24]:
ordenes = ordenes.drop(*["Comentarios", "Instrucciones_de_entrega","comentarios_internos"])

Se eliminan duplicados totales de ordenes

In [25]:
print((ordenes.count(),ordenes.distinct().count()))

(107707, 93629)


In [26]:
ordenes = ordenes.drop_duplicates()

In [27]:
print((ordenes.count(),ordenes.distinct().count()))

(93629, 93629)


Se hace verificación de consistencia

In [28]:
#consistencia: revisar genially: definicion de consistencia
ids_ordenes = set([x.ID_de_pedido for x in ordenes.select('ID_de_pedido').collect()])
ids_detalles = set([x.ID_de_pedido for x in detallesOrdenes.select('ID_de_pedido').collect()])

len(ids_ordenes-ids_detalles), len(ids_detalles-ids_ordenes)

(0, 0)

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [29]:
# TRANSFORMACION
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = ordenes.filter(ordenes["Fecha_de_pedido"].rlike(regex))
noCumplenFormato = ordenes.filter(~ordenes["Fecha_de_pedido"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('Fecha_de_pedido', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('Fecha_de_pedido')))
ordenes = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes.count()

20034 73595
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|    Jan 28,2014|               201

(20034, 93629)

Descripciones


In [30]:
detallesOrdenes.where(length(col("Descripcion")) <= 10).show()

+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+



Imputar valor maximo de cantidad

In [31]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[1]

Row(Cantidad=360)

In [32]:
detallesOrdenes = detallesOrdenes.replace( 10000000, 360, 'Cantidad')

In [33]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[0]

Row(Cantidad=360)

In [34]:
detallesOrdenes.show(5)

+----------------+------------+-----------+--------------------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|         Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+--------------------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|               1|          45|        164|32 mm Double side...|              7|      50|            112|              15|                   50|        2013-01-02 11:00:00|
|               2|           1|         67|Ride on toy sedan...|              7|      10|            230|              15|                   10|        2013-01-01 11:00:00|
|               3|           2|         50|Developer joke mu...|              7|       9|             13|              15|             

In [35]:
ordenes.show(5)

+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|     2014-01-28|               2014-01-29|    

Se unen los dos dataframes, se verifica que no haya duplicados y si los hay se eliminan

In [36]:
ordenes = ordenes.join(detallesOrdenes, how = 'inner', on = 'ID_de_pedido')
ordenes = ordenes.withColumn('Valor_total',col('Precio_unitario')*col('Cantidad'))
ordenes = ordenes.withColumn('Impuestos',col('Valor_total')*col('Tasa_de_impuesto'))
ordenes = ordenes.selectExpr('ID_de_pedido as ID_de_pedido_T','ID_Producto','Fecha_de_pedido','ID_de_cliente','ID_de_vendedor','ID_Tipo_Paquete','Cantidad','Valor_total', 'Impuestos')

print((ordenes.count(),ordenes.distinct().count()))

ordenes = ordenes.drop_duplicates()
ordenes = ordenes.withColumn('ID_de_pedido_DWH', f.monotonically_increasing_id() + 1)
ordenes.show(5)

(294314, 231412)
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+----------------+
|ID_de_pedido_T|ID_Producto|Fecha_de_pedido|ID_de_cliente|ID_de_vendedor|ID_Tipo_Paquete|Cantidad|Valor_total|Impuestos|ID_de_pedido_DWH|
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+----------------+
|           148|        203|     2013-01-02|          812|            13|              7|      40|       1280|    19200|               1|
|           463|         64|     2013-01-09|          555|             3|              7|       1|         30|      450|               2|
|           463|         10|     2013-01-09|          555|             3|              7|       8|        256|     3840|               3|
|           463|         16|     2013-01-09|          555|             3|              7|      10|        130|     1950|               4|
|           463| 

#### Carga

In [37]:
# CARGUE
inferior = 0
superior = 999
j=0
total = ordenes.count()/1000
print(total)
collected = ordenes.collect()
while j<total:
    if j%50==0:
        print(j)
    j += 1
    aux = spark.createDataFrame(collected[inferior:superior],ordenes.columns)
    guardar_db(dest_db_connection_string, aux,'Estudiante_34.HechoOrden', db_user, db_psswd)
    inferior+=1000
    superior+=1000

231.412
0
50
100
150
200


Verifique los resultados usando MySQL Workbench

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

# Bloque 1 - Proveedor
## Extracción


In [113]:
#EXTRACCION

sql_proveedores = '''(SELECT ProveedorID AS ID_Proveedor, NombreProveedor AS Nombre, CategoriaProveedorID AS ID_Categoria, PersonaContactoPrincipalID as ID_Contacto_principal, DiasPago AS Dias_pago, CodigoPostal AS Codigo_postal FROM WWImportersTransactional.Proveedores) AS Temp_proveedores'''
sql_categoria = '''(SELECT CategoriaProveedorID AS ID_Categoria, CategoriaProveedor FROM WWImportersTransactional.CategoriasProveedores) AS Temp_categorias'''
sql_personas_contacto_principal = '''(SELECT ID_persona AS ID_Contacto_principal, NombreCompleto AS Contacto_principal FROM WWImportersTransactional.Personas) AS TempContactoPrincipal'''

proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores, db_user, db_psswd)
categorias = obtener_dataframe_de_bd(source_db_connection_string, sql_categoria, db_user, db_psswd)
contactos_principales = obtener_dataframe_de_bd(source_db_connection_string, sql_personas_contacto_principal, db_user, db_psswd)

print(proveedores.columns)
print(categorias.columns) 
print(contactos_principales.columns)

['ID_Proveedor', 'Nombre', 'ID_Categoria', 'ID_Contacto_principal', 'Dias_pago', 'Codigo_postal']
['ID_Categoria', 'CategoriaProveedor']
['ID_Contacto_principal', 'Contacto_principal']


## Transformación

Los días de pago no pueden ser negativos por lo que se múltiplica su valor por -1

In [114]:
proveedores = proveedores.withColumn('Dias_pago', when(col('Dias_pago') < 0, col('Dias_pago') * -1).otherwise(col('Dias_pago')))
proveedores.show(10)

+------------+--------------------+------------+---------------------+---------+-------------+
|ID_Proveedor|              Nombre|ID_Categoria|ID_Contacto_principal|Dias_pago|Codigo_postal|
+------------+--------------------+------------+---------------------+---------+-------------+
|           4|      Fabrikam, Inc.|           4|                   27|       30|        40351|
|           5|Graphic Design In...|           2|                   29|       14|        64847|
|           7|       Litware, Inc.|           5|                   33|       30|        95245|
|           9|      Nod Publishers|           2|                   37|        7|        27906|
|          10|Northwind Electri...|           3|                   39|       30|         7860|
|          12|   The Phone Company|           2|                   43|       30|        56732|
|          13|      Woodgrove Bank|           7|                   45|        7|        94101|
|           2|       Contoso, Ltd.|           2|  

In [115]:
# TRANSFORMACION
proveedores = proveedores.join(categorias, how = 'left', on = 'ID_Categoria')
proveedores = proveedores.join(contactos_principales, how = 'left', on = 'ID_Contacto_principal')
proveedores = proveedores.withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
print(proveedores.columns)
proveedores.show(5)

['ID_Contacto_principal', 'ID_Categoria', 'ID_Proveedor', 'Nombre', 'Dias_pago', 'Codigo_postal', 'CategoriaProveedor', 'Contacto_principal', 'ID_Proveedor_DWH']
+---------------------+------------+------------+-------------------+---------+-------------+--------------------+------------------+----------------+
|ID_Contacto_principal|ID_Categoria|ID_Proveedor|             Nombre|Dias_pago|Codigo_postal|  CategoriaProveedor|Contacto_principal|ID_Proveedor_DWH|
+---------------------+------------+------------+-------------------+---------+-------------+--------------------+------------------+----------------+
|                   31|           9|           6|Humongous Insurance|       14|        37770|servicios de seguros|Madelaine  Cartier|               1|
|                   27|           4|           4|     Fabrikam, Inc.|       30|        40351|                ropa|       Bill Lawson|               2|
|                   41|           8|          11|      Trey Research|        7|    

## Carga

In [116]:
# Carga
guardar_db(dest_db_connection_string, proveedores,'Estudiante_34.Proveedor', db_user, db_psswd)


# Bloque 2 - Tipo Transacción
## Extracción

In [70]:
sql_tipo_transaccion = '''(SELECT TipoTransaccionID AS ID_Tipo_transaccion, TipoTransaccionNombre AS Tipo FROM WWImportersTransactional.TiposTransaccion) AS Temp_tipoTransaccion'''
tipo_transacciones = obtener_dataframe_de_bd(source_db_connection_string, sql_tipo_transaccion, db_user, db_psswd)
tipo_transacciones.show(5)

+-------------------+--------------------+
|ID_Tipo_transaccion|                Tipo|
+-------------------+--------------------+
|                  2|Customer Credit Note|
|                  3|Customer Payment ...|
|                  4|     Customer Refund|
|                  5|    Supplier Invoice|
|                  6|Supplier Credit Note|
+-------------------+--------------------+
only showing top 5 rows



Transformación

In [71]:
# Transformación
tipo_transacciones = tipo_transacciones.withColumn('ID_Tipo_Transaccion_DWH', f.monotonically_increasing_id() + 1)
tipo_transacciones.show(5)

+-------------------+--------------------+-----------------------+
|ID_Tipo_transaccion|                Tipo|ID_Tipo_Transaccion_DWH|
+-------------------+--------------------+-----------------------+
|                  2|Customer Credit Note|                      1|
|                  3|Customer Payment ...|                      2|
|                  4|     Customer Refund|                      3|
|                  5|    Supplier Invoice|                      4|
|                  6|Supplier Credit Note|                      5|
+-------------------+--------------------+-----------------------+
only showing top 5 rows



## Carga

In [72]:
# Carga
guardar_db(dest_db_connection_string, tipo_transacciones,'Estudiante_34.TipoTransaccion', db_user, db_psswd)

## Bloque 3 Hecho Movimiento
# Extracción

In [75]:
sql_movimientos = '''(SELECT * FROM WWImportersTransactional.movimientos) AS Temp_movimientos'''
movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)


## Transformación

Sobre los resultados del entendimiento de datos, Wide World Importers les comenta lo siguiente:

- Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.
- Sobre “La cantidad máxima de productos movidos es 50 millones por viaje”, encontramos que efectivamente gracias a los avances ya podemos cargar más que la cantidad de 50 millones por viajes.
- La falta de datos antes del 2014 es un error de extracción de datos. Los nuevos datos incluyen este año.
- Nuestro análisis concluye que la información que se ha duplicado totalmente no es útil. Por favor no tenerlos en cuenta.
- “El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD”: En cuanto a formatos de fechas estamos de acuerdo con que los estandarizemos y el formato sea el especificado en la regla de negocio.
- Existen proveedores que tienen 2 filas una con un nombre y otra con el mismo nombre mas un “Inc” o “Ltd”. Unimos estos a un solo proveedor dado que se trató de un error de digitación.
- El código postal igual para todos nuestros proveedores es un error que también fue corregido.
- Cantidades negativas significan salidas de productos del inventario
- Cada fila representa un viaje o transacción de productos
- Los datos revisados quedan en las tablas Proveedores y movimientos, en las tablas ProveedoresCopia y movimientosCopia quedan los datos con errores en caso de que deseen revisar/ejecutar el ejercicio de entendimiento de datos

Se verifican si existen duplicados

In [119]:
print((movimientos.count(),movimientos.distinct().count()))

(267300, 236667)


Se eliminan los duplicados

In [128]:
movimientos = movimientos.drop_duplicates()

Se hace verificación de que no existan duplicados

In [134]:
print((movimientos.count(),movimientos.distinct().count()))

(236667, 236667)


In [142]:
print(movimientos.columns)

['StockItemTransactionID', 'StockItemID', 'TransactionTypeID', 'CustomerID', 'InvoiceID', 'SupplierID', 'PurchaseOrderID', 'TransactionOccurredWhen', 'Quantity']


In [None]:
Se verifica que el formato de fecha coincida con lo solicitado

In [141]:

# TRANSFORMACION
regex = "([0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]) (2[0-3]|[01][0-9]):[0-5][0-9]:[0-5][0-9])|([12]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01]))"
cumplenFormato = movimientos.filter(movimientos["TransactionOccurredWhen"].rlike(regex))
noCumplenFormato = movimientos.filter(~movimientos["TransactionOccurredWhen"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('TransactionOccurredWhen', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('TransactionOccurredWhen')))
ordenes = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes.count()


64254 172413
+----------------------+-----------+-----------------+----------+---------+----------+---------------+-----------------------+--------+
|StockItemTransactionID|StockItemID|TransactionTypeID|CustomerID|InvoiceID|SupplierID|PurchaseOrderID|TransactionOccurredWhen|Quantity|
+----------------------+-----------+-----------------+----------+---------+----------+---------------+-----------------------+--------+
|                243458|         71|               10|     566.0|  50999.0|          |               |            Jul 14,2015|   -10.0|
|                103002|        108|               10|     536.0|  21590.0|          |               |            Feb 21,2014|   -10.0|
|                174167|        203|               10|     195.0|  36471.0|          |               |            Nov 14,2014|   -10.0|
|                192808|        118|               10|      38.0|  40378.0|          |               |            Jan 26,2015|   -10.0|
|                231792|         26

(64254, 236667)

In [None]:
movimientos = movimientos.withColumn('ID_Movimientos_DWH', f.monotonically_increasing_id() + 1)

## Carga


In [147]:
# Carga
guardar_db(dest_db_connection_string, movimientos,'Estudiante_34.HechoMovimientos', db_user, db_psswd)

## 4. Cierre
Completado este tutorial, ya sabe cómo realizar ETL básicos en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

El Capítulo 2 del libro <i>Learn PySpark : Build Python-based Machine Learning and Deep Learning Models, New York: Apress. 2019</i> de Pramod Singh contiene muchos ejemplos útiles, puede encontrarlo en la biblioteca virtual de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.