# Tutorial: creación de ETLs con PySpark

## 1. Introducción	
    ¿Qué aprenderá?
	En este tutorial aprenderá cómo puede utilizar PySpark para crear un proceso de ETL básico.

	¿Qué construirá?     
        Construirá un ETL que toma los datos desde la base de datos transacional de WideWorldImporters (WWImportersTransactional), los transforma  a una representación cercana al análisis y los  almacena en la base de datos relacional WWImportersDWH.
    
	¿Para qué?
	La construcción de ETLs que se ajusten a modelos multidimensionales es un paso necesario dentro de un proceso de analìtica 1.0 , pues permite tomar los datos crudos de una fuente, generalmente transaccional, para transformarlos en datos limpios que puedan utilizarse para la toma de decisiones.
    
    ¿Qué necesita?
    1. Python 3 con pip instalado
    2. Jupyter notebook
    3. Paquetes: Pyspark (3.0.1) y pandas (1.2.1)
    4. Controlador Connector J de MySQL (ya se encuentra instalado)
    5. Servidor SQL con base de datos relacional "WWImportersTransactional" y base de datos relacional que corresponde a la bodega de WWI "Estudiante_i"
	

## 2. Proceso de ETL para una dimensión.

En este proceso de ETL, se extraen los datos de las **órdenes de compra** de una base de datos transaccional y se almacenan en otra base de datos que corresponde a la bodega de datos, siguiendo una aproximación ROLAP. A continuación, se presenta el modelo multidimensional que es el modelo conceptual que representa el proceso de registro de órdenes de compra. Este modelo se utilizó para crear las tablas en la bodega de datos que representan el proceso de negocio y que serán cargadas como resultado del proceso ETL. 

Tenga en cuenta que las llaves ID_XXXX presentes en el modelo hacen referencia a las llaves de la bodega. Por otra parte, en el proceso de ETL se van a tener en cuenta las llaves transaccionales (**WWImportersTransactional**). La nomenclatura para utilizar es:

1.   ID_XXXX_DWH, para las llaves de la bodega.
2.   ID_XXXX_T, para las llaves transaccionales.


![Modelo ordenes](./WWI_modelo_ordenes.png)

El proceso de ETL debe ser diseñado antes de implementarse. A partir de las conclusiones del entendimiento de datos sabemos las fuentes que se van a  utilizar y la relación entre las fuentes. Adicionalmente, se cuenta con las respuestas de la organización a las preguntas, resultado del entendimiento de datos. De esa manera sabemos cómo se deben manipular los datos. 

Este proceso de ETL lo dividimos en seis bloques, uno para cada dimensión o <i>tabla de hechos</i> del modelo, con la única excepción de la dimensión de fecha que, por ser una dimensión especial que se genera de forma independiente, no se incluye aquí:

![ETL](./Disenio_ETL.PNG)

Recuerde que este es el diseño general. En el diseño completo se deben incluir las transformaciones realizadas a los datos a utilizarse en las dimensiones y tablas de hecho del modelo multidimensional, de acuerdo con lo que se muestra en la infografía de arquitectura de componentes (Componente proceso ETL) 

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = 'Estudiante_19_202413'
db_psswd = 'aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.120:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.120:8080/Estudiante_19_202413'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1
Empezamos con el bloque 1: la dimensión <i>Empleado</i>, su fuente de datos viene de la tabla transaccional <i>Personas</i>. En la sentencia SQL filtramos usando WHERE para seleccionar solo las personas que sean vendedores y recuperamos únicamente los atributos que queremos, de acuerdo con  modelo dimensional. Recuerde que también puede usar el **.select()** de pyspark si no conoce los atributos de las tablas transaccionales. Sin embargo, es más eficiente aplicar el filtro en la consulta, ya que no trae a memoria más información de la necesaria.

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, es decir en memoria. Note que aquí se pueden renombrar los atributos con la estructura <i>nombreActual AS nuevoNombre</i>. De la tabla de personas, En este paso, solo nos interesan los empleados, por lo cual se hace un filtro por medio del WHERE, buscando las personas cuyo atributo EsVendedor sea igual a 1.

