# Solución tarea semana 4

## 1. Introducción	
    En esta tarea se dará solución para llevar a cabo los diferentes procesos de ETL para las diferentes dimensiones y la tabla de hechos movimiento, se partirá de un diseño que permite orientar la implementación y tener en cuenta las consideraciones del proceso ETL como los campos a tener en cuenta y las transformaciones sobre las reglas o necesidades de negocio que permitan obtener análisis de una forma más fiable y entendible.
	

## 2. Proceso de ETL.

En este proceso de ETL, se extraen los datos de los **movimientos** de una base de datos transaccional y se almacenan en otra base de datos que corresponde a la bodega de datos, siguiendo una aproximación ROLAP. A continuación, se presenta el modelo multidimensional que es el modelo conceptual que representa el proceso de registro de movimientos de prodcutos. Este modelo se utilizará para crear las tablas en la bodega de datos que representan el proceso de negocio y que serán cargadas como resultado del proceso ETL. 

Para cada dimensión (excepto la dimensión fecha), se tendrán dos llaves las cuales están relacionadas con la bodega de datos (terminadas en DWH) y con las llaves de la base de datos transaccional (terminadas en T).


![Modelo movimientos](./WWI_modelo_movimiento.png)<br>**NOTA**: La imagen corresponde a la que está ubicada en esta misma carpeta con el nombre WWI_modelo_movimiento.png

Para el proceso de ETL se realizó el siguiente diseño general. A partir de la información obtenida de la fase de entendimiento de datos, se pudieron identificar las fuentes a utilizar y las relaciones entre estas, también la organización ha dado respuesta a algunas inquietudes, lo que permite conocer la mejor forma de proceder para manipular los datos. En el proceso de ETL se plantean 3 secciones o bloques para cada dimensión y la tabla de hechos del modelo anterior, la dimensión fecha no se considera ya que se genera de forma independiente.
![ETL](./Diseno_general_ETL.png)<br>**NOTA**: La imagen corresponde a la que está ubicada en esta misma carpeta con el nombre Diseno_general_ETL.png

### Configuración inicial para la sesión y base de datos

In [8]:
# Configuración servidor base de datos transaccional
db_user = 'Estudiante_81_202413'
db_psswd = 'MISO_aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_81_202413'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [141]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace, to_timestamp
from datetime import datetime

In [13]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)
spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [14]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1
Empezamos con el bloque 1: la dimensión <i>Proveedor</i>, su fuente de datos viene de la tabla transaccional <i>proveedores</i>. Dado que la dimensión Proveedor requiere de la categoría y del contacto principal, se realiza un join entre las tablas transaccionales <i>CategoriasProveedores</i> y <i>Personas</i> 

#### Extracción
Se toman la información de las 3 tablas mencionadas anteriormente del modelo transaccional y se almacenan como dataframes.

In [447]:
sql_proveedores = '''(SELECT DISTINCT ProveedorID, NombreProveedor, PersonaContactoAlternoID as ID_persona, CategoriaProveedorID, DiasPago, CodigoPostal FROM WWImportersTransactional.proveedores) AS Temp_proveedores'''
sql_personas = '''(SELECT DISTINCT ID_persona, NombreCompleto FROM WWImportersTransactional.Personas) AS Temp_personas'''
sql_categorias = '''(SELECT DISTINCT CategoriaProveedorID, CategoriaProveedor FROM WWImportersTransactional.CategoriasProveedores) AS Temp_categorias'''

proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores, db_user, db_psswd)
personas = obtener_dataframe_de_bd(source_db_connection_string, sql_personas, db_user, db_psswd)
categorias = obtener_dataframe_de_bd(source_db_connection_string, sql_categorias, db_user, db_psswd)
print(proveedores.columns, personas.columns, categorias.columns)

['ProveedorID', 'NombreProveedor', 'ID_persona', 'CategoriaProveedorID', 'DiasPago', 'CodigoPostal'] ['ID_persona', 'NombreCompleto'] ['CategoriaProveedorID', 'CategoriaProveedor']


#### Transformación
Como parte del proceso de transformación, se procede a realizar lo siguiente:
1. Consolidar la información de proveedores a través de left joins entre la tabla Personas y CategoriasProveedores con el objetivo de no perder información de proveedores, además, se cambian los nombres de las columnas para que aparezcan igual al modelo.  
2. Wide World Importers mencionó "**Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.**", esto da pie a analizar los días de pago de los proveedores y convertir los negativos a valores positivos.


