# Tarea ELT 

Estudiante: Wendy Galvis

## Enunciado

Implementación del proceso ETL para las dimensiones Proveedor, TipoTransaccion, Fecha y para la tabla de hechos movimientos. Las dimensiones Producto y Cliente son iguales a las del hecho Orden, este caso se conoce como dimensiones compartidas. Usted debe concentrarse en las dimensiones Proveedor, TipoTransaccion, Fecha y la tabla de hechos que no existen en la bodega de datos actualmente.

WWImporters le comparte el modelo multidimensional que ha decido utilizar. Este modelo representa los movimientos(transacciones) que se hacen sobre el inventario de WWImporters. En particular, se observa que se tiene información de los tipos de transacciones, realizadas por un proveedor, relacionado con un cliente y un producto específico en una fecha determinada. En el modelo se dejan explícitos los dos tipos de identificadores que se crean a nivel de la bodega. el propio de la Bodega, (con el sufijo DWH) y el que viene de la fuente (con el sufijo T).

Sobre los resultados del entendimiento de datos, Wide World Importers les comenta lo siguiente:

* Cada fila representa una transacción o movimiento de productos en el inventario
* Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.
* Sobre la regla de negocio dada para la actividad de entendimiento de datos “La cantidad máxima de productos movidos es 50 millones por transaccion”, el negocio revisó y encontró que efectivamente gracias a los avances en su operación, ya puede mover más que la cantidad de 50 millones por transacción, por lo cual elimina esa regla de negocio.
* La falta de datos antes del 2014 es un error de extracción de datos. Los nuevos datos incluyen este año.
* Nuestro análisis concluye que la información que se ha duplicado totalmente no es útil. Por favor no tenerlos en cuenta.
* “El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD”: En cuanto a formatos de fechas estamos de acuerdo con que los estandarizemos y el formato sea el especificado en la regla de negocio.
* Existen proveedores que tienen 2 filas una con un nombre y otra con el mismo nombre mas un “Inc” o “Ltd”. Unimos estos a un solo proveedor dado que se trató de un error de digitación.
* El código postal igual para todos nuestros proveedores es un error que también fue corregido.
* Cantidades negativas significan salidas de productos del inventario
* El negocio indica que las tablas de categoriasProveedores y TiposTransaccion fueron analizadas previamente, por su grupo de consultores.

## Configuración e importe de paquetes

In [1]:
    db_user = 'Estudiante_8_202413'
db_psswd = 'aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.120:8080/WWImportersTransactional'
dest_db_connection_string = 'jdbc:mysql://157.253.236.120:8080/Estudiante_8_202413'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime
from pyspark.sql.functions import to_date, date_format
from pyspark.sql.types import StringType

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### Dimensión Proveedor
Empezamos con la dimensión <i>Proveedor</i>. Su fuente de datos es una combinación de las tablas transaccionales  <i>proveedores</i>, <i>CategoriasProovedor</i> y <i>Personas</i>. 

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, tambien renombramos los atributos de acuerdo con lo establecido en el modelo multidimensional.

In [5]:
#EXTRACCION
sql_proveedores = '''(SELECT DISTINCT ProveedorID AS ID_Proveedor_T, NombreProveedor AS Nombre,  CategoriaProveedorID, PersonaContactoPrincipalID AS ID_persona, DiasPago AS Dias_pago, CodigoPostal AS Codigo_postal FROM WWImportersTransactional.proveedores) AS Temp_proveedores'''
proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores, db_user, db_psswd)

sql_categorias = '''(SELECT DISTINCT CategoriaProveedorID, CategoriaProveedor AS Categoria FROM WWImportersTransactional.CategoriasProveedores) AS Temp_categorias'''
categorias = obtener_dataframe_de_bd(source_db_connection_string, sql_categorias, db_user, db_psswd)

sql_personas = '''(SELECT DISTINCT ID_persona, NombreCompleto AS Contacto_principal FROM WWImportersTransactional.Personas) AS Temp_personas'''
personas = obtener_dataframe_de_bd(source_db_connection_string, sql_personas, db_user, db_psswd)

print(proveedores.columns, categorias.columns, personas.columns)

['ID_Proveedor_T', 'Nombre', 'CategoriaProveedorID', 'ID_persona', 'Dias_pago', 'Codigo_postal'] ['CategoriaProveedorID', 'Categoria'] ['ID_persona', 'Contacto_principal']


#### Transformación

T1. Eliminar de los registros de la columna Nombre los sufijos Inc y Ltd