In [5]:
sql_empleados = '''(SELECT DISTINCT ID_persona AS ID_Empleado, NombreCompleto AS Nombre, EsVendedor FROM WWImportersTransactional.Personas WHERE EsVendedor=1) AS Temp_empleados'''
empleados = obtener_dataframe_de_bd(source_db_connection_string, sql_empleados, db_user, db_psswd)
empleados.show(10)

+-----------+------------------+----------+
|ID_Empleado|            Nombre|EsVendedor|
+-----------+------------------+----------+
|          2|    Kayla Woodcock|      true|
|          3|     Hudson Onslow|      true|
|          6|     Sophia Hinton|      true|
|          7|         Amy Trefl|      true|
|          8|    Anthony Grosse|      true|
|         13|Hudson Hollinworth|      true|
|         14|         Lily Code|      true|
|         15|         Taj Shand|      true|
|         16|     Archer Lamble|      true|
|         20|       Jack Potter|      true|
+-----------+------------------+----------+



#### Transformación
Recuerde que, puede hacer uso de selectExpr, filter, where entre otras de PySpark para modificar los datos cargados. Por ejemplo, el siguiente código utiliza <i>selectExpr</i> para renombrar la columna ID_Empleado por ID_Empleado_T, esta es la convención que vamos a utilizar: "_T" para indicar que el ID es el que estaba en la base de datos transaccional y "_DWH" para indicar que son ID's propios de la bodega. Usamos withColumn y monotonicallu_increasing_id para crear un ID acumulativo para cada registro en el dataframe

In [6]:
# TRANSFORMACION
empleados = empleados.selectExpr('ID_Empleado as ID_Empleado_T','Nombre')
empleados = empleados.coalesce(1).withColumn('ID_Empleado_DWH', f.monotonically_increasing_id() + 1)
empleados = empleados.select('ID_Empleado_DWH','ID_Empleado_T','Nombre')
empleados.show(5)

+---------------+-------------+--------------+
|ID_Empleado_DWH|ID_Empleado_T|        Nombre|
+---------------+-------------+--------------+
|              1|            2|Kayla Woodcock|
|              2|            3| Hudson Onslow|
|              3|            6| Sophia Hinton|
|              4|            7|     Amy Trefl|
|              5|            8|Anthony Grosse|
+---------------+-------------+--------------+
only showing top 5 rows