In [448]:
# TRANSFORMACION
# Unión entre tablas y cambio de nombres a las columnas para que coincidan con el modelo
proveedores = proveedores.join(personas, how = 'left', on = 'ID_persona')
proveedores = proveedores.join(categorias, how = 'left', on = 'CategoriaProveedorID')
proveedores = proveedores.coalesce(1).withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
proveedores = proveedores.select('ID_Proveedor_DWH','ProveedorID','NombreProveedor','CategoriaProveedor','NombreCompleto',
                                 'DiasPago', 'CodigoPostal') \
                    .withColumnRenamed('ProveedorID','ID_Proveedor_T') \
                    .withColumnRenamed('NombreProveedor','Nombre') \
                    .withColumnRenamed('CategoriaProveedor','Categoria') \
                    .withColumnRenamed('NombreCompleto','Contacto_principal') \
                    .withColumnRenamed('DiasPago','Dias_pago') \
                    .withColumnRenamed('CodigoPostal','Codigo_postal')
# Se ponen todos los días de pago en positivo
proveedores = proveedores.withColumn('Dias_pago', f.abs('Dias_pago'))

proveedores.show(10)

+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|              Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|               1|            12|   The Phone Company| productos novedosos|        Thanh Dinh|       30|        56732|
|               2|             1| A Datum Corporation| productos novedosos|       Oliver Kivi|       14|        46077|
|               3|             9|      Nod Publishers| productos novedosos|  Matheus Oliveira|        7|        27906|
|               4|             2|       Contoso, Ltd.| productos novedosos|    Paulus Lippmaa|        7|        98253|
|               5|             5|Graphic Design In...| productos novedosos|       Donna Smith|       14|        64847|
|               6|             8|  Lucerne Publi

En la tabla movimientos_v2 existen bastantes campos de ProductoID con valor nulo, por esta razón, se procede a crear un registro en la dimensión proveedores con identificador cero que permita relacionar a los valores nulos.

In [449]:
# Crea el registro para el id = 0
proveedores_0 = [(0,'','Missing','Missing','Missing','0','0')]
columns = ['ID_Proveedor_DWH','ID_Proveedor_T','Nombre','Categoria','Contacto_principal','Dias_pago','Codigo_postal']
proveedores_0 = spark.createDataFrame(data=proveedores_0,schema=columns)
proveedores_0.show()

+----------------+--------------+-------+---------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T| Nombre|Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+-------+---------+------------------+---------+-------------+
|               0|              |Missing|  Missing|           Missing|        0|            0|
+----------------+--------------+-------+---------+------------------+---------+-------------+



In [450]:
proveedores = proveedores.union(proveedores_0)
proveedores.orderBy(col("ID_Proveedor_DWH").asc()).show(5)

+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|              Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|               0|              |             Missing|             Missing|           Missing|        0|            0|
|               1|             3|Consolidated Mess...|servicios de mens...|       Helen Ahven|       30|        94101|
|               2|            10|Northwind Electri...|            juguetes|     Sara Karlsson|       30|         7860|
|               3|             7|       Litware, Inc.|            embalaje|        Vilma Niva|       30|        95245|
|               4|             6| Humongous Insurance|servicios de seguros|     Annette Talon|       14|        37770|
+----------------+--------------+---------------

Se puede apreciar que el registro con identificador para el DWH con valor cero quedó insertado.

#### Carga
Una vez realizado el proceso de carga y transformación, se guardan los resultados en la base de datos destino


In [451]:
# CARGUE
guardar_db(dest_db_connection_string, proveedores,'Estudiante_81_202413.ProveedorS4', db_user, db_psswd)

### Bloque 2
En el bloque 2 se determina la información para la dimensión **TipoTransaccion**, su fuente de datos corresponde a la tabla TiposTransaccion del modelo transaccional, la dimensión a analizar contendrá dos identificadores, uno para el identificador de la tabla transaccional y otro propio de la bodega de datos.

#### Extracción
Se procede a realizar la extracción de información desde la fuente de datos TiposTransaccion

In [454]:
# Extracción
sqlTiposTrasaccion = '''(SELECT DISTINCT TipoTransaccionID,TipoTransaccionNombre FROM WWImportersTransactional.TiposTransaccion) AS Temp_timpos_transaccion'''
tiposTransaccion = obtener_dataframe_de_bd(source_db_connection_string, sqlTiposTrasaccion, db_user, db_psswd)
tiposTransaccion.show()