In [6]:
proveedores = proveedores.withColumn('Nombre', regexp_replace('Nombre', r',\s*(Ltd\.|Inc\.)', ''))
proveedores.show()

+--------------+--------------------+--------------------+----------+---------+-------------+
|ID_Proveedor_T|              Nombre|CategoriaProveedorID|ID_persona|Dias_pago|Codigo_postal|
+--------------+--------------------+--------------------+----------+---------+-------------+
|             4|            Fabrikam|                   4|        27|       30|        40351|
|             5|Graphic Design In...|                   2|        29|       14|        64847|
|             7|             Litware|                   5|        33|       30|        95245|
|             9|      Nod Publishers|                   2|        37|        7|        27906|
|            10|Northwind Electri...|                   3|        39|       30|         7860|
|            12|   The Phone Company|                   2|        43|       30|        56732|
|            13|      Woodgrove Bank|                   7|        45|        7|        94101|
|             2|             Contoso|                   2|  

T2. Multiplicar los valores negativos de la columna "DiasPago" por -1

In [7]:
proveedores = proveedores.withColumn('Dias_pago', 
                                     when(col('Dias_pago') < 0, col('Dias_pago') * -1)
                                     .otherwise(col('Dias_pago')))
proveedores.show()

+--------------+--------------------+--------------------+----------+---------+-------------+
|ID_Proveedor_T|              Nombre|CategoriaProveedorID|ID_persona|Dias_pago|Codigo_postal|
+--------------+--------------------+--------------------+----------+---------+-------------+
|             4|            Fabrikam|                   4|        27|       30|        40351|
|             5|Graphic Design In...|                   2|        29|       14|        64847|
|             7|             Litware|                   5|        33|       30|        95245|
|             9|      Nod Publishers|                   2|        37|        7|        27906|
|            10|Northwind Electri...|                   3|        39|       30|         7860|
|            12|   The Phone Company|                   2|        43|       30|        56732|
|            13|      Woodgrove Bank|                   7|        45|        7|        94101|
|             2|             Contoso|                   2|  

T3. Seleccionar Categoria de la tabla CategoriasProovedor realizando la unión por medio de la variable  CategoriaProveedorID

T4. Seleccionar Contacto_principal de la tabla Personas realizando la unión por medio de la variable  ID_persona

T5. Generar códigos propios de la bódega de datos

In [8]:
proveedores = proveedores.join(categorias, how = 'left', on = 'CategoriaProveedorID')
proveedores = proveedores.join(personas, how = 'left', on = 'ID_persona')
proveedores = proveedores.coalesce(1).withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
proveedores = proveedores.select('ID_Proveedor_DWH', 'ID_Proveedor_T', 'Nombre', 'Categoria', 'Contacto_principal', 'Dias_pago', 'Codigo_postal')
proveedores.show(5)

+----------------+--------------+-------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|             Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+-------------------+--------------------+------------------+---------+-------------+
|               1|             6|Humongous Insurance|servicios de seguros|Madelaine  Cartier|       14|        37770|
|               2|             4|           Fabrikam|                ropa|       Bill Lawson|       30|        40351|
|               3|            11|      Trey Research|servicios de mark...|      Donald Jones|        7|        57543|
|               4|            12|  The Phone Company| productos novedosos|           Hai Dam|       30|        56732|
|               5|             9|     Nod Publishers| productos novedosos|      Marcos Costa|        7|        27906|
+----------------+--------------+-------------------+---

In [9]:
proveedores.show()

+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|              Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|               1|             6| Humongous Insurance|servicios de seguros|Madelaine  Cartier|       14|        37770|
|               2|             4|            Fabrikam|                ropa|       Bill Lawson|       30|        40351|
|               3|            11|       Trey Research|servicios de mark...|      Donald Jones|        7|        57543|
|               4|            12|   The Phone Company| productos novedosos|           Hai Dam|       30|        56732|
|               5|             9|      Nod Publishers| productos novedosos|      Marcos Costa|        7|        27906|
|               6|             8|  Lucerne Publi

#### Carga

In [10]:
guardar_db(dest_db_connection_string, proveedores,'Estudiante_8_202413.Proveedor_ETL', db_user, db_psswd)

### Dimensión Tipo_transaccion
Seguimos con la dimensión <i>Tipo_transaccion</i>. Su fuente de datos es la tabla transaccional  <i>TiposTransaccion</i>.

#### Extracción

In [11]:
#EXTRACCION

