# Tarea: Construir un proceso ETL

## Introducción
    Objetivo
       Realizar un proceso ETL básico en Pyspark
	¿Para qué?
      Practicar lo aprendido en el tutorial de ETL

## Configuración e importe de paquetes

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType, TimestampType
from pyspark.sql.functions import udf, col, countDistinct, length, isnan, when, count, max as spark_max, to_date, year, to_timestamp, expr, substring, min, corr
import pyspark.sql.functions as f
import os 
from datetime import datetime
from pyspark.sql import types as t
from pandas_profiling import ProfileReport
import matplotlib.pyplot as plt
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Configuración servidor base de datos transaccional
db_user = 'Estudiante_30_202413'
db_psswd = 'MISO_aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'
dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_30_202413'
# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)
spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Definicion de las funciónes para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

## Dimensiones

### Proveedor

La dimensión Proveedor contiene información detallada sobre los proveedores que suministran productos a la empresa. Esta dimensión es fundamental para el análisis de la relación con los proveedores, incluyendo aspectos como la categoría del proveedor, los datos de contacto y los términos de pago. La implementación del ETL para esta dimensión se centra en asegurar la calidad y consistencia de los datos, eliminando duplicados y corrigiendo errores comunes.

#### Extracción
En esta fase, se extraen los datos de la tabla Proveedores de la base de datos transaccional WWImportersTransactional. La extracción incluye los campos necesarios para el análisis y omite aquellos que no son relevantes para la dimensión de proveedor.

In [5]:
# Definir la consulta SQL para extraer solo los datos necesarios de la tabla Proveedores
sql_query_proveedores = """
(SELECT 
    ProveedorID, 
    NombreProveedor, 
    CategoriaProveedorID, 
    PersonaContactoPrincipalID, 
    PersonaContactoAlternoID, 
    DiasPago, 
    CodigoPostal, 
    UltimaEdicionPor
FROM proveedores) AS Proveedores"""

# Extraer los datos de la tabla Proveedores utilizando la función definida previamente
df_proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_query_proveedores, db_user, db_psswd)

# Mostrar los primeros registros para verificar la extracción
df_proveedores.show()

+-----------+--------------------+--------------------+--------------------------+------------------------+--------+------------+----------------+
|ProveedorID|     NombreProveedor|CategoriaProveedorID|PersonaContactoPrincipalID|PersonaContactoAlternoID|DiasPago|CodigoPostal|UltimaEdicionPor|
+-----------+--------------------+--------------------+--------------------------+------------------------+--------+------------+----------------+
|          4|      Fabrikam, Inc.|                   4|                        27|                      28|      30|       40351|               1|
|          5|Graphic Design In...|                   2|                        29|                      30|      14|       64847|               1|
|          7|       Litware, Inc.|                   5|                        33|                      34|      30|       95245|               1|
|          9|      Nod Publishers|                   2|                        37|                      38|       7|  

#### Transformación
En esta fase, los datos extraídos de la tabla Proveedores se transformarán para corregir errores y asegurar la calidad de los datos. Las transformaciones incluyen la eliminación de registros duplicados, la corrección de valores negativos en el campo DiasPago, la estandarización del formato de fechas y la combinación de registros de proveedores duplicados.

In [6]:
from pyspark.sql.window import Window

# Eliminar registros duplicados basados en ProveedorID
df_proveedores = df_proveedores.dropDuplicates(["ProveedorID"])

# Corregir valores negativos en el campo DiasPago
df_proveedores = df_proveedores.withColumn("DiasPago", 
                                           f.when(df_proveedores["DiasPago"] < 0, 
                                                  df_proveedores["DiasPago"] * -1).otherwise(df_proveedores["DiasPago"]))

# Combinar registros de proveedores con nombres duplicados (con y sin "Inc" o "Ltd")
# Primero, crear una columna estandarizada para nombres de proveedores
df_proveedores = df_proveedores.withColumn("NombreProveedorEstandarizado", 
                                           f.trim(f.regexp_replace(f.col("NombreProveedor"), "Inc|Ltd", "")))