+-----------------+---------------------+
|TipoTransaccionID|TipoTransaccionNombre|
+-----------------+---------------------+
|                2| Customer Credit Note|
|                3| Customer Payment ...|
|                4|      Customer Refund|
|                5|     Supplier Invoice|
|                6| Supplier Credit Note|
|                7| Supplier Payment ...|
|                8|      Supplier Refund|
|                9|       Stock Transfer|
|               10|          Stock Issue|
|               11|        Stock Receipt|
|               12| Stock Adjustment ...|
|               13|      Customer Contra|
+-----------------+---------------------+



#### Transformación
Se cambian los nombres de las columnas de la fuente de datos original por las indicadas en el modelo y se agrega la columna ID_Tipo_transaccion_DWH para que sea el identificador de la tabla.

In [455]:
# TRANSFORMACION
tiposTransaccion = tiposTransaccion.selectExpr('TipoTransaccionID as ID_Tipo_transaccion_T', 'TipoTransaccionNombre as Tipo')
tiposTransaccion = tiposTransaccion.coalesce(1).withColumn('ID_Tipo_transaccion_DWH', f.monotonically_increasing_id() + 1)
tiposTransaccion = tiposTransaccion.select('ID_Tipo_transaccion_DWH','ID_Tipo_transaccion_T','Tipo')
tiposTransaccion.show(5)

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
|                      5|                    6|Supplier Credit Note|
+-----------------------+---------------------+--------------------+
only showing top 5 rows



Actualmente los movimientos tienen asociado un tipo de transacción, sin embargo, dado que todos los campos en movimiento_v2 puenden ser nulos, se procede a agregar un nuevo registro con identificar 0 en la dimención de tipo de transacción con el fin de relacionar los nulos con este valor

In [456]:
# Crea el registro para el id = 0
tipoTransaccion_0 = [(0,'','Missing')]
columns = ['ID_Tipo_transaccion_DWH','ID_Tipo_transaccion_T','Tipo']
tipoTransaccion_0 = spark.createDataFrame(data=tipoTransaccion_0,schema=columns)
tipoTransaccion_0.show()

+-----------------------+---------------------+-------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|   Tipo|
+-----------------------+---------------------+-------+
|                      0|                     |Missing|
+-----------------------+---------------------+-------+



In [459]:
tiposTransaccion = tiposTransaccion.union(tipoTransaccion_0)
tiposTransaccion.orderBy(col("ID_Tipo_transaccion_DWH").asc()).show(5)

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      0|                     |             Missing|
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
+-----------------------+---------------------+--------------------+
only showing top 5 rows



#### Carga
Una vez realizado el proceso de extracción y transformación, se guardan los resultados en la base de datos destino

In [460]:
# CARGUE
guardar_db(dest_db_connection_string, tiposTransaccion,'Estudiante_81_202413.TipoTransaccionS4', db_user, db_psswd)

### BLOQUE 3
Bloque 3: Hecho movimiento, su fuente de datos corresponde a la tabla movimientos del modelo transaccional, en todo este bloque también se incluye la creación de la dimensión Fecha que es necesario para el hecho de movimientos

#### Extracción

In [417]:
sql_movimientos = '''(SELECT DISTINCT * FROM WWImportersTransactional.movimientos_v2) AS Temp_movimientos'''
movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)

#### Transformación

Sobre los resultados del entendimiento de datos, Wide World Importers les comenta lo siguiente:

* Cada fila representa una transacción o movimiento de productos en el inventario
* Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.
* Sobre la regla de negocio dada para la actividad de entendimiento de datos “La cantidad máxima de productos movidos es 50 millones por transaccion”, el negocio revisó y encontró que efectivamente gracias a los avances en su operación, ya puede mover más que la cantidad de 50 millones por transacción, por lo cual elimina esa regla de negocio.
* La falta de datos antes del 2014 es un error de extracción de datos. Los nuevos datos incluyen este año.
* Nuestro análisis concluye que la información que se ha duplicado totalmente no es útil. Por favor no tenerlos en cuenta.
* “El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD”: En cuanto a formatos de fechas estamos de acuerdo con que los estandarizemos y el formato sea el especificado en la regla de negocio.
* Existen proveedores que tienen 2 filas una con un nombre y otra con el mismo nombre mas un “Inc” o “Ltd”. Unimos estos a un solo proveedor dado que se trató de un error de digitación.
* El código postal igual para todos nuestros proveedores es un error que también fue corregido.
* Cantidades negativas significan salidas de productos del inventario
* El negocio indica que las tablas de categoriasProveedores y TiposTransaccion fueron analizadas previamente, por su grupo de consultores.

