# Tutorial: creación de ETLs con PySpark

## 2. Configuración Inicial

In [1]:
# Configuración servidor base de datos transaccional
db_user = 'Estudiante_111_202315'
db_psswd = 'aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'
dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_111_202315'



db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'
# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

PATH='./'

In [49]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime
from pyspark.sql.functions import lit
from pyspark.sql.types import TimestampType, DateType
import re
from pyspark.sql.functions import *


In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [6]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### ETL->DIM TIPOTRANSACTION

Fuente de datos:
Tabla transactional TiposTransaction 
Empezamos con el bloque 1: la dimensión <i>Empleado</i>, su fuente de datos viene de la tabla transaccional <i>Personas</i>. En la sentencia SQL filtramos usando WHERE para seleccionar solo las personas que sean vendedores y recuperamos únicamente los atributos que queremos, de acuerdo con  modelo dimensional. Recuerde que también puede usar el **.select()** de pyspark si no conoce los atributos de las tablas transaccionales. Sin embargo, es más eficiente aplicar el filtro en la consulta, ya que no trae a memoria más información de la necesaria.

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, es decir en memoria. Note que aquí se pueden renombrar los atributos con la estructura <i>nombreActual AS nuevoNombre</i>. De la tabla de personas, En este paso, solo nos interesan los empleados, por lo cual se hace un filtro por medio del WHERE, buscando las personas cuyo atributo EsVendedor sea igual a 1.

In [62]:
sql_tipostrans = '''(SELECT DISTINCT TipoTransaccionID AS ID_Tipo_transaccion_T , TipoTransaccionNombre AS Tipo FROM WWImportersTransactional.TiposTransaccion) AS Temp_empleados'''
tipos_trans_df = obtener_dataframe_de_bd(source_db_connection_string, sql_tipostrans, db_user, db_psswd)
tipos_trans_df.show(10)
tipos_trans_df.printSchema()

print(f'Numero de registros {tipos_trans_df.count()}');

+---------------------+--------------------+
|ID_Tipo_transaccion_T|                Tipo|
+---------------------+--------------------+
|                    2|Customer Credit Note|
|                    3|Customer Payment ...|
|                    4|     Customer Refund|
|                    5|    Supplier Invoice|
|                    6|Supplier Credit Note|
|                    7|Supplier Payment ...|
|                    8|     Supplier Refund|
|                    9|      Stock Transfer|
|                   10|         Stock Issue|
|                   11|       Stock Receipt|
+---------------------+--------------------+
only showing top 10 rows

root
 |-- ID_Tipo_transaccion_T: integer (nullable = true)
 |-- Tipo: string (nullable = true)

Numero de registros 12


In [59]:

print(f'Numero de registros {tipos_trans_df.count()}');

Numero de registros 12


#### Transformación
Recuerde que, puede hacer uso de selectExpr, filter, where entre otras de PySpark para modificar los datos cargados. Por ejemplo, el siguiente código utiliza <i>selectExpr</i> para renombrar la columna ID_Empleado por ID_Empleado_T, esta es la convención que vamos a utilizar: "_T" para indicar que el ID es el que estaba en la base de datos transaccional y "_DWH" para indicar que son ID's propios de la bodega. Usamos withColumn y monotonicallu_increasing_id para crear un ID acumulativo para cada registro en el dataframe

In [55]:
# TRANSFORMACION
#Ninguna

#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [63]:
# CARGUE
guardar_db(dest_db_connection_string, tipos_trans_df,'Estudiante_111_202315.TipoTransaccion', db_user, db_psswd)


Verifique los resultados usando MySQL Workbench

In [68]:
test = obtener_dataframe_de_bd(dest_db_connection_string, "Estudiante_111_202315.TipoTransaccion", db_user, db_psswd)
test.show(5)
print(f'Numero de registros {test.count()}');

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                     30|                    2|Customer Credit Note|
|                     31|                    3|Customer Payment ...|
|                     32|                    4|     Customer Refund|
|                     33|                    5|    Supplier Invoice|
|                     34|                    6|Supplier Credit Note|
+-----------------------+---------------------+--------------------+
only showing top 5 rows

Numero de registros 12


### ETL->DIM PROVEEDOR
Empezamos el bloque 2: dimensión ciudad. Su fuente de datos es una combinación de las tablas transaccionales <i>paises, provinciasEstados y ciudades</i>

#### Extracción

In [102]:
#EXTRACCION
#sql_paises = '''(SELECT DISTINCT ID_Pais, Nombre, Continente, Region, Subregion FROM WWImportersTransactional.Paises) AS Temp_paises'''
#sql_provincias_estados = '''(SELECT DISTINCT ID_EstadosProvincias AS ID_EstadoProvincia, NombreEstadoProvincia, TerritorioVentas, ID_Pais FROM WWImportersTransactional.EstadosProvincias) AS Temp_estados_provincias'''
#sql_ciudades = '''(SELECT DISTINCT ID_ciudad as ID_ciudad_T, NombreCiudad, ID_EstadoProvincia, Poblacion FROM WWImportersTransactional.Ciudades) AS Temp_ciudades'''

#paises = obtener_dataframe_de_bd(source_db_connection_string, sql_paises, db_user, db_psswd)
#provincias_estados = obtener_dataframe_de_bd(source_db_connection_string, sql_provincias_estados, db_user, db_psswd)
#ciudades = obtener_dataframe_de_bd(source_db_connection_string, sql_ciudades, db_user, db_psswd)


#print(ciudades.columns, paises.columns, provincias_estados.columns)