# Agrupar por el nombre estandarizado y seleccionar los valores no nulos más recientes
window_spec = Window.partitionBy("NombreProveedorEstandarizado").orderBy(f.desc("UltimaEdicionPor"))
df_proveedores = df_proveedores.withColumn("row_number", f.row_number().over(window_spec)).filter(f.col("row_number") == 1).drop("row_number")

# Unir con la tabla CategoriasProveedores para obtener el nombre de la categoría
sql_query_categorias = """
(SELECT 
    CategoriaProveedorID, 
    CategoriaProveedor 
FROM CategoriasProveedores) AS CategoriasProveedores"""
df_categorias = obtener_dataframe_de_bd(source_db_connection_string, sql_query_categorias, db_user, db_psswd)

df_proveedores = df_proveedores.join(df_categorias, "CategoriaProveedorID", "left")

# Seleccionar y renombrar columnas para la tabla destino
df_proveedores = df_proveedores.select(
    f.col("ProveedorID").alias("ID_Proveedor_T"),
    f.col("NombreProveedor"),
    f.col("CategoriaProveedor").alias("Categoria"),
    f.col("PersonaContactoPrincipalID"),
    f.col("PersonaContactoAlternoID"),
    f.col("DiasPago"),
    f.col("CodigoPostal")
)

# Mostrar los primeros registros transformados
df_proveedores.show()

+--------------+--------------------+--------------------+--------------------------+------------------------+--------+------------+
|ID_Proveedor_T|     NombreProveedor|           Categoria|PersonaContactoPrincipalID|PersonaContactoAlternoID|DiasPago|CodigoPostal|
+--------------+--------------------+--------------------+--------------------------+------------------------+--------+------------+
|             1| A Datum Corporation| productos novedosos|                        21|                      22|      14|       46077|
|             2|       Contoso, Ltd.| productos novedosos|                        23|                      24|       7|       98253|
|             5|Graphic Design In...| productos novedosos|                        29|                      30|      14|       64847|
|             8|  Lucerne Publishing| productos novedosos|                        35|                      36|      30|       37659|
|             9|      Nod Publishers| productos novedosos|           

#### Carga 
En esta fase, los datos transformados de la tabla Proveedores se cargarán en la tabla de dimensión ETL_Proveedor en la base de datos de destino. Se asegurará que los datos se inserten correctamente y se verificará que la estructura de la tabla de destino coincida con los datos transformados.

In [7]:
tabla_destino = "ETL_Proveedor"
guardar_db(dest_db_connection_string, df_proveedores, tabla_destino, db_user, db_psswd)
print("Datos cargados exitosamente en la tabla ETL_Proveedor")

Datos cargados exitosamente en la tabla ETL_Proveedor


### TipoTransaccion

La dimensión TipoTransaccion contiene información sobre los diferentes tipos de transacciones que pueden ocurrir en el sistema. Esta dimensión es fundamental para categorizar y analizar las transacciones en el contexto de las operaciones comerciales. La implementación del ETL para esta dimensión se centra en asegurar la calidad de los datos, eliminando duplicados y estandarizando los nombres de las transacciones.

#### Extracción

En esta fase, se extraen los datos de la tabla TiposTransaccion de la base de datos transaccional WWImportersTransactional. La extracción incluye solo los campos necesarios para el análisis y omite aquellos que no son relevantes para la dimensión de tipo de transacción.

In [8]:
# Definir la consulta SQL para extraer solo los datos necesarios
sql_query_tipos_transaccion = """
(SELECT 
    TipoTransaccionID, 
    TipoTransaccionNombre 
FROM TiposTransaccion) AS TiposTransaccion"""
# Extraer los datos de la tabla y muestra
df_tipos_transaccion = obtener_dataframe_de_bd(source_db_connection_string, sql_query_tipos_transaccion, db_user, db_psswd)
df_tipos_transaccion.show()

+-----------------+---------------------+
|TipoTransaccionID|TipoTransaccionNombre|
+-----------------+---------------------+
|                2| Customer Credit Note|
|                3| Customer Payment ...|
|                4|      Customer Refund|
|                5|     Supplier Invoice|
|                6| Supplier Credit Note|
|                7| Supplier Payment ...|
|                8|      Supplier Refund|
|                9|       Stock Transfer|
|               10|          Stock Issue|
|               11|        Stock Receipt|
|               12| Stock Adjustment ...|
|               13|      Customer Contra|
+-----------------+---------------------+