Se verifica que se incluyan datos antes del año 2014, para validar el comentario "La falta de datos antes del 2014 es un error de extracción de datos. Los nuevos datos incluyen este año." La fecha inicial es desde el primero de enero del 2013

In [499]:
movimientos.orderBy(col("FechaTransaccion").asc()).show(10)

+---------------------+----------+-----------------+---------+---------+-----------+---------------+--------------------+--------+
|TransaccionProductoID|ProductoID|TipoTransaccionID|ClienteID|InvoiceID|ProveedorID|OrdenDeCompraID|    FechaTransaccion|Cantidad|
+---------------------+----------+-----------------+---------+---------+-----------+---------------+--------------------+--------+
|                    1|        67|               10|    832.0|      1.0|       null|           null|2013-01-01 12:00:...|   -10.0|
|                   41|        73|               10|    473.0|     13.0|       null|           null|2013-01-01 12:00:...|    -8.0|
|                   40|       156|               10|    473.0|     13.0|       null|           null|2013-01-01 12:00:...|   -10.0|
|                   44|       163|               10|    870.0|     14.0|       null|           null|2013-01-01 12:00:...|   -10.0|
|                   48|       210|               10|    991.0|     15.0|       null

Se puede evidenciar que hay movimientos para antes del 2014

Se verifica la afirmación "El código postal igual para todos nuestros proveedores es un error que también fue corregido."

In [504]:
 proveedores.select("Codigo_Postal").distinct().show(5)

+-------------+
|Codigo_Postal|
+-------------+
|        57543|
|        40351|
|        64847|
|        37659|
|        37770|
+-------------+
only showing top 5 rows



Se puede apreciar que efectivamente existen códigos postales diferentes para los proveedores.

Se verifica que se hayan unificado los duplicados en los nombres de los proveedores, en caso de que el nombre del proveedor ya existiera en un registro y en otro también pero con “Inc” o “Ltd”.

In [418]:
# No se tienen en cuenta los nombres con Inc o Ltd
words = ["Inc", "Ltd"]
toMatch = "|".join(words)
proveedoresNamesWithoutIncLtd = proveedores.select("Nombre").where(~proveedores.Nombre.rlike(toMatch)).rdd.flatMap(lambda x: x).collect()

proveedoresNamesWithIncLtd = proveedores.select("Nombre").where(proveedores.Nombre.rlike(toMatch)).rdd.flatMap(lambda x: x).collect()

stringOfAllNames = ",".join(proveedoresNamesWithIncLtd)

duplicatesExists = False
for name in proveedoresNamesWithoutIncLtd:
    if name in stringOfAllNames:
        duplicatesExists = True
        print("Siguen existiendo proveedores con “Inc” o “Ltd” en el nombre duplicados")
if not duplicatesExists:
    print("Se unificaron satisfactoriamente los nombres duplicados de los proveedores")

Se unificaron satisfactoriamente los nombres duplicados de los proveedores


Como se puede apreciar, el negocio si realizó correctamente la unificación de los nombres repetidos de los proveedores

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio, el cual corresponde a YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD

In [419]:
# TRANSFORMACION
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = movimientos.filter(movimientos["FechaTransaccion"].rlike(regex))
noCumplenFormato = movimientos.filter(~movimientos["FechaTransaccion"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('FechaTransaccion', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('FechaTransaccion')))
movimientos = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), movimientos.count()

64254 172413
+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|TransaccionProductoID|ProductoID|TipoTransaccionID|ClienteID|InvoiceID|ProveedorID|OrdenDeCompraID|FechaTransaccion|Cantidad|
+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|                94344|       108|               10|    185.0|  19763.0|       null|           null|     Jan 20,2014|   -10.0|
|                96548|       162|               11|      0.0|      0.0|        4.0|          228.0|     Jan 28,2014|    10.0|
|                96560|       216|               10|    474.0|  20224.0|       null|           null|     Jan 28,2014|   -10.0|
|                96568|        22|               11|      0.0|      0.0|        7.0|          193.0|     Jan 28,2014|    10.0|
|                96648|        25|               11|      0.0|      0.0|        7.0|          408.