sql_transaccion = '''(SELECT DISTINCT TipoTransaccionID AS Id_Tipo_transaccion_T, TipoTransaccionNombre AS Tipo FROM WWImportersTransactional.TiposTransaccion) AS Temp_tipo_transaccion'''
transaccion = obtener_dataframe_de_bd(source_db_connection_string, sql_transaccion, db_user, db_psswd)

print(transaccion.columns)

['Id_Tipo_transaccion_T', 'Tipo']


#### Transformación

T1. Generar códigos propios de la bódega de datos

In [12]:
transaccion = transaccion.coalesce(1).withColumn('Id_Tipo_transaccion_DWH', f.monotonically_increasing_id() + 1)
transaccion.show()

+---------------------+--------------------+-----------------------+
|Id_Tipo_transaccion_T|                Tipo|Id_Tipo_transaccion_DWH|
+---------------------+--------------------+-----------------------+
|                    2|Customer Credit Note|                      1|
|                    3|Customer Payment ...|                      2|
|                    4|     Customer Refund|                      3|
|                    5|    Supplier Invoice|                      4|
|                    6|Supplier Credit Note|                      5|
|                    7|Supplier Payment ...|                      6|
|                    8|     Supplier Refund|                      7|
|                    9|      Stock Transfer|                      8|
|                   10|         Stock Issue|                      9|
|                   11|       Stock Receipt|                     10|
|                   12|Stock Adjustment ...|                     11|
|                   13|     Custom

#### Carga

In [13]:
guardar_db(dest_db_connection_string, transaccion,'Estudiante_8_202413.TipoTransacción_ETL', db_user, db_psswd)

### Dimensión Fecha
Seguimos con la dimensión <i>Fecha</i>. Su fuente de datos es la tabla transaccional  <i>movimientos</i>.

#### Extracción

In [14]:
sql_fecha = '''(SELECT DISTINCT FechaTransaccion AS ID_Fecha FROM WWImportersTransactional.movimientos) AS Temp_fecha'''
fecha = obtener_dataframe_de_bd(source_db_connection_string, sql_fecha, db_user, db_psswd)

print(fecha.columns)

['ID_Fecha']


#### Transformación

T1.  Convertir la fecha a un número que la representa en formato AAAAMMDD

In [15]:
fecha.show()

+-----------+
|   ID_Fecha|
+-----------+
|Jan 20,2014|
|Jan 28,2014|
|Feb 01,2014|
|Mar 25,2014|
|May 01,2014|
|May 02,2014|
|May 10,2014|
|May 26,2014|
|Jun 02,2014|
|Jul 08,2014|
|Jul 17,2014|
|Jul 24,2014|
|Aug 19,2014|
|Sep 16,2014|
|Sep 17,2014|
|Sep 23,2014|
|Oct 04,2014|
|Oct 21,2014|
|Oct 30,2014|
|Nov 07,2014|
+-----------+
only showing top 20 rows