#### Transformación

En esta fase, los datos extraídos de la tabla TiposTransaccion se transformarán para corregir errores y asegurar la calidad de los datos. Las transformaciones incluyen la eliminación de registros duplicados y la estandarización de los nombres de transacción.

In [9]:
# Eliminar registros duplicados basados en TipoTransaccionID
df_tipos_transaccion = df_tipos_transaccion.dropDuplicates(["TipoTransaccionID"])
# Estandarizar los nombres de transacción
df_tipos_transaccion = df_tipos_transaccion.withColumn("TipoTransaccionNombre", f.upper(f.col("TipoTransaccionNombre")))
# Seleccionar y renombrar columnas para la tabla destino
df_tipos_transaccion = df_tipos_transaccion.select(
    f.col("TipoTransaccionID").alias("ID_Tipo_Transaccion_T"),
    f.col("TipoTransaccionNombre").alias("Tipo")
)
df_tipos_transaccion.show()

+---------------------+--------------------+
|ID_Tipo_Transaccion_T|                Tipo|
+---------------------+--------------------+
|                    2|CUSTOMER CREDIT NOTE|
|                    3|CUSTOMER PAYMENT ...|
|                    4|     CUSTOMER REFUND|
|                    5|    SUPPLIER INVOICE|
|                    6|SUPPLIER CREDIT NOTE|
|                    7|SUPPLIER PAYMENT ...|
|                    8|     SUPPLIER REFUND|
|                    9|      STOCK TRANSFER|
|                   10|         STOCK ISSUE|
|                   11|       STOCK RECEIPT|
|                   12|STOCK ADJUSTMENT ...|
|                   13|     CUSTOMER CONTRA|
+---------------------+--------------------+



#### Carga

In [10]:
tabla_destino = "ETL_TipoTransaccion"
guardar_db(dest_db_connection_string, df_tipos_transaccion, tabla_destino, db_user, db_psswd)
print("Datos cargados exitosamente en la tabla ETL_TipoTransaccion")

Datos cargados exitosamente en la tabla ETL_TipoTransaccion


### Fecha

La dimensión Fecha contiene información detallada sobre las fechas relevantes para las transacciones. Esta dimensión es fundamental para el análisis temporal de los datos, permitiendo segmentar y analizar las transacciones por día, mes, año y otros componentes temporales. La implementación del ETL para esta dimensión se centra en asegurar la integridad y consistencia de las fechas, eliminando duplicados y estandarizando los formatos.

#### Extracción

En esta fase, se extraen las fechas de las transacciones de la tabla movimientos de la base de datos transaccional WWImportersTransactional. La extracción incluye solo el campo necesario FechaTransaccion.

In [11]:
sql_query_fecha = """
(SELECT 
    FechaTransaccion 
FROM movimientos_v2) AS movimientos"""
df_fecha = obtener_dataframe_de_bd(source_db_connection_string, sql_query_fecha, db_user, db_psswd)
df_fecha.show()

+----------------+
|FechaTransaccion|
+----------------+
|     Jan 20,2014|
|     Jan 28,2014|
|     Jan 28,2014|
|     Jan 28,2014|
|     Jan 28,2014|
|     Feb 01,2014|
|     Feb 01,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
|     Mar 25,2014|
+----------------+
only showing top 20 rows



#### Transformación

En esta fase, los datos extraídos de la tabla movimientos se transformarán para corregir errores y asegurar la calidad de los datos. Las transformaciones incluyen la eliminación de registros duplicados, la estandarización del formato de fechas y la creación de columnas adicionales para día, mes, año y número de semana ISO.

In [12]:
from pyspark.sql.functions import to_date, col, when, udf, regexp_replace
from pyspark.sql.types import StringType, DateType
from datetime import datetime

# Política de tiempo LEGACY
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Eliminar registros duplicados basados en FechaTransaccion
df_fecha = df_fecha.dropDuplicates(["FechaTransaccion"])