(64254, 236667)

Se valida que efectivamente ya todas las fechas cumplan con el formato.

In [420]:
movimientos.filter(~movimientos["FechaTransaccion"].rlike(regex)).count()

0

Como se puede observar, de las fechas almacenadas, 0 fechas no cumplen con el formato, es decir que todas ya lo están cumpliendo.

#### Creación de la dimensión Fecha
Dado que la tabla Hecho_Movimiento tiene una relación con la dimensicón fecha, en esta sección se muestra el proceso para su creación. <br>
Como primera medida, se procede a seleccionar todas las fechas distintas almacenadas en el atabla movimientos, en la columna FechaTransaccion, además, se convierten a tipo fecha.
Como las fechas son de tipo string, estas almacenan el formato timestamp y el formato date, por lo tanto, si trato de hacer el casteo de una fecha string con formato timestamp a una fecha que quede con formato date, va a aparecer este valor en nulo, es por esto que se definen dos variables, **fechaTimeStamp** y **fechaDate**, una permite convertir las fechas en formato timestamp y la otra en formato date, luego se unen los resultados de estas dos variables y se eliminan los valores nulos. 

In [422]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
fecha = movimientos.withColumn("Date", f.to_timestamp(col("FechaTransaccion"), "yyyy-MM-dd HH:mm:ss")).select("Date").distinct()
fechaDate = movimientos.withColumn("Date", f.to_date(col("FechaTransaccion"), "yyyy-MM-dd")).select( "Date").distinct()
fecha = fechaDate.union(fechaTimeStamp)
fecha = fecha.filter(fecha.Date.isNotNull())
fecha.count()

3042

A continuación, se puede observar que el campo Date quedó de tipo Timestamp

In [423]:
fecha.schema

StructType(List(StructField(Date,TimestampType,true)))

Como ya todas las fechas están en tipo timestamp, se vuelve más fácil convertir todas las fechas en tipo date ya que en el modelo dimensional no se precisan las horas, minutos y segundos, además, es más fácil extraer las fechas que son diferentes 

In [424]:
fecha = fecha.withColumn("convertedToDate", f.to_date(col("Date"), "yyyy-MM-dd")).select("Date", "convertedToDate")

In [425]:
fecha.schema

StructType(List(StructField(Date,TimestampType,true),StructField(convertedToDate,DateType,true)))

Como se puede ver, ya existe la columna de tipo Timestamp y de tipo Date.

In [426]:
fecha.orderBy(col("Date").desc()).show(10)

+-------------------+---------------+
|               Date|convertedToDate|
+-------------------+---------------+
|2016-05-31 12:00:00|     2016-05-31|
|2016-05-31 07:00:00|     2016-05-31|
|2016-05-31 00:00:00|     2016-05-31|
|2016-05-30 12:00:00|     2016-05-30|
|2016-05-30 07:00:00|     2016-05-30|
|2016-05-30 00:00:00|     2016-05-30|
|2016-05-28 12:00:00|     2016-05-28|
|2016-05-28 00:00:00|     2016-05-28|
|2016-05-27 12:00:00|     2016-05-27|
|2016-05-27 07:00:00|     2016-05-27|
+-------------------+---------------+
only showing top 10 rows



Ahora se procede a dejar la columna tipo Date y solo tendrá valores distintos, esta columna se renombrará a Fecha

In [427]:
fecha = fecha.select("convertedToDate").distinct().withColumnRenamed("convertedToDate", "Fecha").orderBy(col("Fecha").asc())
fecha.show(10)

+----------+
|     Fecha|
+----------+
|2013-01-01|
|2013-01-02|
|2013-01-03|
|2013-01-04|
|2013-01-05|
|2013-01-07|
|2013-01-08|
|2013-01-09|
|2013-01-10|
|2013-01-11|
+----------+
only showing top 10 rows



A cada fecha se le agrega un identificador único, en el formato YYYYMMDD, además, se crean las columnas con sus valores para el día, mes, año y el número de semana ISO