sql_provedores=  '''(SELECT DISTINCT ProveedorID as ID_Proveedor_T, NombreProveedor AS Nombre,CategoriaProveedorID, PersonaContactoPrincipalID, DiasPago AS Dias_pago, CodigoPostal AS Codigo_postal FROM WWImportersTransactional.proveedoresCopia) AS Temp_proveedores'''
sql_cat_prov=  '''(SELECT DISTINCT CategoriaProveedorID, CategoriaProveedor AS Categoria FROM WWImportersTransactional.CategoriasProveedores) AS Temp_cat_proveedores'''
sql_personas=  '''(SELECT DISTINCT ID_persona, NombreCompleto AS Contacto_principal FROM WWImportersTransactional.Personas) AS Temp_personas'''
proveedores_df = obtener_dataframe_de_bd(source_db_connection_string, sql_provedores, db_user, db_psswd)
cat_proveedores_df = obtener_dataframe_de_bd(source_db_connection_string, sql_cat_prov, db_user, db_psswd)
personas_df = obtener_dataframe_de_bd(source_db_connection_string, sql_personas, db_user, db_psswd)
proveedores_df.printSchema()
cat_proveedores_df.printSchema()
personas_df.printSchema()

root
 |-- ID_Proveedor_T: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- CategoriaProveedorID: integer (nullable = true)
 |-- PersonaContactoPrincipalID: integer (nullable = true)
 |-- Dias_pago: integer (nullable = true)
 |-- Codigo_postal: integer (nullable = true)

root
 |-- CategoriaProveedorID: integer (nullable = true)
 |-- Categoria: string (nullable = true)

root
 |-- ID_persona: integer (nullable = true)
 |-- Contacto_principal: string (nullable = true)



#### Transformación

In [103]:
# TRANSFORMACION
#ciudades = ciudades.join(provincias_estados, how = 'left', on = 'ID_EstadoProvincia')
#ciudades = ciudades.join(paises, how = 'left', on = 'ID_Pais')
#ciudades = ciudades.coalesce(1).withColumn('ID_Ciudad_DWH', f.monotonically_increasing_id() + 1)
#ciudades = ciudades.select('ID_Ciudad_DWH','ID_ciudad_T','NombreCiudad','Continente','Nombre','Poblacion',
#                          'Region','TerritorioVentas','NombreEstadoProvincia','Subregion') \
#                    .withColumnRenamed('Nombre','Pais')
#ciudades.show(5)
proveedores_df= proveedores_df.join(cat_proveedores_df, how='left', on ='CategoriaProveedorID')
proveedores_df= proveedores_df.join(personas_df, how='left', on = proveedores_df.PersonaContactoPrincipalID == personas_df.ID_persona)
proveedores_df= proveedores_df.select('ID_Proveedor_T','Nombre', 'Categoria','Contacto_principal','Dias_pago','Codigo_postal')
proveedores_df.printSchema()

proveedores_df.head(3)

print(proveedores_df.count())

root
 |-- ID_Proveedor_T: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Categoria: string (nullable = true)
 |-- Contacto_principal: string (nullable = true)
 |-- Dias_pago: integer (nullable = true)
 |-- Codigo_postal: integer (nullable = true)

19


In [51]:
proveedores_df.filter(proveedores_df['ID_Proveedor_T']==7).show()

+--------------+------------+---------+------------------+---------+-------------+
|ID_Proveedor_T|      Nombre|Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+--------------+------------+---------+------------------+---------+-------------+
|             7|Litware Inc.| embalaje|     Elias Myllari|       30|        95245|
+--------------+------------+---------+------------------+---------+-------------+



In [52]:
proveedores_df.filter(proveedores_df['ID_Proveedor_T']==23).show()

+--------------+----------------+---------+------------------+---------+-------------+
|ID_Proveedor_T|          Nombre|Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+--------------+----------------+---------+------------------+---------+-------------+
|            23|Litware Inc. Ltd| embalaje|     Elias Myllari|       30|        95245|
+--------------+----------------+---------+------------------+---------+-------------+



#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [104]:
# CARGUE
guardar_db(dest_db_connection_string, proveedores_df,'Estudiante_111_202315.Proveedor', db_user, db_psswd)

In [105]:
#guardar_db(dest_db_connection_string, proveedores_df,'Estudiante_111_202315.Proveedor', db_user, db_psswd)
test = obtener_dataframe_de_bd(dest_db_connection_string, "Estudiante_111_202315.Proveedor", db_user, db_psswd)
test.show(5)
print(f'Numero de registros {test.count()}');

+----------------+--------------+-------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|             Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+-------------------+--------------------+------------------+---------+-------------+
|              25|             6|Humongous Insurance|servicios de seguros|Madelaine  Cartier|      -14|        37770|
|              26|             4|      Fabrikam Inc.|                ropa|       Bill Lawson|       30|        40351|
|              27|            20|  Fabrikam Inc. Ltd|                ropa|       Bill Lawson|       30|        40351|
|              28|            11|      Trey Research|servicios de mark...|      Donald Jones|       -7|        57543|
|              29|            26|  Trey Research Inc|servicios de mark...|      Donald Jones|        7|        57543|
+----------------+--------------+-------------------+---

Verifique los resultados usando MySQL Workbench

### CLIENTE
Bloque 3: dimensión paquete. Su fuente de datos es la tabla transaccional <i>Paquetes</i>

#### Extracción

In [158]:
#EXTRACCION
#sql_paquetes = '''(SELECT DISTINCT ID_TipoPaquete AS ID_TipoPaquete_T, TipoPaquete AS Nombre FROM WWImportersTransactional.Paquetes) AS Temp_Paquetes'''
#paquetes = obtener_dataframe_de_bd(source_db_connection_string, sql_paquetes, db_user, db_psswd