# Definir una función UDF para convertir el formato 'MMM DD, YYYY' a 'YYYY-MM-DD'
def convert_to_date(date_str):
    try:
        return datetime.strptime(date_str, '%b %d,%Y').strftime('%Y-%m-%d')
    except ValueError:
        return date_str

convert_to_date_udf = udf(convert_to_date, StringType())

# Convertir el formato 'MMM DD, YYYY' a 'YYYY-MM-DD'
df_fecha = df_fecha.withColumn("FechaTransaccion", convert_to_date_udf(col("FechaTransaccion")))

# Convertir fechas que cumplen con el formato 'YYYY-MM-DD HH:MM:SS.0000000' a 'YYYY-MM-DD'
df_fecha = df_fecha.withColumn("FechaTransaccion", regexp_replace(col("FechaTransaccion"), " 00:00:00.0000000", ""))
df_fecha = df_fecha.withColumn("FechaTransaccion", regexp_replace(col("FechaTransaccion"), " 07:00:00.0000000", ""))

# Convertir la columna FechaTransaccion a formato de fecha
df_fecha = df_fecha.withColumn("Fecha", to_date(col("FechaTransaccion"), "yyyy-MM-dd"))

# Verificar las fechas parseadas
df_fecha.select("FechaTransaccion", "Fecha").show(truncate=False)

# Crear columnas adicionales para día, mes, año y número de semana ISO
df_fecha = df_fecha.withColumn("Dia", f.dayofmonth(f.col("Fecha")))
df_fecha = df_fecha.withColumn("Mes", f.month(f.col("Fecha")))
df_fecha = df_fecha.withColumn("Ano", f.year(f.col("Fecha")))
df_fecha = df_fecha.withColumn("Numero_semana_ISO", f.weekofyear(f.col("Fecha")))

# Crear una columna idFecha en el formato YYYYMMDD
df_fecha = df_fecha.withColumn("idFecha", f.date_format(f.col("Fecha"), "yyyyMMdd").cast("int"))

# Seleccionar y renombrar columnas para la tabla destino
df_fecha = df_fecha.select(
    f.col("idFecha"),
    f.col("Fecha"),
    f.col("Dia"),
    f.col("Mes"),
    f.col("Ano"),
    f.col("Numero_semana_ISO")
)

# Mostrar los primeros registros transformados
df_fecha.show(truncate=False)

+---------------------------+----------+
|FechaTransaccion           |Fecha     |
+---------------------------+----------+
|2014-02-27                 |2014-02-27|
|2014-11-28 12:00:00.0000000|2014-11-28|
|2014-06-26 12:00:00.0000000|2014-06-26|
|2014-11-19                 |2014-11-19|
|2015-02-18                 |2015-02-18|
|2015-10-27                 |2015-10-27|
|2016-01-27                 |2016-01-27|
|2014-08-28                 |2014-08-28|
|2014-09-19                 |2014-09-19|
|2015-02-12                 |2015-02-12|
|2015-01-03                 |2015-01-03|
|2015-09-21                 |2015-09-21|
|2015-08-22                 |2015-08-22|
|2016-05-14                 |2016-05-14|
|2014-07-26                 |2014-07-26|
|2014-04-18 12:00:00.0000000|2014-04-18|
|2013-08-16 12:00:00.0000000|2013-08-16|
|2013-04-26                 |2013-04-26|
|2013-11-21                 |2013-11-21|
|2015-01-16                 |2015-01-16|
+---------------------------+----------+
only showing top

#### Carga

En esta fase, los datos transformados de la tabla Fecha se cargarán en la tabla de dimensión ETL_Fecha en la base de datos de destino. Se asegurará que los datos se inserten correctamente y se verificará que la estructura de la tabla de destino coincida con los datos transformados.

In [13]:
tabla_destino = "ETL_Fecha"
guardar_db(dest_db_connection_string, df_fecha, tabla_destino, db_user, db_psswd)
print("Datos cargados exitosamente en la tabla ETL_Fecha")

Datos cargados exitosamente en la tabla ETL_Fecha


### Hecho_Movimiento

La tabla de hechos Hecho_Movimiento contiene información detallada sobre las transacciones o movimientos de productos en el inventario. Esta tabla de hechos es fundamental para el análisis de las transacciones, permitiendo el análisis en combinación con las diferentes dimensiones como Proveedor, TipoTransaccion, Fecha, Producto y Cliente.