In [428]:
fecha = fecha.select("Fecha", f.dayofmonth("Fecha").alias("Dia"), f.month("Fecha").alias("Mes"), f.year("Fecha").alias("Anio"), f.weekofyear("Fecha").alias("Numero_semana_ISO"))
fecha = fecha.coalesce(1).withColumn('ID_Fecha', 
                                     f.concat(
                                         col("Anio").cast("string"), 
                                         f.when(col("Mes") < 10, f.concat(f.lit("0"), col("Mes").cast("string"))).otherwise(col("Mes").cast("string")), 
                                         f.when(col("Dia") < 10, f.concat(f.lit("0"), col("Dia").cast("string"))).otherwise(col("Dia").cast("string"))
                                     ))
fecha.show(10)

+----------+---+---+----+-----------------+--------+
|     Fecha|Dia|Mes|Anio|Numero_semana_ISO|ID_Fecha|
+----------+---+---+----+-----------------+--------+
|2013-01-01|  1|  1|2013|                1|20130101|
|2013-01-02|  2|  1|2013|                1|20130102|
|2013-01-03|  3|  1|2013|                1|20130103|
|2013-01-04|  4|  1|2013|                1|20130104|
|2013-01-05|  5|  1|2013|                1|20130105|
|2013-01-07|  7|  1|2013|                2|20130107|
|2013-01-08|  8|  1|2013|                2|20130108|
|2013-01-09|  9|  1|2013|                2|20130109|
|2013-01-10| 10|  1|2013|                2|20130110|
|2013-01-11| 11|  1|2013|                2|20130111|
+----------+---+---+----+-----------------+--------+
only showing top 10 rows



Se muestra el esquma para la tabla fecha

In [429]:
fecha.schema

StructType(List(StructField(Fecha,DateType,true),StructField(Dia,IntegerType,true),StructField(Mes,IntegerType,true),StructField(Anio,IntegerType,true),StructField(Numero_semana_ISO,IntegerType,true),StructField(ID_Fecha,StringType,true)))

In [480]:
fecha.count()

1070

Existen 1070 fechas diferentes en formato YYYYMMDD

#### Carga
Una vez realizado el proceso de extracción y transformación, se guardan los resultados en la base de datos destino

In [481]:
# CARGUE
guardar_db(dest_db_connection_string, fecha,'Estudiante_81_202413.FechaS4', db_user, db_psswd)

#### Creación de la tabla de hechos

A continuación se realizan los joins entre las dimensiones para consolidar toda la información, también se muestra la manera de vincular el hecho con la dimensión fecha.

In [431]:
movimientosWithDateType = movimientos.withColumn("FechaTransaccion", f.to_date(col("FechaTransaccion"), "yyyy-MM-dd"))
movimientosWithDateType = movimientosWithDateType.filter(movimientosWithDateType.FechaTransaccion.isNotNull())
print(movimientosWithDateType.count(), movimientos.count())
print(movimientosWithDateType.select("FechaTransaccion").distinct().count())
movimientosWithDateType.show(10)

236667 236667
1070
+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|TransaccionProductoID|ProductoID|TipoTransaccionID|ClienteID|InvoiceID|ProveedorID|OrdenDeCompraID|FechaTransaccion|Cantidad|
+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|                94344|       108|               10|    185.0|  19763.0|       null|           null|      2014-01-20|   -10.0|
|                96548|       162|               11|      0.0|      0.0|        4.0|          228.0|      2014-01-28|    10.0|
|                96560|       216|               10|    474.0|  20224.0|       null|           null|      2014-01-28|   -10.0|
|                96568|        22|               11|      0.0|      0.0|        7.0|          193.0|      2014-01-28|    10.0|
|                96648|        25|               11|      0.0|      0.0|        7.0|        

Como se puede apreciar, se transforman todas las fechas en la tabla de movimientos al formato yyyy-mm-dd, se hace un conteo para que no hayan filas faltantes y se muestra la cantidad de fechas diferentes con este formato.
Esto se hace para poder relacionar correctamente la tabla de movimientos con la tabla de fecha y obtener su identificador.

In [471]:
sql_productos = '''(SELECT ID_Producto_DWH, ID_Producto_T FROM Estudiante_81_202413.ProductoS4) AS Temp_productos'''
sql_clientes = '''(SELECT ID_Cliente_DWH, ID_Cliente_T FROM Estudiante_81_202413.ClienteS4) AS Temp_productos'''

productosTmp = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
clientesTmp = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