sql_categoriasCliente = '''(SELECT DISTINCT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
sql_gruposCompra = '''(SELECT DISTINCT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
sql_clientes = '''(SELECT DISTINCT ID_Cliente as ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega AS ID_CiudadEntrega_DWH, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

categoriasCliente_df = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
gruposCompra_df = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
clientes_df = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)
categoriasCliente_df.printSchema()
gruposCompra_df.printSchema()
clientes_df.printSchema()

root
 |-- ID_Categoria: integer (nullable = true)
 |-- NombreCategoria: string (nullable = true)

root
 |-- ID_GrupoCompra: integer (nullable = true)
 |-- NombreGrupoCompra: string (nullable = true)

root
 |-- ID_Cliente_T: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- ClienteFactura: integer (nullable = true)
 |-- ID_Categoria: integer (nullable = true)
 |-- ID_GrupoCompra: integer (nullable = true)
 |-- ID_CiudadEntrega_DWH: integer (nullable = true)
 |-- LimiteCredito: decimal(10,0) (nullable = true)
 |-- FechaAperturaCuenta: timestamp (nullable = true)
 |-- DiasPago: integer (nullable = true)



#### Transformación

In [159]:
# TRANSFORMACION
#paquetes = paquetes.coalesce(1).withColumn('ID_TipoPaquete_DWH', f.monotonically_increasing_id() + 1)
#paquetes = paquetes.select('ID_TipoPaquete_DWH','ID_TipoPaquete_T','Nombre')
#paquetes.show(5)

clientes_df = clientes_df.join(gruposCompra_df, how = 'left', on = 'ID_GrupoCompra')
clientes_df = clientes_df.join(categoriasCliente_df, how = 'left', on = 'ID_Categoria') 

clientes_df = clientes_df.select('ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
                          'ID_CiudadEntrega_DWH','LimiteCredito','FechaAperturaCuenta','DiasPago')

clientes_df = clientes_df.fillna({'NombreCategoria':'Missing','NombreGrupoCompra':'Missing'})
clientes_df.show(5)
clientes_df.printSchema()

print(clientes_df.count())

+------------+--------------------+---------------+-----------------+--------------+--------------------+-------------+-------------------+--------+
|ID_Cliente_T|              Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega_DWH|LimiteCredito|FechaAperturaCuenta|DiasPago|
+------------+--------------------+---------------+-----------------+--------------+--------------------+-------------+-------------------+--------+
|           1|Tailspin Toys (He...|   Novelty Shop|    Tailspin Toys|             1|               19586|         null|2013-01-01 00:00:00|       7|
|           2|Tailspin Toys (Sy...|   Novelty Shop|    Tailspin Toys|             1|               33475|         null|2013-01-01 00:00:00|       7|
|           3|Tailspin Toys (Pe...|   Novelty Shop|    Tailspin Toys|             1|               26483|         null|2013-01-01 00:00:00|       7|
|           4|Tailspin Toys (Me...|   Novelty Shop|    Tailspin Toys|             1|               21692| 

In [162]:

from pyspark.sql.types import StructType, StructField, TimestampType,IntegerType,StringType,DecimalType
# Crea el registro para el id = 0

schema = StructType([
    StructField("ID_Cliente_T", IntegerType(), nullable=True),
    StructField("Nombre", StringType(), nullable=True),
    StructField("NombreCategoria", StringType(), nullable=True),
    StructField("NombreGrupoCompra", StringType(), nullable=True),
    StructField("ClienteFactura", IntegerType(), nullable=True),
    StructField("ID_CiudadEntrega_DWH", IntegerType(), nullable=True),
    StructField("LimiteCredito", DecimalType(10, 0), nullable=True),
    StructField("FechaAperturaCuenta", TimestampType(), nullable=True),
    StructField("DiasPago", IntegerType(), nullable=True)
])
data = [(0, 'Missing', 'Missing','Missing',None,None, None, None, None)]
cliente_0_df = spark.createDataFrame(data, schema)
clientes_df = cliente_0_df.union(clientes_df)
clientes_df.head(5)

[Row(ID_Cliente_T=0, Nombre='Missing', NombreCategoria='Missing', NombreGrupoCompra='Missing', ClienteFactura=None, ID_CiudadEntrega_DWH=None, LimiteCredito=None, FechaAperturaCuenta=None, DiasPago=None),
 Row(ID_Cliente_T=1, Nombre='Tailspin Toys (Head Office)', NombreCategoria='Novelty Shop', NombreGrupoCompra='Tailspin Toys', ClienteFactura=1, ID_CiudadEntrega_DWH=19586, LimiteCredito=None, FechaAperturaCuenta=datetime.datetime(2013, 1, 1, 0, 0), DiasPago=7),
 Row(ID_Cliente_T=2, Nombre='Tailspin Toys (Sylvanite, MT)', NombreCategoria='Novelty Shop', NombreGrupoCompra='Tailspin Toys', ClienteFactura=1, ID_CiudadEntrega_DWH=33475, LimiteCredito=None, FechaAperturaCuenta=datetime.datetime(2013, 1, 1, 0, 0), DiasPago=7),
 Row(ID_Cliente_T=3, Nombre='Tailspin Toys (Peeples Valley, AZ)', NombreCategoria='Novelty Shop', NombreGrupoCompra='Tailspin Toys', ClienteFactura=1, ID_CiudadEntrega_DWH=26483, LimiteCredito=None, FechaAperturaCuenta=datetime.datetime(2013, 1, 1, 0, 0), DiasPago=7),


In [137]:
#clientes_df.count()

664

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [163]:
# CARGUE
guardar_db(dest_db_connection_string, clientes_df,'Estudiante_111_202315.Cliente', db_user, db_psswd)


In [164]:
#guardar_db(dest_db_connection_string, proveedores_df,'Estudiante_111_202315.Proveedor', db_user, db_psswd)
test = obtener_dataframe_de_bd(dest_db_connection_string, "Estudiante_111_202315.Cliente", db_user, db_psswd)
test.show(5)
print(f'Numero de registros {test.count()}');

+--------------+------------+--------------------+--------------+--------------------+-------------+-------------------+--------+-----------------+---------------+
|ID_Cliente_DWH|ID_Cliente_T|              Nombre|ClienteFactura|ID_CiudadEntrega_DWH|LimiteCredito|FechaAperturaCuenta|DiasPago|NombreGrupoCompra|NombreCategoria|
+--------------+------------+--------------------+--------------+--------------------+-------------+-------------------+--------+-----------------+---------------+
|          1995|         804|Aleksandrs Riekstins|           804|               18069|      2200.00|         2013-01-01|       7|          Missing| Computer Store|
|          1996|         808|      Jackson Kolios|           808|               28221|      1800.00|         2013-01-01|       7|          Missing| Computer Store|
|          1997|         809|       Madhu Dwivedi|           809|               26105|      1700.00|         2013-01-01|       7|          Missing| Computer Store|
|          1998|

Verifique los resultados usando MySQL Workbench

### Productos
Bloque 4: dimensión producto, su fuente de datos es la combinación entre las tablas transaccionales Productos y Colores

#### Extracción

In [119]:
sql_productos = '''(SELECT DISTINCT ID_Producto as ID_Producto_T, ID_Color, NombreProducto AS Nombre, Marca, Necesita_refrigeracion AS Necesitarefrigeracion, Dias_tiempo_entrega,PrecioRecomendado AS Precio_minorista_recomendado, Impuesto, PrecioUnitario AS Precio_unitario FROM WWImportersTransactional.Producto) AS Temp_productos'''
sql_colores = '''(SELECT DISTINCT ID_Color, Color FROM WWImportersTransactional.Colores) AS Temp_colores'''

productos_df = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
colores_df = obtener_dataframe_de_bd(source_db_connection_string, sql_colores, db_user, db_psswd)
productos_df.printSchema()
colores_df.printSchema()


root
 |-- ID_Producto_T: integer (nullable = true)
 |-- ID_Color: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Marca: string (nullable = true)
 |-- Necesitarefrigeracion: integer (nullable = true)
 |-- Dias_tiempo_entrega: integer (nullable = true)
 |-- Precio_minorista_recomendado: decimal(10,0) (nullable = true)
 |-- Impuesto: decimal(10,0) (nullable = true)
 |-- Precio_unitario: decimal(10,0) (nullable = true)

root
 |-- ID_Color: integer (nullable = true)
 |-- Color: string (nullable = true)



#### Transformación

In [123]:
# TRANSFORMACION

#productos_df = productos_df.withColumn("cantidad_por_salida", col("cantidad_por_salida").cast("int"))
productos_df = productos_df.withColumn("cantidad_por_salida", lit(None).cast("int"))
productos_df.printSchema()
productos_df = productos_df.join(colores_df, how = 'left', on = 'ID_Color').fillna({'Color': 'Missing'})
#productos = productos.coalesce(1).withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
productos_df = productos_df.select('ID_Producto_T','Nombre','Marca','Color','Necesitarefrigeracion','Dias_tiempo_entrega', 'cantidad_por_salida','Precio_minorista_recomendado','Impuesto','Precio_unitario')
productos_df.show(5)

root
 |-- ID_Producto_T: integer (nullable = true)
 |-- ID_Color: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Marca: string (nullable = true)
 |-- Necesitarefrigeracion: integer (nullable = true)
 |-- Dias_tiempo_entrega: integer (nullable = true)
 |-- Precio_minorista_recomendado: decimal(10,0) (nullable = true)
 |-- Impuesto: decimal(10,0) (nullable = true)
 |-- Precio_unitario: decimal(10,0) (nullable = true)
 |-- cantidad_por_salida: integer (nullable = true)

+-------------+--------------------+-----+-------+---------------------+-------------------+-------------------+----------------------------+--------+---------------+
|ID_Producto_T|              Nombre|Marca|  Color|Necesitarefrigeracion|Dias_tiempo_entrega|cantidad_por_salida|Precio_minorista_recomendado|Impuesto|Precio_unitario|
+-------------+--------------------+-----+-------+---------------------+-------------------+-------------------+----------------------------+--------+---------------+
|    

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [125]:
# CARGUE
guardar_db(dest_db_connection_string, productos_df,'Estudiante_111_202315.Producto', db_user, db_psswd)

In [126]:
#guardar_db(dest_db_connection_string, productos_df,'Estudiante_111_202315.Producto', db_user, db_psswd)
test = obtener_dataframe_de_bd(dest_db_connection_string, "Estudiante_111_202315.Producto", db_user, db_psswd)
test.show(5)
print(f'Numero de registros {test.count()}');

+---------------+-------------+--------------------+-----+-------+---------------------+-------------------+-------------------+----------------------------+--------+---------------+
|ID_Producto_DWH|ID_Producto_T|              Nombre|Marca|  Color|Necesitarefrigeracion|Dias_tiempo_entrega|cantidad_por_salida|Precio_minorista_recomendado|Impuesto|Precio_unitario|
+---------------+-------------+--------------------+-----+-------+---------------------+-------------------+-------------------+----------------------------+--------+---------------+
|            233|            1|USB missile launc...| null|Missing|                false|                 14|               null|                       37.00|   15.00|             25|
|            234|            4|USB food flash dr...| null|Missing|                false|                 14|               null|                       48.00|   15.00|             32|
|            235|            5|USB food flash dr...| null|Missing|                fal

Verifique los resultados usando MySQL Workbench

### CREACIOON TABLA DIMENSIOPNAL FECHA
Bloque 5: dimensión cliente. Su fuente de datos es la combinación entre las tablas transaccionales Categorias de cliente, Grupos de compra y Clientes

#### Extracción

In [91]:
# = '''(SELECT DISTINCT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
#sql_gruposCompra = '''(SELECT DISTINCT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
#sql_clientes = '''(SELECT DISTINCT ID_Cliente as ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

#categoriasCliente = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
#gruposCompra = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
#clientes = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

tabla_movimiento = 'WWImportersTransactional.movimientosCopia'
movimientos_df = obtener_dataframe_de_bd(db_connection_string, tabla_movimiento, db_user, db_psswd)
movimientos_df.printSchema()
movimientos_df.count()


root
 |-- TransaccionProductoID: integer (nullable = true)
 |-- ProductoID: integer (nullable = true)
 |-- TipoTransaccionID: integer (nullable = true)
 |-- ClienteID: double (nullable = true)
 |-- InvoiceID: double (nullable = true)
 |-- ProveedorID: string (nullable = true)
 |-- OrdenDeCompraID: string (nullable = true)
 |-- FechaTransaccion: string (nullable = true)
 |-- Cantidad: double (nullable = true)



204292

#### Transformación

In [92]:
# TRANSFORMACION 
# EL SUPUESTO QUE SE TIENE ES QUE TODOS LOS CLIENTES TIENEN TODOS SUS DATOS DE CATEGORIA Y GRUPO Y NO SE ESTÁN PERDIENDO CLIENTES AL REALIZAR ESTE JOIN
#clientes = clientes.join(gruposCompra, how = 'left', on = 'ID_GrupoCompra')
#clientes = clientes.alias('cl').join(categoriasCliente.alias('ct'), how = 'left', on = 'ID_Categoria') \
#.select([col('cl.ID_Cliente_T'),col('cl.Nombre'),col('ct.NombreCategoria'),col('cl.NombreGrupoCompra') \
#        ,col('cl.ClienteFactura'),col('cl.ID_CiudadEntrega'),col('cl.LimiteCredito'),col('cl.FechaAperturaCuenta'),col('cl.DiasPago')])
#clientes = clientes.coalesce(1).withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
#clientes = clientes.select('ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
#                          'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago')

#clientes = clientes.fillna({'NombreCategoria':'Missing','NombreGrupoCompra':'Missing'})
#clientes.show(5)

regex = "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
cumpleformato_df = movimientos_df.filter(movimientos_df["FechaTransaccion"].rlike(regex))
print(cumpleformato_df.count())

def estandarizar_fecha(fecha):
    regex= "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
    match = re.match(regex, fecha)
    if match:
        nueva_fecha= fecha[0:19]
        return datetime.strptime(nueva_fecha, "%Y-%m-%d %H:%M:%S")
    else:
        regex = "^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s[0-3]?[0-9],[0-9]{4}$"
        match = re.match(regex, fecha)
        if match:
            return datetime.strptime(fecha, "%b %d,%Y")
        else:
            return None;

func = udf(estandarizar_fecha, DateType())
movimientos_df=movimientos_df.withColumn('fecha_estandar',func(col('FechaTransaccion')))
movimientos_df.printSchema()
movimientos_df.count()


140038
root
 |-- TransaccionProductoID: integer (nullable = true)
 |-- ProductoID: integer (nullable = true)
 |-- TipoTransaccionID: integer (nullable = true)
 |-- ClienteID: double (nullable = true)
 |-- InvoiceID: double (nullable = true)
 |-- ProveedorID: string (nullable = true)
 |-- OrdenDeCompraID: string (nullable = true)
 |-- FechaTransaccion: string (nullable = true)
 |-- Cantidad: double (nullable = true)
 |-- fecha_estandar: date (nullable = true)



204292

In [93]:
#nulos = movimientos_df.filter(movimientos_df["fecha_estandar"].isNull())
#nulos.count()
movimientos_df.head(5)
#nulos = movimientos_df.filter(movimientos_df["fecha_estandar"].isNull())
#nulos.count()


[Row(TransaccionProductoID=118903, ProductoID=217, TipoTransaccionID=10, ClienteID=476.0, InvoiceID=24904.0, ProveedorID='', OrdenDeCompraID='', FechaTransaccion='Apr 25,2014', Cantidad=-40.0, fecha_estandar=datetime.date(2014, 4, 25)),
 Row(TransaccionProductoID=286890, ProductoID=135, TipoTransaccionID=10, ClienteID=33.0, InvoiceID=60117.0, ProveedorID='', OrdenDeCompraID='', FechaTransaccion='Dec 10,2015', Cantidad=-7.0, fecha_estandar=datetime.date(2015, 12, 10)),
 Row(TransaccionProductoID=285233, ProductoID=111, TipoTransaccionID=10, ClienteID=180.0, InvoiceID=59768.0, ProveedorID='', OrdenDeCompraID='', FechaTransaccion='Dec 04,2015', Cantidad=-2.0, fecha_estandar=datetime.date(2015, 12, 4)),
 Row(TransaccionProductoID=290145, ProductoID=213, TipoTransaccionID=10, ClienteID=33.0, InvoiceID=60795.0, ProveedorID='', OrdenDeCompraID='', FechaTransaccion='Dec 23,2015', Cantidad=-3.0, fecha_estandar=datetime.date(2015, 12, 23)),
 Row(TransaccionProductoID=247492, ProductoID=90, TipoT

In [94]:
#fechas_df =movimientos_df.select(col('fecha_estandar')).distinct().orderBy(col('fecha_estandar'))
#fechas_df.show(5)
#fechas_df =movimientos_df.selectExpr('fecha_estandar as Fecha', "day(fecha_estandar) as Dia", "month(fecha_estandar) as Mes","year(fecha_estandar) as Anyo","weekofyear(fecha_estandar) as Numero_Semana_ISO" ).distinct().orderBy(col('fecha_estandar'))
#fechas_df =movimientos_df.selectExpr('fecha_estandar as Fecha', "day(fecha_estandar) as Dia", "month(fecha_estandar) as Mes","year(fecha_estandar) as Anyo","weekofyear(fecha_estandar) as Numero_Semana_ISO" ).distinct().orderBy(col('fecha_estandar'))
#fechas_df.show(5)
fechas_df =movimientos_df.selectExpr('fecha_estandar as Fecha').distinct()
fechas_df = fechas_df.selectExpr('Fecha', "day(Fecha) as Dia", "month(Fecha) as Mes","year(Fecha) as Anyo","weekofyear(Fecha) as Numero_Semana_ISO" )
fechas_df.printSchema()
fechas_df.head(2)
#

root
 |-- Fecha: date (nullable = true)
 |-- Dia: integer (nullable = true)
 |-- Mes: integer (nullable = true)
 |-- Anyo: integer (nullable = true)
 |-- Numero_Semana_ISO: integer (nullable = true)



[Row(Fecha=datetime.date(2014, 11, 12), Dia=12, Mes=11, Anyo=2014, Numero_Semana_ISO=46),
 Row(Fecha=datetime.date(2016, 3, 1), Dia=1, Mes=3, Anyo=2016, Numero_Semana_ISO=9)]

In [96]:
# Crea el registro para el id = 0
#clientes_0 = [('0','','Missing','Missing','Missing','0','0','','','')]
#columns = ['ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
#            'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago']
#cliente_0 = spark.createDataFrame(data=clientes_0,schema=columns)
#cliente_0.show()

#fechas_df = fechas_df.withColumn("Dia", day(col("Fecha")))
from pyspark.sql.functions import col, date_format, to_date

fechas_df = fechas_df.withColumn("ID_Fecha", date_format(col("Fecha"), "yyyyMMdd"))

#fechas_df=fechas_df.withColumn("ID_Fecha", concat(col("Anyo"), col("Mes"), col('Dia')).cast("int"))
fechas_df = fechas_df.select(col('ID_Fecha'),col('Fecha'), col('Dia'),col('Mes'), col('Anyo'), col('Numero_Semana_ISO'))
fechas_df.printSchema()
fechas_df.head(2)

root
 |-- ID_Fecha: string (nullable = true)
 |-- Fecha: date (nullable = true)
 |-- Dia: integer (nullable = true)
 |-- Mes: integer (nullable = true)
 |-- Anyo: integer (nullable = true)
 |-- Numero_Semana_ISO: integer (nullable = true)



[Row(ID_Fecha='20141112', Fecha=datetime.date(2014, 11, 12), Dia=12, Mes=11, Anyo=2014, Numero_Semana_ISO=46),
 Row(ID_Fecha='20160301', Fecha=datetime.date(2016, 3, 1), Dia=1, Mes=3, Anyo=2016, Numero_Semana_ISO=9)]

In [56]:
#clientes = clientes.union(cliente_0)
#clientes.show(5)

+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|              Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |             Missing|        Missing|          Missing|             0|               0|             |                   |        |
|             1|         807|       Gunnar Lohmus|     Gift Store|          Missing|           807|           35810|         3100|2013-01-01 00:00:00|       7|
|            10|         850|Daakshaayaani San...|     Gift Store|          Missing|           850|           32359|         1900|2013-01-01 00:00:00|       7|
|           100|        1029|   Veronika

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [99]:
# CARGUE

guardar_db(dest_db_connection_string, fechas_df,'Estudiante_111_202315.Fecha', db_user, db_psswd)


In [100]:
test = obtener_dataframe_de_bd(dest_db_connection_string, "Estudiante_111_202315.Fecha", db_user, db_psswd)
test.show(5)
print(f'Numero de registros {test.count()}');

+--------+----------+---+---+----+-----------------+
|ID_Fecha|     Fecha|Dia|Mes|Anyo|Numero_Semana_ISO|
+--------+----------+---+---+----+-----------------+
|20131231|2013-12-31| 31| 12|2013|                1|
|20140101|2014-01-01|  1|  1|2014|                1|
|20140102|2014-01-02|  2|  1|2014|                1|
|20140103|2014-01-03|  3|  1|2014|                1|
|20140104|2014-01-04|  4|  1|2014|                1|
+--------+----------+---+---+----+-----------------+
only showing top 5 rows

Numero de registros 758


Verifique los resultados usando MySQL Workbench

### BLOQUE 6
Bloque 6: Hecho orden. Su fuente de datos es la combinación entre las tablas transaccionales Ordenes y detalles de orden

#### Extracción

In [36]:
sql_ordenes = '''(SELECT DISTINCT * FROM WWImportersTransactional.Ordenes) AS Temp_ordenes'''
sql_detallesOrdenes = '''(SELECT DISTINCT * FROM WWImportersTransactional.DetallesOrdenes) AS Temp_detallesordenes'''
ordenes = obtener_dataframe_de_bd(source_db_connection_string, sql_ordenes, db_user, db_psswd)
detallesOrdenes = obtener_dataframe_de_bd(source_db_connection_string, sql_detallesOrdenes, db_user, db_psswd)

#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- La regla de negocio "La tasa de impuesto es de 10% o 15%" es correcta, pero habian errores en la tabla original, que fueron corregidos. 
- Para la segunda regla de negocio: "Son 73.595 órdenes detalladas en 231.412 lineas de detalle de órdenes realizadas desde 2013", si faltaban datos, los cuales fueron completados, y nos dicen que en cuanto a consistencia ellos revisaron las tablas e hicieron correcciones, pero que los duplicados completos de ordenes los eliminemos
- "El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD": En cuanto a formatos de fechas estan de acuerdo con que los estandarizemos y el formato sea el especificado en la regla
- Para las descripciones de productos que eran "a", se actualizaron a los valores reales. 
- Se pueden eliminar las columnas Comenarios, Instrucciones_de_entrega y comentarios_internos porque estan vacias. 
- A pesar de estar en un proceso de mejorar la calidad de los datos y mantener los nulos nos ayudaría a reflejar esa calidad, de la mano con el grupo de analitica de WWI se decide imputar por la media el valor extremo de la variable Cantidad
- Para las ordenes las columnas Seleccionado_por_ID_de_persona, ID_de_pedido_pendiente, Seleccion_completada_cuando, y para las columnas Seleccion_completada_cuando de la tabla detalles de ordenes, se decide mantener los valores vacíos, sin embargo para la variable Precio_unitario el negocio reviso y complemento los valores faltantes

Las tablas usadas en el tutorial de entendimiento de datos estaran disponibles para su revision con los siguientes nombres: OrdenesCopia y DetallesOrdenesCopia. 

Para este tutorial vamos a trabajar con unas tablas que dadas las conclusiones del tutorial de entendimiento, WWImporters revisó los datos originales, creo tablas y las llamo "Ordenes" y "DetallesOrdenes"

Se hace una verificación de los valores de la tasa de impuesto

In [37]:
detallesOrdenes.select("Tasa_de_impuesto").distinct().show()

+----------------+
|Tasa_de_impuesto|
+----------------+
|              10|
|              15|
+----------------+



Se hace una verificación del rango de fechas disponible en los datos

In [38]:
ordenes.agg({"Fecha_de_pedido": "min"}).show()

+--------------------+
|min(Fecha_de_pedido)|
+--------------------+
|          2013-01-01|
+--------------------+



Se elimina columnas Comenarios, Instrucciones_de_entrega y comentarios_internos

In [39]:
ordenes = ordenes.drop(*["Comentarios", "Instrucciones_de_entrega","comentarios_internos"])

Se eliminan duplicados totales de ordenes

In [40]:
print((ordenes.count(),ordenes.distinct().count()))

(93629, 93629)


In [41]:
ordenes = ordenes.drop_duplicates()

In [42]:
print((ordenes.count(),ordenes.distinct().count()))

(93629, 93629)


Se hace verificación de consistencia

In [43]:
#consistencia: revisar genially: definicion de consistencia
ids_ordenes = set([x.ID_de_pedido for x in ordenes.select('ID_de_pedido').collect()])
ids_detalles = set([x.ID_de_pedido for x in detallesOrdenes.select('ID_de_pedido').collect()])

len(ids_ordenes-ids_detalles), len(ids_detalles-ids_ordenes)

(0, 0)

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [44]:
# TRANSFORMACION
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = ordenes.filter(ordenes["Fecha_de_pedido"].rlike(regex))
noCumplenFormato = ordenes.filter(~ordenes["Fecha_de_pedido"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('Fecha_de_pedido', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('Fecha_de_pedido')))
ordenes = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes.count()

20034 73595
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|    Jan 28,2014|               201

(20034, 93629)

Descripciones


In [45]:
detallesOrdenes.where(length(col("Descripcion")) <= 10).show()

+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+



Imputar valor maximo de cantidad

In [46]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[1]

Row(Cantidad=360)

In [47]:
detallesOrdenes = detallesOrdenes.replace( 10000000, 360, 'Cantidad')

In [48]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[0]

Row(Cantidad=360)

In [49]:
detallesOrdenes.show(5)

+----------------+------------+-----------+--------------------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|         Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+--------------------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|               1|          45|        164|32 mm Double side...|              7|      50|            112|              15|                   50|        2013-01-02 11:00:00|
|               2|           1|         67|Ride on toy sedan...|              7|      10|            230|              15|                   10|        2013-01-01 11:00:00|
|               3|           2|         50|Developer joke mu...|              7|       9|             13|              15|             

In [50]:
ordenes.show(5)

+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|     2014-01-28|               2014-01-29|    

Se unen los dos dataframes en un nuevo dataframe, se verifica que no haya duplicados y si los hay se eliminan. Se crea un nuevo dataframe que va a tener toda la información del hecho orden transformada y lista para continuar el proceso de cargue.

In [51]:
ordenes_tmp =ordenes
ordenes_tmp = ordenes_tmp.join(detallesOrdenes, how = 'inner', on = 'ID_de_pedido')
ordenes_tmp = ordenes_tmp.withColumn('Valor_total',col('Precio_unitario')*col('Cantidad'))
ordenes_tmp = ordenes_tmp.withColumn('Impuestos',col('Valor_total')*col('Tasa_de_impuesto'))
ordenes_tmp = ordenes_tmp.selectExpr('ID_de_pedido as ID_de_pedido_T','ID_Producto','Fecha_de_pedido','ID_de_cliente','ID_de_vendedor','ID_Tipo_Paquete','Cantidad','Valor_total', 'Impuestos')

print((ordenes_tmp.count(),ordenes_tmp.distinct().count()))

ordenes_tmp = ordenes_tmp.drop_duplicates()
ordenes_tmp.show(5)

(294314, 231412)
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+
|ID_de_pedido_T|ID_Producto|Fecha_de_pedido|ID_de_cliente|ID_de_vendedor|ID_Tipo_Paquete|Cantidad|Valor_total|Impuestos|
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+
|           148|        203|     2013-01-02|          812|            13|              7|      40|       1280|    19200|
|           463|         64|     2013-01-09|          555|             3|              7|       1|         30|      450|
|           463|         10|     2013-01-09|          555|             3|              7|       8|        256|     3840|
|           463|         16|     2013-01-09|          555|             3|              7|      10|        130|     1950|
|           463|         57|     2013-01-09|          555|             3|              7|       3|         39|      585|
+--------------

In [37]:
guardar_db(dest_db_connection_string, ordenes_tmp,'Estudiante_i_XXXXXX.Hecho_Orden_Tmp', db_user, db_psswd)

Cree la tabla de Fecha según el material compartido y adicione el left join al crear la tabla de ordenes para que quede completa

In [66]:
# El idPedido representa la dimensión degenerada Pedido
# Si hay campos nulos en ordenes_tmp al hacer join por el left outer join no se perderan y se utiliza como comodín un id=0 que debe existir en todas las dimensiones.
# Ese comodín representa el registro sin Dato.
# Debe adicionarle a todas las tablas el registro con identificador 0, como se muestra para la tabla de clientes
# Recuerde que falta incluir el join con la tabla de Fecha para que la tabla quede completa.

ordenes = ordenes_tmp.alias('o').join(clientes.alias('cl'), ordenes_tmp.ID_de_cliente == clientes.ID_Cliente_T,'left')\
                    .join(ciudades.alias('ciu'), clientes.ID_CiudadEntrega == ciudades.ID_ciudad_T,'left') \
                    .join(empleados.alias('e'), ordenes_tmp.ID_de_vendedor == empleados.ID_Empleado_T,'left') \
                    .join(paquetes.alias('p'), ordenes_tmp.ID_Tipo_Paquete == paquetes.ID_TipoPaquete_T,'left') \
                    .join(productos.alias('pr'), (ordenes_tmp.ID_Producto == productos.ID_Producto_T) ,'left') \
                    .select([col('o.ID_de_pedido_T'),col('cl.ID_Cliente_DWH'),col('ciu.ID_Ciudad_DWH'),
                             col('e.ID_Empleado_DWH'),col('pr.ID_Producto_DWH'),col('p.ID_TipoPaquete_DWH'),
                             col('o.Cantidad'),col('o.Valor_total'),col('o.Impuestos')]) \
                    .fillna({'ID_Cliente_DWH': 0, 'ID_Ciudad_DWH': 0, 'ID_Empleado_DWH': 0, 'ID_Producto_DWH': 0,
                             'ID_TipoPaquete_DWH': 0})
ordenes.show(5)

+--------------+--------------+-------------+---------------+---------------+------------------+--------+-----------+---------+
|ID_de_pedido_T|ID_Cliente_DWH|ID_Ciudad_DWH|ID_Empleado_DWH|ID_Producto_DWH|ID_TipoPaquete_DWH|Cantidad|Valor_total|Impuestos|
+--------------+--------------+-------------+---------------+---------------+------------------+--------+-----------+---------+
|           463|           461|        29242|              2|            187|                 7|       1|         30|      450|
|           463|           461|        29242|              2|              8|                 7|       8|        256|     3840|
|           463|           461|        29242|              2|            192|                 7|      10|        130|     1950|
|           463|           461|        29242|              2|            121|                 7|       3|         39|      585|
|           148|           559|        35925|              6|             75|                 7|      40

In [67]:
ordenes = ordenes.select('ID_de_pedido_T','ID_Ciudad_DWH','ID_Cliente_DWH','ID_Empleado_DWH','ID_Producto_DWH',
                         'ID_TipoPaquete_DWH','Cantidad','Impuestos','Valor_total') \
                    .withColumnRenamed('Valor_total','Total')
ordenes.show(5)

+--------------+-------------+--------------+---------------+---------------+------------------+--------+---------+-----+
|ID_de_pedido_T|ID_Ciudad_DWH|ID_Cliente_DWH|ID_Empleado_DWH|ID_Producto_DWH|ID_TipoPaquete_DWH|Cantidad|Impuestos|Total|
+--------------+-------------+--------------+---------------+---------------+------------------+--------+---------+-----+
|           463|        29242|           461|              2|            187|                 7|       1|      450|   30|
|           463|        29242|           461|              2|              8|                 7|       8|     3840|  256|
|           463|        29242|           461|              2|            192|                 7|      10|     1950|  130|
|           463|        29242|           461|              2|            121|                 7|       3|      585|   39|
|           471|         3075|           252|              6|             75|                 7|      50|    24000| 1600|
+--------------+--------

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [68]:
guardar_db(dest_db_connection_string, ordenes,'Estudiante_i_XXXXXX.Hecho_Orden', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

## 4. Cierre
Completado este tutorial, ya sabe cómo realizar ETL básicos en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

El Capítulo 2 del libro <i>Learn PySpark : Build Python-based Machine Learning and Deep Learning Models, New York: Apress. 2019</i> de Pramod Singh contiene muchos ejemplos útiles, puede encontrarlo en la biblioteca virtual de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.