#### Extracción

En esta fase, se extraen los datos de la tabla movimientos de la base de datos transaccional WWImportersTransactional. La extracción incluye todos los campos necesarios para el análisis.

In [14]:
sql_query_movimientos = """
(SELECT 
    FechaTransaccion, 
    ProductoID, 
    ProveedorID, 
    ClienteID, 
    TipoTransaccionID, 
    Cantidad 
FROM movimientos_v2) AS movimientos"""

# Extraer los datos de la tabla movimientos
df_movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_query_movimientos, db_user, db_psswd)
# Mostrar los primeros registros para verificar 
df_movimientos.show()

+----------------+----------+-----------+---------+-----------------+--------+
|FechaTransaccion|ProductoID|ProveedorID|ClienteID|TipoTransaccionID|Cantidad|
+----------------+----------+-----------+---------+-----------------+--------+
|     Jan 20,2014|       108|       null|    185.0|               10|   -10.0|
|     Jan 28,2014|       162|        4.0|      0.0|               11|    10.0|
|     Jan 28,2014|       216|       null|    474.0|               10|   -10.0|
|     Jan 28,2014|        22|        7.0|      0.0|               11|    10.0|
|     Jan 28,2014|        25|        7.0|      0.0|               11|    10.0|
|     Feb 01,2014|        14|       null|    444.0|               10|   -10.0|
|     Feb 01,2014|        75|        7.0|      0.0|               11|    10.0|
|     Mar 25,2014|        20|       null|    802.0|               10|   -10.0|
|     Mar 25,2014|        65|        4.0|      0.0|               11|    10.0|
|     Mar 25,2014|       130|       null|    487.0| 

#### Transformación

En esta fase, los datos extraídos de la tabla movimientos se transformarán para corregir errores y asegurar la calidad de los datos. Las transformaciones incluyen la eliminación de registros duplicados, la corrección de valores negativos en el campo Cantidad, la estandarización del formato de fechas y la combinación de registros con las dimensiones relevantes.

In [15]:
from pyspark.sql.functions import to_date, col, when, udf, regexp_replace
from pyspark.sql.types import StringType
from datetime import datetime

# Definir una función UDF para convertir el formato 'MMM DD, YYYY' a 'YYYY-MM-DD'
def convert_to_date(date_str):
    try:
        return datetime.strptime(date_str, '%b %d,%Y').strftime('%Y-%m-%d')
    except ValueError:
        return date_str

convert_to_date_udf = udf(convert_to_date, StringType())

# Convertir el formato 'MMM DD, YYYY' a 'YYYY-MM-DD'
df_movimientos = df_movimientos.withColumn("FechaTransaccion", convert_to_date_udf(col("FechaTransaccion")))

# Eliminar partes innecesarias del tiempo en las fechas
df_movimientos = df_movimientos.withColumn("FechaTransaccion", regexp_replace(col("FechaTransaccion"), " 00:00:00.0000000", ""))
df_movimientos = df_movimientos.withColumn("FechaTransaccion", regexp_replace(col("FechaTransaccion"), " 07:00:00.0000000", ""))

# Convertir la columna FechaTransaccion a formato de fecha
df_movimientos = df_movimientos.withColumn("Fecha", to_date(f.col("FechaTransaccion"), "yyyy-MM-dd"))

# Verificar las fechas parseadas correctamente
df_movimientos.select("FechaTransaccion", "Fecha").distinct().show(truncate=False)
df_fecha.select("Fecha").distinct().show(truncate=False)

# Asignar 0 a ProveedorID si es null
df_movimientos = df_movimientos.withColumn("ProveedorID", f.when(f.col("ProveedorID").isNull(), 0).otherwise(f.col("ProveedorID")))

# Unir con la dimensión Fecha para obtener el idFecha
df_movimientos = df_movimientos.join(df_fecha.select("Fecha", "idFecha"), on="Fecha", how="left")