#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [7]:
# CARGUE
guardar_db(dest_db_connection_string, empleados,'Estudiante_19_202413.Empleados', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 2
Empezamos el bloque 2: dimensión ciudad. Su fuente de datos es una combinación de las tablas transaccionales <i>paises, provinciasEstados y ciudades</i>

#### Extracción

In [8]:
#EXTRACCION
sql_paises = '''(SELECT DISTINCT ID_Pais, Nombre, Continente, Region, Subregion FROM WWImportersTransactional.Paises) AS Temp_paises'''
sql_provincias_estados = '''(SELECT DISTINCT ID_EstadosProvincias AS ID_EstadoProvincia, NombreEstadoProvincia, TerritorioVentas, ID_Pais FROM WWImportersTransactional.EstadosProvincias) AS Temp_estados_provincias'''
sql_ciudades = '''(SELECT DISTINCT ID_ciudad as ID_ciudad_T, NombreCiudad, ID_EstadoProvincia, Poblacion FROM WWImportersTransactional.Ciudades) AS Temp_ciudades'''

paises = obtener_dataframe_de_bd(source_db_connection_string, sql_paises, db_user, db_psswd)
provincias_estados = obtener_dataframe_de_bd(source_db_connection_string, sql_provincias_estados, db_user, db_psswd)
ciudades = obtener_dataframe_de_bd(source_db_connection_string, sql_ciudades, db_user, db_psswd)

print(ciudades.columns, paises.columns, provincias_estados.columns)

['ID_ciudad_T', 'NombreCiudad', 'ID_EstadoProvincia', 'Poblacion'] ['ID_Pais', 'Nombre', 'Continente', 'Region', 'Subregion'] ['ID_EstadoProvincia', 'NombreEstadoProvincia', 'TerritorioVentas', 'ID_Pais']


#### Transformación

In [9]:
# TRANSFORMACION
ciudades = ciudades.join(provincias_estados, how = 'left', on = 'ID_EstadoProvincia')
ciudades = ciudades.join(paises, how = 'left', on = 'ID_Pais')
ciudades = ciudades.coalesce(1).withColumn('ID_Ciudad_DWH', f.monotonically_increasing_id() + 1)
ciudades = ciudades.select('ID_Ciudad_DWH','ID_ciudad_T','NombreCiudad','Continente','Nombre','Poblacion',
                          'Region','TerritorioVentas','NombreEstadoProvincia','Subregion') \
                    .withColumnRenamed('Nombre','Pais')
ciudades.show(5)

+-------------+-----------+------------+-------------+-------------+---------+--------+----------------+---------------------+----------------+
|ID_Ciudad_DWH|ID_ciudad_T|NombreCiudad|   Continente|         Pais|Poblacion|  Region|TerritorioVentas|NombreEstadoProvincia|       Subregion|
+-------------+-----------+------------+-------------+-------------+---------+--------+----------------+---------------------+----------------+
|            1|         49|     Absecon|North America|United States|     8411|Americas|         Mideast|           New Jersey|Northern America|
|            2|        150|    Adelphia|North America|United States|     null|Americas|         Mideast|           New Jersey|Northern America|
|            3|        336|      Albion|North America|United States|     null|Americas|         Mideast|           New Jersey|Northern America|
|            4|        458|   Allamuchy|North America|United States|       78|Americas|         Mideast|           New Jersey|Northern A

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [11]:
# CARGUE
guardar_db(dest_db_connection_string, ciudades,'Estudiante_19_202413.Ciudad', db_user, db_psswd)

AnalysisException: Column "ID_Ciudad_DWH" not found in schema Some(StructType(StructField(ID_Ciudad,IntegerType,false), StructField(Nombre,StringType,true), StructField(Continente,StringType,true), StructField(Pais,StringType,true), StructField(Población,IntegerType,true), StructField(Region,StringType,true), StructField(Territorio_ventas,StringType,true), StructField(Estado_provincia,StringType,true), StructField(Subregion,StringType,true)))

Verifique los resultados usando MySQL Workbench

### BLOQUE 3
Bloque 3: dimensión paquete. Su fuente de datos es la tabla transaccional <i>Paquetes</i>

#### Extracción

In [12]:
#EXTRACCION
sql_paquetes = '''(SELECT DISTINCT ID_TipoPaquete AS ID_TipoPaquete_T, TipoPaquete AS Nombre FROM WWImportersTransactional.Paquetes) AS Temp_Paquetes'''
paquetes = obtener_dataframe_de_bd(source_db_connection_string, sql_paquetes, db_user, db_psswd)

#### Transformación

In [13]:
# TRANSFORMACION
paquetes = paquetes.coalesce(1).withColumn('ID_TipoPaquete_DWH', f.monotonically_increasing_id() + 1)
paquetes = paquetes.select('ID_TipoPaquete_DWH','ID_TipoPaquete_T','Nombre')
paquetes.show(5)

+------------------+----------------+------+
|ID_TipoPaquete_DWH|ID_TipoPaquete_T|Nombre|
+------------------+----------------+------+
|                 1|               1|   Bag|
|                 2|               2| Block|
|                 3|               3|Bottle|
|                 4|               4|   Box|
|                 5|               5|   Can|
+------------------+----------------+------+
only showing top 5 rows



#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [15]:
# CARGUE
guardar_db(dest_db_connection_string, paquetes,'Estudiante_19_202413.TipoPaquete', db_user, db_psswd)

AnalysisException: Column "ID_TipoPaquete_DWH" not found in schema Some(StructType(StructField(ID_TipoPaquete,IntegerType,false), StructField(Nombre,StringType,true)))

Verifique los resultados usando MySQL Workbench

### BLOQUE 4
Bloque 4: dimensión producto, su fuente de datos es la combinación entre las tablas transaccionales Productos y Colores

#### Extracción

In [16]:
sql_productos = '''(SELECT DISTINCT ID_Producto as ID_Producto_T, ID_Color, NombreProducto, Marca, Necesita_refrigeracion, Dias_tiempo_entrega, Impuesto, PrecioUnitario, PrecioRecomendado FROM WWImportersTransactional.Producto) AS Temp_productos'''
sql_colores = '''(SELECT DISTINCT ID_Color, Color FROM WWImportersTransactional.Colores) AS Temp_colores'''

productos = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
colores = obtener_dataframe_de_bd(source_db_connection_string, sql_colores, db_user, db_psswd)

#### Transformación

In [17]:
# TRANSFORMACION
productos = productos.join(colores, how = 'left', on = 'ID_Color').fillna({'Color': 'Missing'})
productos = productos.coalesce(1).withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
productos = productos.select('ID_Producto_DWH','ID_Producto_T','NombreProducto','Marca','Color','Necesita_refrigeracion','Dias_tiempo_entrega','PrecioRecomendado','Impuesto','PrecioUnitario')
productos.show(5)

+---------------+-------------+--------------------+-----+-------+----------------------+-------------------+-----------------+--------+--------------+
|ID_Producto_DWH|ID_Producto_T|      NombreProducto|Marca|  Color|Necesita_refrigeracion|Dias_tiempo_entrega|PrecioRecomendado|Impuesto|PrecioUnitario|
+---------------+-------------+--------------------+-----+-------+----------------------+-------------------+-----------------+--------+--------------+
|              1|            1|USB missile launc...| null|Missing|                     0|                 14|               37|      15|            25|
|              2|            4|USB food flash dr...| null|Missing|                     0|                 14|               48|      15|            32|
|              3|            5|USB food flash dr...| null|Missing|                     0|                 14|               48|      15|            32|
|              4|            6|USB food flash dr...| null|Missing|                     0

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [18]:
# CARGUE
guardar_db(dest_db_connection_string, productos,'Estudiante_19_202413.Producto', db_user, db_psswd)

AnalysisException: Column "ID_Producto_DWH" not found in schema Some(StructType(StructField(ID_Producto,IntegerType,false), StructField(Nombre,StringType,true), StructField(Marca,StringType,true), StructField(Paquete_de_compra,StringType,true), StructField(Color,StringType,true), StructField(Necesita_refrigeracion,BooleanType,true), StructField(Dias_tiempo_entrega,IntegerType,true), StructField(cantidad_por_salida,IntegerType,true), StructField(Precio_minorista_recomendado,DoubleType,true), StructField(Paquete_de_venta,StringType,true), StructField(Precio_unitario,DoubleType,true)))

Verifique los resultados usando MySQL Workbench

### BLOQUE 5
Bloque 5: dimensión cliente. Su fuente de datos es la combinación entre las tablas transaccionales Categorias de cliente, Grupos de compra y Clientes

#### Extracción

In [19]:
sql_categoriasCliente = '''(SELECT DISTINCT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
sql_gruposCompra = '''(SELECT DISTINCT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
sql_clientes = '''(SELECT DISTINCT ID_Cliente as ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

categoriasCliente = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
gruposCompra = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
clientes = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

#### Transformación

In [20]:
# TRANSFORMACION 
# EL SUPUESTO QUE SE TIENE ES QUE TODOS LOS CLIENTES TIENEN TODOS SUS DATOS DE CATEGORIA Y GRUPO Y NO SE ESTÁN PERDIENDO CLIENTES AL REALIZAR ESTE JOIN
clientes = clientes.join(gruposCompra, how = 'left', on = 'ID_GrupoCompra')
clientes = clientes.alias('cl').join(categoriasCliente.alias('ct'), how = 'left', on = 'ID_Categoria') \
.select([col('cl.ID_Cliente_T'),col('cl.Nombre'),col('ct.NombreCategoria'),col('cl.NombreGrupoCompra') \
        ,col('cl.ClienteFactura'),col('cl.ID_CiudadEntrega'),col('cl.LimiteCredito'),col('cl.FechaAperturaCuenta'),col('cl.DiasPago')])
clientes = clientes.coalesce(1).withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
clientes = clientes.select('ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
                          'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago')

clientes = clientes.fillna({'NombreCategoria':'Missing','NombreGrupoCompra':'Missing'})
clientes.show(5)

+--------------+------------+-----------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|           Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+-----------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             1|         803|       Bala Dixit|   Novelty Shop|          Missing|           803|           33799|         2000|2013-01-01 00:00:00|       7|
|             2|         805|     Ratan Poddar|   Novelty Shop|          Missing|           805|           10194|         3300|2013-01-01 00:00:00|       7|
|             3|         806|           Shi Tu|   Novelty Shop|          Missing|           806|           31685|         3000|2013-01-01 00:00:00|       7|
|             4|         811|    Surendra Sahu|   Novelty 

In [21]:
# Crea el registro para el id = 0
clientes_0 = [('0','','Missing','Missing','Missing','0','0','','','')]
columns = ['ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
            'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago']
cliente_0 = spark.createDataFrame(data=clientes_0,schema=columns)
cliente_0.show()

+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T| Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |Missing|        Missing|          Missing|             0|               0|             |                   |        |
+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+



In [22]:
clientes = clientes.union(cliente_0)
clientes.show(5)

+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|            Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             1|         807|     Gunnar Lohmus|     Gift Store|          Missing|           807|           35810|         3100|2013-01-01 00:00:00|       7|
|             2|         816|        Harsha Huq|     Gift Store|          Missing|           816|            8892|         2400|2013-01-01 00:00:00|       7|
|             3|         823|Francisca Laureano|     Gift Store|          Missing|           823|           17142|         3500|2013-01-01 00:00:00|       7|
|             4|         825|    Jayanta Thakur|    

In [23]:
# Se ordena por el identificador DWH
clientes = clientes.withColumn('ID_Cliente_DWH',col('ID_Cliente_DWH').cast('int')).orderBy(col('ID_Cliente_DWH'))
clientes.show(5)

+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|            Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |           Missing|        Missing|          Missing|             0|               0|             |                   |        |
|             1|         807|     Gunnar Lohmus|     Gift Store|          Missing|           807|           35810|         3100|2013-01-01 00:00:00|       7|
|             2|         816|        Harsha Huq|     Gift Store|          Missing|           816|            8892|         2400|2013-01-01 00:00:00|       7|
|             3|         823|Francisca Laureano|    

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [26]:
# CARGUE
guardar_db(dest_db_connection_string,clientes,'Estudiante_19_202413.Cliente', db_user, db_psswd)

AnalysisException: Column "ID_CiudadEntrega" not found in schema Some(StructType(StructField(ID_Cliente_DWH,IntegerType,false), StructField(ID_Cliente_T,IntegerType,true), StructField(Nombre,StringType,true), StructField(ClienteFactura,StringType,true), StructField(ID_CiudadEntrega_DWH,IntegerType,true), StructField(LimiteCredito,DecimalType(10,2),true), StructField(FechaAperturaCuenta,DateType,true), StructField(DiasPago,IntegerType,true), StructField(NombreGrupoCompra,StringType,true), StructField(NombreCategoria,StringType,true)))

Verifique los resultados usando MySQL Workbench

### BLOQUE 6
Bloque 6: Hecho orden. Su fuente de datos es la combinación entre las tablas transaccionales Ordenes y detalles de orden

#### Extracción

In [25]:
sql_ordenes = '''(SELECT DISTINCT * FROM WWImportersTransactional.Ordenes) AS Temp_ordenes'''
sql_detallesOrdenes = '''(SELECT DISTINCT * FROM WWImportersTransactional.DetallesOrdenes) AS Temp_detallesordenes'''
ordenes = obtener_dataframe_de_bd(source_db_connection_string, sql_ordenes, db_user, db_psswd)
detallesOrdenes = obtener_dataframe_de_bd(source_db_connection_string, sql_detallesOrdenes, db_user, db_psswd)

Py4JJavaError: An error occurred while calling o352.load.
: java.sql.SQLSyntaxErrorException: Table 'WWImportersTransactional.Ordenes' doesn't exist
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1009)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:67)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:57)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:239)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- La regla de negocio "La tasa de impuesto es de 10% o 15%" es correcta, pero habian errores en la tabla original, que fueron corregidos. 
- Para la segunda regla de negocio: "Son 73.595 órdenes detalladas en 231.412 lineas de detalle de órdenes realizadas desde 2013", si faltaban datos, los cuales fueron completados, y nos dicen que en cuanto a consistencia ellos revisaron las tablas e hicieron correcciones, pero que los duplicados completos de ordenes los eliminemos
- "El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD": En cuanto a formatos de fechas estan de acuerdo con que los estandarizemos y el formato sea el especificado en la regla
- Para las descripciones de productos que eran "a", se actualizaron a los valores reales. 
- Se pueden eliminar las columnas Comenarios, Instrucciones_de_entrega y comentarios_internos porque estan vacias. 
- A pesar de estar en un proceso de mejorar la calidad de los datos y mantener los nulos nos ayudaría a reflejar esa calidad, de la mano con el grupo de analitica de WWI se decide imputar por la media el valor extremo de la variable Cantidad
- Para las ordenes las columnas Seleccionado_por_ID_de_persona, ID_de_pedido_pendiente, Seleccion_completada_cuando, y para las columnas Seleccion_completada_cuando de la tabla detalles de ordenes, se decide mantener los valores vacíos, sin embargo para la variable Precio_unitario el negocio reviso y complemento los valores faltantes

Las tablas usadas en el tutorial de entendimiento de datos estaran disponibles para su revision con los siguientes nombres: OrdenesCopia y DetallesOrdenesCopia. 

Para este tutorial vamos a trabajar con unas tablas que dadas las conclusiones del tutorial de entendimiento, WWImporters revisó los datos originales, creo tablas y las llamo "Ordenes" y "DetallesOrdenes"

Se hace una verificación de los valores de la tasa de impuesto

In [None]:
detallesOrdenes.select("Tasa_de_impuesto").distinct().show()

Se hace una verificación del rango de fechas disponible en los datos

In [None]:
ordenes.agg({"Fecha_de_pedido": "min"}).show()

Se elimina columnas Comenarios, Instrucciones_de_entrega y comentarios_internos

In [None]:
ordenes = ordenes.drop(*["Comentarios", "Instrucciones_de_entrega","comentarios_internos"])

Se eliminan duplicados totales de ordenes

In [None]:
print((ordenes.count(),ordenes.distinct().count()))

In [None]:
ordenes = ordenes.drop_duplicates()

In [None]:
print((ordenes.count(),ordenes.distinct().count()))

Se hace verificación de consistencia

In [None]:
#consistencia: revisar genially: definicion de consistencia
ids_ordenes = set([x.ID_de_pedido for x in ordenes.select('ID_de_pedido').collect()])
ids_detalles = set([x.ID_de_pedido for x in detallesOrdenes.select('ID_de_pedido').collect()])

len(ids_ordenes-ids_detalles), len(ids_detalles-ids_ordenes)

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [None]:
# TRANSFORMACION
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = ordenes.filter(ordenes["Fecha_de_pedido"].rlike(regex))
noCumplenFormato = ordenes.filter(~ordenes["Fecha_de_pedido"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('Fecha_de_pedido', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('Fecha_de_pedido')))
ordenes = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes.count()

Descripciones


In [None]:
detallesOrdenes.where(length(col("Descripcion")) <= 10).show()

Imputar valor maximo de cantidad

In [None]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[1]

In [None]:
detallesOrdenes = detallesOrdenes.replace( 10000000, 360, 'Cantidad')

In [None]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[0]

In [None]:
detallesOrdenes.show(5)

In [None]:
ordenes.show(5)

Se unen los dos dataframes en un nuevo dataframe, se verifica que no haya duplicados y si los hay se eliminan. Se crea un nuevo dataframe que va a tener toda la información del hecho orden transformada y lista para continuar el proceso de cargue.

In [None]:
ordenes_tmp =ordenes
ordenes_tmp = ordenes_tmp.join(detallesOrdenes, how = 'inner', on = 'ID_de_pedido')
ordenes_tmp = ordenes_tmp.withColumn('Valor_total',col('Precio_unitario')*col('Cantidad'))
ordenes_tmp = ordenes_tmp.withColumn('Impuestos',col('Valor_total')*col('Tasa_de_impuesto'))
ordenes_tmp = ordenes_tmp.selectExpr('ID_de_pedido as ID_de_pedido_T','ID_Producto','Fecha_de_pedido','ID_de_cliente','ID_de_vendedor','ID_Tipo_Paquete','Cantidad','Valor_total', 'Impuestos')

print((ordenes_tmp.count(),ordenes_tmp.distinct().count()))

ordenes_tmp = ordenes_tmp.drop_duplicates()
ordenes_tmp.show(5)

In [None]:
guardar_db(dest_db_connection_string, ordenes_tmp,'Estudiante_i_XXXXXX.Hecho_Orden_Tmp', db_user, db_psswd)

Cree la tabla de Fecha según el material compartido y adicione el left join al crear la tabla de ordenes para que quede completa

In [None]:
# El idPedido representa la dimensión degenerada Pedido
# Si hay campos nulos en ordenes_tmp al hacer join por el left outer join no se perderan y se utiliza como comodín un id=0 que debe existir en todas las dimensiones.
# Ese comodín representa el registro sin Dato.
# Debe adicionarle a todas las tablas el registro con identificador 0, como se muestra para la tabla de clientes
# Recuerde que falta incluir el join con la tabla de Fecha para que la tabla quede completa.

ordenes = ordenes_tmp.alias('o').join(clientes.alias('cl'), ordenes_tmp.ID_de_cliente == clientes.ID_Cliente_T,'left')\
                    .join(ciudades.alias('ciu'), clientes.ID_CiudadEntrega == ciudades.ID_ciudad_T,'left') \
                    .join(empleados.alias('e'), ordenes_tmp.ID_de_vendedor == empleados.ID_Empleado_T,'left') \
                    .join(paquetes.alias('p'), ordenes_tmp.ID_Tipo_Paquete == paquetes.ID_TipoPaquete_T,'left') \
                    .join(productos.alias('pr'), (ordenes_tmp.ID_Producto == productos.ID_Producto_T) ,'left') \
                    .select([col('o.ID_de_pedido_T'),col('cl.ID_Cliente_DWH'),col('ciu.ID_Ciudad_DWH'),
                             col('e.ID_Empleado_DWH'),col('pr.ID_Producto_DWH'),col('p.ID_TipoPaquete_DWH'),
                             col('o.Cantidad'),col('o.Valor_total'),col('o.Impuestos')]) \
                    .fillna({'ID_Cliente_DWH': 0, 'ID_Ciudad_DWH': 0, 'ID_Empleado_DWH': 0, 'ID_Producto_DWH': 0,
                             'ID_TipoPaquete_DWH': 0})
ordenes.show(5)

In [None]:
ordenes = ordenes.select('ID_de_pedido_T','ID_Ciudad_DWH','ID_Cliente_DWH','ID_Empleado_DWH','ID_Producto_DWH',
                         'ID_TipoPaquete_DWH','Cantidad','Impuestos','Valor_total') \
                    .withColumnRenamed('Valor_total','Total')
ordenes.show(5)

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [None]:
guardar_db(dest_db_connection_string, ordenes,'Estudiante_i_XXXXXX.Hecho_Orden', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

## 4. Cierre
Completado este tutorial, ya sabe cómo realizar ETL básicos en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

El Capítulo 2 del libro <i>Learn PySpark : Build Python-based Machine Learning and Deep Learning Models, New York: Apress. 2019</i> de Pramod Singh contiene muchos ejemplos útiles, puede encontrarlo en la biblioteca virtual de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.