In [505]:
movimientos_tmp = movimientosWithDateType
movimientosHecho = movimientos_tmp.alias('m').join(fecha.alias('f'), movimientos_tmp.FechaTransaccion == fecha.Fecha,'left')\
                    .join(productosTmp.alias('prod'), movimientos_tmp.ProductoID == productosTmp.ID_Producto_T,'left')\
                    .join(proveedores.alias('prov'), movimientos_tmp.ProveedorID == proveedores.ID_Proveedor_T,'left')\
                    .join(clientesTmp.alias('cli'), movimientos_tmp.ClienteID == clientesTmp.ID_Cliente_T,'left')\
                    .join(tiposTransaccion.alias('tt'), movimientos_tmp.TipoTransaccionID == tiposTransaccion.ID_Tipo_transaccion_T,'left')\
                    .select([col('f.ID_Fecha'), col('prod.ID_Producto_DWH'), col('prov.ID_Proveedor_DWH'), 
                             col('cli.ID_Cliente_DWH'), col('tt.ID_Tipo_transaccion_DWH'), col('m.Cantidad')])\
                    .fillna({'ID_Producto_DWH': 0, 'ID_Proveedor_DWH': 0, 'ID_Cliente_DWH': 0, 'ID_Tipo_transaccion_DWH': 0})
movimientosHecho.show(10)

+--------+---------------+----------------+--------------+-----------------------+--------+
|ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_transaccion_DWH|Cantidad|
+--------+---------------+----------------+--------------+-----------------------+--------+
|20140128|             88|               0|           380|                      9|   -10.0|
|20140120|            154|               0|           290|                      9|   -10.0|
|20140325|            194|               0|           618|                      9|   -10.0|
|20140325|            146|               0|           393|                      9|   -10.0|
|20140201|             12|               0|           350|                      9|   -10.0|
|20140128|             46|               5|             0|                     10|    10.0|
|20140325|            123|               5|             0|                     10|    10.0|
|20140128|            105|               3|             0|                     1

#### Carga

In [506]:
guardar_db(dest_db_connection_string, movimientosHecho,'Estudiante_81_202413.Hecho_MovimientoS4', db_user, db_psswd)

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

## Consulta para ver algunos registros de la dimensión Fecha

In [483]:
sql_fechas = '''(SELECT * FROM Estudiante_81_202413.FechaS4) AS Temp_fechas'''
fechasS4 = obtener_dataframe_de_bd(source_db_connection_string, sql_fechas, db_user, db_psswd)
fechasS4.show(10)

+----------+---+---+----+-----------------+--------+
|     Fecha|Dia|Mes|Anio|Numero_semana_ISO|ID_Fecha|
+----------+---+---+----+-----------------+--------+
|2013-01-01|  1|  1|2013|                1|20130101|
|2013-01-02|  2|  1|2013|                1|20130102|
|2013-01-03|  3|  1|2013|                1|20130103|
|2013-01-04|  4|  1|2013|                1|20130104|
|2013-01-05|  5|  1|2013|                1|20130105|
|2013-01-07|  7|  1|2013|                2|20130107|
|2013-01-08|  8|  1|2013|                2|20130108|
|2013-01-09|  9|  1|2013|                2|20130109|
|2013-01-10| 10|  1|2013|                2|20130110|
|2013-01-11| 11|  1|2013|                2|20130111|
+----------+---+---+----+-----------------+--------+
only showing top 10 rows



In [484]:
fechasS4.count()

1070

## Consulta para ver algunos registros de la dimensión Producto
Vale la pena resaltar que está tabla se crea a partir de la interacción con el tutorial, ya que el enunciado no hace explícito que se deba hacer el proceso de ETL para esta dimensión.

In [485]:
sql_productosS4 = '''(SELECT * FROM Estudiante_81_202413.ProductoS4) AS Temp_productos'''
productosS4 = obtener_dataframe_de_bd(source_db_connection_string, sql_productosS4, db_user, db_psswd)
productosS4.show(10)