# Convertir columnas a tipos de datos enteros para eliminar decimales
df_movimientos = df_movimientos.withColumn("ID_Proveedor_DWH", f.col("ProveedorID").cast("int"))
df_movimientos = df_movimientos.withColumn("ID_Cliente_DWH", f.col("ClienteID").cast("int"))
df_movimientos = df_movimientos.withColumn("ID_Tipo_Transaccion_DWH", f.col("TipoTransaccionID").cast("int"))
df_movimientos = df_movimientos.withColumn("Cantidad", f.col("Cantidad").cast("int"))

# Seleccionar y renombrar columnas para la tabla destino
df_movimientos = df_movimientos.select(
    f.col("idFecha"),
    f.col("ProductoID").alias("ID_Producto_DWH"),
    f.col("ID_Proveedor_DWH"),
    f.col("ID_Cliente_DWH"),
    f.col("ID_Tipo_Transaccion_DWH"),
    f.col("Cantidad")
)

# Mostrar los primeros registros transformados
df_movimientos.show(truncate=False)


+---------------------------+----------+
|FechaTransaccion           |Fecha     |
+---------------------------+----------+
|2016-03-11                 |2016-03-11|
|2015-07-27                 |2015-07-27|
|2015-09-29                 |2015-09-29|
|2016-04-09                 |2016-04-09|
|2015-11-06                 |2015-11-06|
|2015-09-12                 |2015-09-12|
|2016-03-18                 |2016-03-18|
|2013-04-13 12:00:00.0000000|2013-04-13|
|2015-01-05 12:00:00.0000000|2015-01-05|
|2014-12-02 12:00:00.0000000|2014-12-02|
|2014-04-09 12:00:00.0000000|2014-04-09|
|2013-03-18 12:00:00.0000000|2013-03-18|
|2015-01-15 12:00:00.0000000|2015-01-15|
|2014-07-04 12:00:00.0000000|2014-07-04|
|2013-01-16                 |2013-01-16|
|2014-05-21                 |2014-05-21|
|2015-01-24                 |2015-01-24|
|2014-04-30                 |2014-04-30|
|2015-09-21                 |2015-09-21|
|2014-04-11                 |2014-04-11|
+---------------------------+----------+
only showing top

#### Carga

En esta fase, los datos transformados de la tabla movimientos se cargarán en la tabla de hechos ETL_Hecho_Movimiento en la base de datos de destino. Se asegurará que los datos se inserten correctamente y se verificará que la estructura de la tabla de destino coincida con los datos transformados.

In [16]:
tabla_destino = "ETL_Hecho_Movimiento"
guardar_db(dest_db_connection_string, df_movimientos, tabla_destino, db_user, db_psswd)
print("Datos cargados exitosamente en la tabla ETL_Hecho_Movimiento")

Datos cargados exitosamente en la tabla ETL_Hecho_Movimiento


## Reglas del negocio 

### Los días de pago no pueden ser negativos

In [17]:
df_proveedor = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Proveedor', db_user, db_psswd)
# Verificar registros con DiasPago negativo
df_proveedor_negativos = df_proveedor.filter(df_proveedor["DiasPago"] < 0)
df_proveedor_negativos.show()

+--------------+---------------+---------+--------------------------+------------------------+--------+------------+
|ID_Proveedor_T|NombreProveedor|Categoria|PersonaContactoPrincipalID|PersonaContactoAlternoID|DiasPago|CodigoPostal|
+--------------+---------------+---------+--------------------------+------------------------+--------+------------+
+--------------+---------------+---------+--------------------------+------------------------+--------+------------+



### Verificar datos antes de 2014

In [23]:
df_hecho_movimiento = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Hecho_Movimiento', db_user, db_psswd)

# Convertir la columna idFecha a String para poder filtrar por año
df_hecho_movimiento = df_hecho_movimiento.withColumn("idFecha_str", f.col("idFecha").cast("string"))

# Filtrar registros con fechas anteriores a 2014
df_antes_2014 = df_hecho_movimiento.filter(f.col("idFecha_str").substr(1, 4) < '2014')

# Mostrar los registros anteriores a 2014
df_antes_2014.show(truncate=False)

# Contar el número de registros anteriores a 2014
count_antes_2014 = df_antes_2014.count()

print(f"Número de registros con fechas anteriores a 2014: {count_antes_2014}")