In [16]:
# TRANSFORMACION
regex = "([0-2]\\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = fecha.filter(fecha["ID_Fecha"].rlike(regex))
noCumplenFormato = fecha.filter(~fecha["ID_Fecha"].rlike(regex))
noCumplenFormato = noCumplenFormato.withColumn('ID_Fecha', 
    f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y%m%d'), StringType())(col('ID_Fecha')))
fecha = noCumplenFormato.union(cumplenFormato)
fecha.show()

+--------+
|ID_Fecha|
+--------+
|20140120|
|20140128|
|20140201|
|20140325|
|20140501|
|20140502|
|20140510|
|20140526|
|20140602|
|20140708|
|20140717|
|20140724|
|20140819|
|20140916|
|20140917|
|20140923|
|20141004|
|20141021|
|20141030|
|20141107|
+--------+
only showing top 20 rows



T2. Crear Fecha llevando ID_Fecha al formato YYYY-MM-DD

In [17]:
fecha = fecha.withColumn('Fecha', to_date(col('ID_Fecha'), 'yyyyMMdd'))
fecha.show()

+--------+----------+
|ID_Fecha|     Fecha|
+--------+----------+
|20140120|2014-01-20|
|20140128|2014-01-28|
|20140201|2014-02-01|
|20140325|2014-03-25|
|20140501|2014-05-01|
|20140502|2014-05-02|
|20140510|2014-05-10|
|20140526|2014-05-26|
|20140602|2014-06-02|
|20140708|2014-07-08|
|20140717|2014-07-17|
|20140724|2014-07-24|
|20140819|2014-08-19|
|20140916|2014-09-16|
|20140917|2014-09-17|
|20140923|2014-09-23|
|20141004|2014-10-04|
|20141021|2014-10-21|
|20141030|2014-10-30|
|20141107|2014-11-07|
+--------+----------+
only showing top 20 rows



T3. Crear día, mes, año, número de día de la semana

In [18]:
from pyspark.sql.functions import dayofmonth, month, year, weekofyear

fecha = fecha.withColumn('Dia', dayofmonth('Fecha'))
fecha = fecha.withColumn('Mes', month('Fecha'))
fecha = fecha.withColumn('Anio', year('Fecha'))
fecha = fecha.withColumn('Numero_semana_ISO', weekofyear('Fecha'))

fecha.show()

+--------+----------+---+---+----+-----------------+
|ID_Fecha|     Fecha|Dia|Mes|Anio|Numero_semana_ISO|
+--------+----------+---+---+----+-----------------+
|20140120|2014-01-20| 20|  1|2014|                4|
|20140128|2014-01-28| 28|  1|2014|                5|
|20140201|2014-02-01|  1|  2|2014|                5|
|20140325|2014-03-25| 25|  3|2014|               13|
|20140501|2014-05-01|  1|  5|2014|               18|
|20140502|2014-05-02|  2|  5|2014|               18|
|20140510|2014-05-10| 10|  5|2014|               19|
|20140526|2014-05-26| 26|  5|2014|               22|
|20140602|2014-06-02|  2|  6|2014|               23|
|20140708|2014-07-08|  8|  7|2014|               28|
|20140717|2014-07-17| 17|  7|2014|               29|
|20140724|2014-07-24| 24|  7|2014|               30|
|20140819|2014-08-19| 19|  8|2014|               34|
|20140916|2014-09-16| 16|  9|2014|               38|
|20140917|2014-09-17| 17|  9|2014|               38|
|20140923|2014-09-23| 23|  9|2014|            

#### Carga

In [19]:
guardar_db(dest_db_connection_string, transaccion,'Estudiante_8_202413.Fecha_ETL', db_user, db_psswd)

### Dimensión: Cliente
Su fuente de datos es la combinación entre las tablas transaccionales Categorias de cliente, Grupos de compra y Clientes. Se usa el mismo código implementado en el tutorial

#### Extracción

In [20]:
sql_categoriasCliente = '''(SELECT DISTINCT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
sql_gruposCompra = '''(SELECT DISTINCT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
sql_clientes = '''(SELECT DISTINCT ID_Cliente as ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

categoriasCliente = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
gruposCompra = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
clientes = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

#### Transformación

In [21]:
# TRANSFORMACION 
# EL SUPUESTO QUE SE TIENE ES QUE TODOS LOS CLIENTES TIENEN TODOS SUS DATOS DE CATEGORIA Y GRUPO Y NO SE ESTÁN PERDIENDO CLIENTES AL REALIZAR ESTE JOIN
clientes = clientes.join(gruposCompra, how = 'left', on = 'ID_GrupoCompra')
clientes = clientes.alias('cl').join(categoriasCliente.alias('ct'), how = 'left', on = 'ID_Categoria') \
.select([col('cl.ID_Cliente_T'),col('cl.Nombre'),col('ct.NombreCategoria'),col('cl.NombreGrupoCompra') \
        ,col('cl.ClienteFactura'),col('cl.ID_CiudadEntrega'),col('cl.LimiteCredito'),col('cl.FechaAperturaCuenta'),col('cl.DiasPago')])
clientes = clientes.coalesce(1).withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
clientes = clientes.select('ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
                          'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago')

clientes = clientes.fillna({'NombreCategoria':'Missing','NombreGrupoCompra':'Missing'})
clientes.show(5)

+--------------+------------+-----------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|           Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+-----------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             1|         803|       Bala Dixit|   Novelty Shop|          Missing|           803|           33799|         2000|2013-01-01 00:00:00|       7|
|             2|         805|     Ratan Poddar|   Novelty Shop|          Missing|           805|           10194|         3300|2013-01-01 00:00:00|       7|
|             3|         806|           Shi Tu|   Novelty Shop|          Missing|           806|           31685|         3000|2013-01-01 00:00:00|       7|
|             4|         811|    Surendra Sahu|   Novelty 

In [22]:
# Crea el registro para el id = 0
clientes_0 = [('0','','Missing','Missing','Missing','0','0','','','')]
columns = ['ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
            'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago']
cliente_0 = spark.createDataFrame(data=clientes_0,schema=columns)
cliente_0.show()

+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T| Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |Missing|        Missing|          Missing|             0|               0|             |                   |        |
+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+



In [23]:
clientes = clientes.union(cliente_0)
clientes.show(5)

+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|            Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             1|         807|     Gunnar Lohmus|     Gift Store|          Missing|           807|           35810|         3100|2013-01-01 00:00:00|       7|
|             2|         816|        Harsha Huq|     Gift Store|          Missing|           816|            8892|         2400|2013-01-01 00:00:00|       7|
|             3|         823|Francisca Laureano|     Gift Store|          Missing|           823|           17142|         3500|2013-01-01 00:00:00|       7|
|             4|         825|    Jayanta Thakur|    

In [24]:
# Se ordena por el identificador DWH
clientes = clientes.withColumn('ID_Cliente_DWH',col('ID_Cliente_DWH').cast('int')).orderBy(col('ID_Cliente_DWH'))
clientes.show(5)

+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|            Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |           Missing|        Missing|          Missing|             0|               0|             |                   |        |
|             1|         807|     Gunnar Lohmus|     Gift Store|          Missing|           807|           35810|         3100|2013-01-01 00:00:00|       7|
|             2|         816|        Harsha Huq|     Gift Store|          Missing|           816|            8892|         2400|2013-01-01 00:00:00|       7|
|             3|         823|Francisca Laureano|    

#### Carga

In [25]:
# CARGUE
guardar_db(dest_db_connection_string,clientes,'Estudiante_8_202413.Cliente_ETL', db_user, db_psswd)

### Dimensión: Producto
Su fuente de datos es la combinación entre las tablas transaccionales Productos y Colores. Se usa el mismo código implementado en el tutorial

#### Extracción

In [26]:
sql_productos = '''(SELECT DISTINCT ID_Producto as ID_Producto_T, ID_Color, NombreProducto, Marca, Necesita_refrigeracion, Dias_tiempo_entrega, Impuesto, PrecioUnitario, PrecioRecomendado FROM WWImportersTransactional.Producto) AS Temp_productos'''
sql_colores = '''(SELECT DISTINCT ID_Color, Color FROM WWImportersTransactional.Colores) AS Temp_colores'''

productos = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
colores = obtener_dataframe_de_bd(source_db_connection_string, sql_colores, db_user, db_psswd)

#### Transformación

In [27]:
# TRANSFORMACION
productos = productos.join(colores, how = 'left', on = 'ID_Color').fillna({'Color': 'Missing'})
productos = productos.coalesce(1).withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
productos = productos.select('ID_Producto_DWH','ID_Producto_T','NombreProducto','Marca','Color','Necesita_refrigeracion','Dias_tiempo_entrega','PrecioRecomendado','Impuesto','PrecioUnitario')
productos.show(5)

+---------------+-------------+--------------------+-----+-------+----------------------+-------------------+-----------------+--------+--------------+
|ID_Producto_DWH|ID_Producto_T|      NombreProducto|Marca|  Color|Necesita_refrigeracion|Dias_tiempo_entrega|PrecioRecomendado|Impuesto|PrecioUnitario|
+---------------+-------------+--------------------+-----+-------+----------------------+-------------------+-----------------+--------+--------------+
|              1|            1|USB missile launc...| null|Missing|                     0|                 14|               37|      15|            25|
|              2|            4|USB food flash dr...| null|Missing|                     0|                 14|               48|      15|            32|
|              3|            5|USB food flash dr...| null|Missing|                     0|                 14|               48|      15|            32|
|              4|            6|USB food flash dr...| null|Missing|                     0

#### Carga

In [28]:
# CARGUE
guardar_db(dest_db_connection_string, productos,'Estudiante_8_202413.Producto_ETL', db_user, db_psswd)

### Hecho movimiento
Su fuente de datos es la la tabla transaccional movimientos

#### Extracción

In [29]:
sql_movimientos = '''(SELECT DISTINCT FechaTransaccion AS ID_Fecha, ProductoID AS ID_Producto_T, ProveedorID AS ID_Proveedor_T, ClienteID AS ID_Cliente_T, TipoTransaccionID AS Id_Tipo_transaccion_T, Cantidad FROM WWImportersTransactional.movimientos) AS Temp_movimientos'''
movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)

print(movimientos.columns)

['ID_Fecha', 'ID_Producto_T', 'ID_Proveedor_T', 'ID_Cliente_T', 'Id_Tipo_transaccion_T', 'Cantidad']


#### Transformación

Verificamos si existen registros duplicados

In [30]:
num_total_registros = movimientos.count()
num_registros_unicos = movimientos.dropDuplicates().count()

if num_total_registros == num_registros_unicos:
    print("No hay registros duplicados.")
else:
    print(f"Hay {num_total_registros - num_registros_unicos} registros duplicados.")

No hay registros duplicados.


En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en el modelado.

Convertir la fecha a un número que la representa en formato AAAAMMDD. En el modelado se establece que ID_Fecha es un número que corresponde a YYYYMMDD

In [31]:
regex = "([0-2]\\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = movimientos.filter(movimientos["ID_Fecha"].rlike(regex))
noCumplenFormato = movimientos.filter(~movimientos["ID_Fecha"].rlike(regex))
noCumplenFormato = noCumplenFormato.withColumn('ID_Fecha', 
    f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y%m%d'), StringType())(col('ID_Fecha')))
movimientos = noCumplenFormato.union(cumplenFormato)
movimientos.show()

+--------+-------------+--------------+------------+---------------------+--------+
|ID_Fecha|ID_Producto_T|ID_Proveedor_T|ID_Cliente_T|Id_Tipo_transaccion_T|Cantidad|
+--------+-------------+--------------+------------+---------------------+--------+
|20140120|          108|          null|       185.0|                   10|   -10.0|
|20140128|          162|           4.0|         0.0|                   11|    10.0|
|20140128|          216|          null|       474.0|                   10|   -10.0|
|20140128|           22|           7.0|         0.0|                   11|    10.0|
|20140128|           25|           7.0|         0.0|                   11|    10.0|
|20140201|           14|          null|       444.0|                   10|   -10.0|
|20140201|           75|           7.0|         0.0|                   11|    10.0|
|20140325|           20|          null|       802.0|                   10|   -10.0|
|20140325|           65|           4.0|         0.0|                   11|  

T3. Seleccionar ID_Proveedor_DWH de la tabla de proveedores, Id_Tipo_transaccion_DWH de la tabla TipoTransacción, ID_Cliente_DWH de la tabla Cliente y ID_Producto_DWH de la tabla Producto

In [32]:
movimientos = movimientos.alias('m').join(clientes.alias('cl'), movimientos.ID_Cliente_T == clientes.ID_Cliente_T,'left')\
                    .join(productos.alias('pr'), movimientos.ID_Producto_T == productos.ID_Producto_T,'left') \
                    .join(proveedores.alias('p'), movimientos.ID_Proveedor_T == proveedores.ID_Proveedor_T,'left') \
                    .join(transaccion.alias('tr'), movimientos.Id_Tipo_transaccion_T == transaccion.Id_Tipo_transaccion_T,'left') \
                    .join(fecha.alias('f'), (movimientos.ID_Fecha == fecha.ID_Fecha) ,'left') \
                    .select([col('m.ID_Fecha'),col('cl.ID_Cliente_DWH'),col('pr.ID_Producto_DWH'),
                             col('p.ID_Proveedor_DWH'),col('tr.Id_Tipo_transaccion_DWH'),col('m.Cantidad')]) \
                   .fillna({'ID_Cliente_DWH': 0, 'ID_Producto_DWH': 0, 'ID_Proveedor_DWH': 0, 'Id_Tipo_transaccion_DWH': 0,
                             'ID_Fecha': 0})
movimientos.show()

+--------+--------------+---------------+----------------+-----------------------+--------+
|ID_Fecha|ID_Cliente_DWH|ID_Producto_DWH|ID_Proveedor_DWH|Id_Tipo_transaccion_DWH|Cantidad|
+--------+--------------+---------------+----------------+-----------------------+--------+
|20140325|           618|            194|               0|                      9|   -10.0|
|20140325|           234|             18|               0|                      9|   -10.0|
|20140325|           339|            144|               0|                      9|   -10.0|
|20140201|           350|             12|               0|                      9|   -10.0|
|20140325|           326|            170|               0|                      9|   -10.0|
|20140120|           290|            154|               0|                      9|   -10.0|
|20140325|           499|            169|               0|                      9|   -10.0|
|20140128|           380|             88|               0|                      

#### Carga

In [33]:
# CARGUE
guardar_db(dest_db_connection_string, movimientos,'Estudiante_8_202413.Movimientos_ETL', db_user, db_psswd)