+---------------+-------------+--------------------+-----+-------+----------------------+-------------------+-----------------+--------+--------------+
|ID_Producto_DWH|ID_Producto_T|      NombreProducto|Marca|  Color|Necesita_refrigeracion|Dias_tiempo_entrega|PrecioRecomendado|Impuesto|PrecioUnitario|
+---------------+-------------+--------------------+-----+-------+----------------------+-------------------+-----------------+--------+--------------+
|              1|            1|USB missile launc...| null|Missing|                     0|                 14|               37|      15|            25|
|              2|            4|USB food flash dr...| null|Missing|                     0|                 14|               48|      15|            32|
|              3|            5|USB food flash dr...| null|Missing|                     0|                 14|               48|      15|            32|
|              4|            6|USB food flash dr...| null|Missing|                     0

In [486]:
productosS4.count()

227

## Consulta para ver algunos registros de la dimensión Proveedor

In [487]:
sql_proveedoresS4 = '''(SELECT * FROM Estudiante_81_202413.ProveedorS4) AS Temp_proveedores'''
proveedoresS4 = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedoresS4, db_user, db_psswd)
proveedoresS4.show(10)

+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|              Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|               1|             3|Consolidated Mess...|servicios de mens...|       Helen Ahven|       30|        94101|
|               2|            10|Northwind Electri...|            juguetes|     Sara Karlsson|       30|         7860|
|               3|             7|       Litware, Inc.|            embalaje|        Vilma Niva|       30|        95245|
|               4|             6| Humongous Insurance|servicios de seguros|     Annette Talon|       14|        37770|
|               5|             4|      Fabrikam, Inc.|                ropa|       Helen Moore|       30|        40351|
|               6|            11|       Trey Res

In [488]:
proveedoresS4.count()

14

## Consulta para ver algunos registros de la dimensión Cliente
Vale la pena resaltar que está tabla se crea a partir de la interacción con el tutorial, ya que el enunciado no hace explícito que se deba hacer el proceso de ETL para esta dimensión.

In [489]:
sql_clientesS4 = '''(SELECT * FROM Estudiante_81_202413.ClienteS4) AS Temp_clientes'''
clientesS4 = obtener_dataframe_de_bd(source_db_connection_string, sql_clientesS4, db_user, db_psswd)
clientesS4.show(10)

+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|            Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |           Missing|        Missing|          Missing|             0|               0|             |                   |        |
|             0|            |           Missing|        Missing|          Missing|             0|               0|             |                   |        |
|             1|         807|     Gunnar Lohmus|     Gift Store|          Missing|           807|           35810|         3100|2013-01-01 00:00:00|       7|
|             2|         816|        Harsha Huq|    

In [490]:
clientesS4.count()

665

## Consulta para ver algunos registros de la dimensión TipoTransaccion

In [491]:
sql_tiposTransaccionS4 = '''(SELECT * FROM Estudiante_81_202413.TipoTransaccionS4) AS Temp_tiposTransaccion'''
tiposTransaccionS4 = obtener_dataframe_de_bd(source_db_connection_string, sql_tiposTransaccionS4, db_user, db_psswd)
tiposTransaccionS4.show(10)

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
|                      5|                    6|Supplier Credit Note|
|                      6|                    7|Supplier Payment ...|
|                      7|                    8|     Supplier Refund|
|                      8|                    9|      Stock Transfer|
|                      9|                   10|         Stock Issue|
|                     10|                   11|       Stock Receipt|
+-----------------------+---------------------+--------------------+
only showing top 10 rows



In [492]:
tiposTransaccionS4.count()

13

## Consulta para ver algunos registros de la dimensión Hecho_Movimiento

In [507]:
sql_hechoMovimientosS4 = '''(SELECT * FROM Estudiante_81_202413.Hecho_MovimientoS4) AS Temp_hechoMovimiento'''
hechoMovimientosS4 = obtener_dataframe_de_bd(source_db_connection_string, sql_hechoMovimientosS4, db_user, db_psswd)
hechoMovimientosS4.show(10)

+--------+---------------+----------------+--------------+-----------------------+--------+
|ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_transaccion_DWH|Cantidad|
+--------+---------------+----------------+--------------+-----------------------+--------+
|20151231|             77|               9|             0|                     10|    50.0|
|20150519|            159|               0|           520|                      9|   -10.0|
|20140723|            184|               9|             0|                     10|    84.0|
|20150519|            203|               0|            60|                      9|   -10.0|
|20150824|             14|               9|             0|                     10|     3.0|
|20150519|            167|               0|           433|                      9|   -10.0|
|20140918|            119|               9|             0|                     10|     1.0|
|20160301|             16|               0|           183|                      

In [508]:
hechoMovimientosS4.count()

236667