+--------+---------------+----------------+--------------+-----------------------+--------+-----------+
|idFecha |ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_Transaccion_DWH|Cantidad|idFecha_str|
+--------+---------------+----------------+--------------+-----------------------+--------+-----------+
|20131116|45             |0               |6             |10                     |-10     |20131116   |
|20131116|210            |0               |933           |10                     |-10     |20131116   |
|20131116|4              |0               |448           |10                     |-10     |20131116   |
|20131116|24             |0               |71            |10                     |-10     |20131116   |
|20131116|44             |0               |446           |10                     |-10     |20131116   |
|20131116|16             |0               |566           |10                     |-10     |20131116   |
|20131116|62             |4               |0             |11    

### Validar duplicados 

In [24]:
# Verificar duplicados en ETL_Hecho_Movimiento
df_hecho_movimiento = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Hecho_Movimiento', db_user, db_psswd)
df_hecho_movimiento_duplicados = df_hecho_movimiento.groupBy("idFecha", "ID_Producto_DWH", "ID_Proveedor_DWH", "ID_Cliente_DWH", "ID_Tipo_Transaccion_DWH", "Cantidad").count().filter("count > 1")
df_hecho_movimiento_duplicados.show()

+--------+---------------+----------------+--------------+-----------------------+--------+-----+
| idFecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_Transaccion_DWH|Cantidad|count|
+--------+---------------+----------------+--------------+-----------------------+--------+-----+
|20150519|            113|               0|           879|                     10|     -10|    3|
|20151027|             73|               4|             0|                     11|       5|    3|
|20151027|             59|               4|             0|                     11|       2|    3|
|20150106|             13|               4|             0|                     11|       1|    3|
|20130116|            143|               4|             0|                     11|      12|    2|
|20150309|            104|               0|           426|                     10|      -8|    3|
|20130521|             26|               0|           972|                     10|      -5|    2|
|20130521|          

### Validar formato de fechas

In [25]:
# Verificar el formato de las fechas en ETL_Fecha
df_fecha = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Fecha', db_user, db_psswd)
df_fecha_formato = df_fecha.filter(~(f.col("Fecha").like("____-__-__") | ((f.length("Fecha") > 10) & f.col("Fecha").like("____-__-__ __:__:__"))))
df_fecha_formato.show()

+-------+-----+---+---+---+-----------------+
|idFecha|Fecha|Dia|Mes|Ano|Numero_semana_ISO|
+-------+-----+---+---+---+-----------------+
+-------+-----+---+---+---+-----------------+



### Verificar nombres de proveedores unificados

Confirmar que los proveedores con nombres duplicados (con o sin "Inc" o "Ltd") se hayan unificado.

In [26]:
df_proveedor = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Proveedor', db_user, db_psswd)

# Mostrar los primeros registros para verificar
df_proveedor.show(5)

# Normalizar los nombres de los proveedores eliminando "Inc" y "Ltd" para la comprobación
df_proveedor_norm = df_proveedor.withColumn("NombreProveedorEstandarizado", 
                                            f.trim(f.regexp_replace(f.col("NombreProveedor"), "(?i)\\s*Inc\\.?|(?i)\\s*Ltd\\.?", "")))

# Agrupar y contar los proveedores normalizados para verificar duplicados
df_proveedor_duplicados = df_proveedor_norm.groupBy("NombreProveedorEstandarizado").count().filter("count > 1")

# Mostrar los proveedores duplicados
df_proveedor_duplicados.show(truncate=False)


+--------------+--------------------+-------------------+--------------------------+------------------------+--------+------------+
|ID_Proveedor_T|     NombreProveedor|          Categoria|PersonaContactoPrincipalID|PersonaContactoAlternoID|DiasPago|CodigoPostal|
+--------------+--------------------+-------------------+--------------------------+------------------------+--------+------------+
|             1| A Datum Corporation|productos novedosos|                        21|                      22|      14|       46077|
|             2|       Contoso, Ltd.|productos novedosos|                        23|                      24|       7|       98253|
|             5|Graphic Design In...|productos novedosos|                        29|                      30|      14|       64847|
|             8|  Lucerne Publishing|productos novedosos|                        35|                      36|      30|       37659|
|             9|      Nod Publishers|productos novedosos|                   

### Verificar el código postal de los Proveedores

In [27]:
df_proveedor = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Proveedor', db_user, db_psswd)

# Mostrar los primeros registros para verificar
df_proveedor.show(5)

# Agrupar por CodigoPostal y contar las ocurrencias
df_codigo_postal_repetido = df_proveedor.groupBy("CodigoPostal").count().filter("count > 1")

# Mostrar el código postal que se repite y el número de ocurrencias
df_codigo_postal_repetido.show()

# Obtener el código postal que se repite
codigo_postal_repetido = df_codigo_postal_repetido.select("CodigoPostal").collect()

# Verificar si hay un código postal repetido
if len(codigo_postal_repetido) > 0:
    codigo_postal_repetido = codigo_postal_repetido[0]["CodigoPostal"]
    
    # Filtrar proveedores con el código postal repetido
    df_proveedores_duplicados = df_proveedor.filter(f.col("CodigoPostal") == codigo_postal_repetido)
    
    # Mostrar los proveedores asociados al código postal repetido
    df_proveedores_duplicados.show(truncate=False)
else:
    print("No hay códigos postales repetidos.")


+--------------+--------------------+-------------------+--------------------------+------------------------+--------+------------+
|ID_Proveedor_T|     NombreProveedor|          Categoria|PersonaContactoPrincipalID|PersonaContactoAlternoID|DiasPago|CodigoPostal|
+--------------+--------------------+-------------------+--------------------------+------------------------+--------+------------+
|             1| A Datum Corporation|productos novedosos|                        21|                      22|      14|       46077|
|             2|       Contoso, Ltd.|productos novedosos|                        23|                      24|       7|       98253|
|             5|Graphic Design In...|productos novedosos|                        29|                      30|      14|       64847|
|             8|  Lucerne Publishing|productos novedosos|                        35|                      36|      30|       37659|
|             9|      Nod Publishers|productos novedosos|                   

Validamos que tanto Consolidated Messenger y Woodgrove Bank tienen el mismo código postal 

In [18]:
# Verificar duplicados en la tabla de destino ETL_Hecho_Movimiento
df_hecho_movimiento = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Hecho_Movimiento', db_user, db_psswd)

# Agrupar y contar registros duplicados
df_hecho_movimiento_duplicados = df_hecho_movimiento.groupBy("idFecha", "ID_Producto_DWH", "ID_Proveedor_DWH", "ID_Cliente_DWH", "ID_Tipo_Transaccion_DWH", "Cantidad").count().filter("count > 1")

# Mostrar registros duplicados
df_hecho_movimiento_duplicados.show()

+--------+---------------+----------------+--------------+-----------------------+--------+-----+
| idFecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_Transaccion_DWH|Cantidad|count|
+--------+---------------+----------------+--------------+-----------------------+--------+-----+
|20150519|            113|               0|           879|                     10|     -10|    3|
|20151027|             73|               4|             0|                     11|       5|    3|
|20151027|             59|               4|             0|                     11|       2|    3|
|20150106|             13|               4|             0|                     11|       1|    3|
|20130116|            143|               4|             0|                     11|      12|    2|
|20150309|            104|               0|           426|                     10|      -8|    3|
|20130521|             26|               0|           972|                     10|      -5|    2|
|20130521|          

In [19]:
# Verificar si hay registros adicionales generados por la unión con dimensiones
df_fecha = obtener_dataframe_de_bd(dest_db_connection_string, 'ETL_Fecha', db_user, db_psswd)
df_movimientos = obtener_dataframe_de_bd(source_db_connection_string, 'movimientos', db_user, db_psswd)

# Realizar la unión original y contar registros
df_movimientos = df_movimientos.withColumn("Fecha", to_date(f.col("FechaTransaccion"), "yyyy-MM-dd"))
df_movimientos = df_movimientos.join(df_fecha.select("Fecha", "idFecha"), "Fecha", "left")

# Contar registros después de la unión
num_registros_union = df_movimientos.count()

print(f"Número de registros después de la unión: {num_registros_union}")


Número de registros después de la unión: 595683
