# Tutorial: creación de ETLs incrementales con PySpark

In [1]:
# Configuración servidor base de datos transaccional
# Utilizar el usuario asignado en el curso
db_user_2 = ''
db_user = ''
db_psswd = ''

In [2]:
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_67_202315'
grupo_connection_string = 'jdbc:mysql://157.253.236.116:8080/Proyecto_G9_202315' 

sebas_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/RaSaTransaccional_ETL'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [3]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark.sql.functions import lit
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
import mysql.connector
from pyspark.sql.window import Window
from datetime import datetime 
import mysql.connector
from mysql.connector import Error

In [4]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [5]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

### Áreas de servicio

In [10]:
sql_Area = """ (
SELECT 
    ROW_NUMBER() OVER () AS IdAreaDeServicio_DWH,
	IdAreaDeServicio_T, 
    NombreAreaDeServicio AS Nombre, 
    Fecha AS AnnioCreacion 
FROM 
RaSaTransaccional_ETL.FuenteAreasDeServicio_ETL 
GROUP BY 
	IdAreaDeServicio_T, 
    NombreAreaDeServicio, 
    Fecha
) AS Temp_AreaServ"""
AreaServicio = obtener_dataframe_de_bd (grupo_connection_string, sql_Area, db_user, db_psswd)
AreaServicio.show(10)

+--------------------+------------------+--------------------+-------------+
|IdAreaDeServicio_DWH|IdAreaDeServicio_T|              Nombre|AnnioCreacion|
+--------------------+------------------+--------------------+-------------+
|                   1|         100622017|New Jersey - Medi...|         2017|
|                   2|         100722019|New Jersey  - Med...|         2019|
|                   3|         100922020|New Jersey - Medi...|         2020|
|                   4|         101012018|New Jersey - Medi...|         2018|
|                   5|         101062020|New Jersey - Medi...|         2020|
|                   6|         101082018|New Jersey - Medi...|         2018|
|                   7|         101152020|New Jersey - Medi...|         2020|
|                   8|           1012019|Dental11469KY0010...|         2019|
|                   9|         101242019|New Jersey  - Med...|         2019|
|                  10|          10132017|BlueOptions16842F...|         2017|

In [11]:
guardar_db(grupo_connection_string, AreaServicio,'Proyecto_G9_202315.AreasDeServicio', db_user, db_psswd)

### Geografía 

In [17]:
#ETL geografia 
#Desde SQL se hace el proceso de extracción y transformacion de la siguiente manera:

sql_geografia = """ (
SELECT DISTINCT
    ROW_NUMBER() OVER () AS IdGeografia_DWH,
    geo.IdGeografia_T,
    geo.Estado,
    geo.Condado,
    CASE 
        WHEN geo.Area < 0 THEN geo.Area * -1 
        ELSE geo.Area 
    END AS AreaAct,
    geo.Densidad AS DensidadAct,
    CASE 
        WHEN RIGHT(geo.PoblacionAct, 4) = '0001' THEN LEFT(geo.PoblacionAct, LENGTH(geo.PoblacionAct) - 4)
        ELSE geo.PoblacionAct 
    END AS PoblacionAct
FROM RaSaTransaccional_ETL.FuenteAreasDeServicio_ETL AS geo 
GROUP BY 
    geo.IdGeografia_T,
    geo.Estado,
    geo.Condado,
    CASE 
        WHEN geo.Area < 0 THEN geo.Area * -1 
        ELSE geo.Area 
    END,
    geo.Densidad,
    CASE 
        WHEN RIGHT(geo.PoblacionAct, 4) = '0001' THEN LEFT(geo.PoblacionAct, LENGTH(geo.PoblacionAct) - 4)
        ELSE geo.PoblacionAct 
    END
) AS Temp_geografia"""
Geografia = obtener_dataframe_de_bd (grupo_connection_string, sql_geografia, db_user, db_psswd)
Geografia.show(10)

+---------------+-------------+----------+-----------------+-------+-----------+------------+
|IdGeografia_DWH|IdGeografia_T|    Estado|          Condado|AreaAct|DensidadAct|PoblacionAct|
+---------------+-------------+----------+-----------------+-------+-----------+------------+
|              1|        34005|New Jersey|Burlington County|  805.0|      577.0|      464269|
|              2|        34023|New Jersey| Middlesex County|  311.0|     2768.0|      860807|
|              3|        34019|New Jersey| Hunterdon County|  430.0|      302.0|      129924|
|              4|        34031|New Jersey|   Passaic County|  185.0|     2801.0|      518117|
|              5|        34037|New Jersey|    Sussex County|  521.0|      279.0|      145543|
|              6|        34035|New Jersey|  Somerset County|  305.0|     1133.0|      345647|
|              7|        21041|  Kentucky|   Carroll County|  130.0|       84.0|       10863|
|              8|        12031|   Florida|     Duval County|

In [18]:
# Cargue
guardar_db(grupo_connection_string, Geografia,'Proyecto_G9_202315.Geografia', db_user, db_psswd)

### Nivel Servicio

In [6]:
#ETL Dimension Nivel Servicio

sql_NivelServ = """ (
SELECT DISTINCT 
    ROW_NUMBER() OVER () AS IdNivelDeServicio_DWH, 
    ns.IdNivelDeServicio_T,
    ns.Descripcion
FROM RaSaTransaccional_ETL.NivelesDeServicio AS ns
) AS Temp_NivelServ"""
NivelesDeServicio = obtener_dataframe_de_bd(grupo_connection_string, sql_NivelServ, db_user, db_psswd)
NivelesDeServicio.show(10)
#Este etl solo tiene en transformación la eliminación de duplicados y la creación de IDS

+---------------------+-------------------+---------------+
|IdNivelDeServicio_DWH|IdNivelDeServicio_T|    Descripcion|
+---------------------+-------------------+---------------+
|                    1|                  1|        Nivel 1|
|                    2|                  2|        Nivel 2|
|                    3|                  3|Fuera de la red|
+---------------------+-------------------+---------------+



In [42]:
# Cargue
guardar_db(grupo_connection_string, NivelesDeServicio,'Proyecto_G9_202315.NivelesDeServicio', db_user, db_psswd)

### Condiciones Pago 

In [12]:
#ETL Dimension Condiciones de Pago
sql_CondicionDePago = """ (
SELECT 
    ROW_NUMBER() OVER () AS IdCondicionDePago_DWH,
    cp.IdCondicionesDePago_T AS IdCondicionDePago_T,
    cp.Descripcion,
    CASE 
        
        WHEN cp.Tipo = 'Copagado' THEN 'Copago'
        WHEN cp.Tipo = 'Coseguridad' THEN 'Coseguro'
        ELSE cp.Tipo 
    END AS Tipo
FROM RaSaTransaccional_ETL.FuenteCondicionesDePago_ETL AS cp 
GROUP BY 
    cp.IdCondicionesDePago_T,
    cp.Descripcion,
    CASE 
        
        WHEN cp.Tipo = 'Copagado' THEN 'Copago'
        WHEN cp.Tipo = 'Coseguridad' THEN 'Coseguro'
        ELSE cp.Tipo 
    END
) AS Temp_CondPag"""

CondicionDePago = obtener_dataframe_de_bd (grupo_connection_string, sql_CondicionDePago, db_user, db_psswd)
CondicionDePago.show(10)

+---------------------+-------------------+--------------------+--------+
|IdCondicionDePago_DWH|IdCondicionDePago_T|         Descripcion|    Tipo|
+---------------------+-------------------+--------------------+--------+
|                    1|                187|Copay with deduct...|  Copago|
|                    2|                204|       Copay per Day|  Copago|
|                    3|                 45|         Coinsurance|Coseguro|
|                    4|                 85|Copay per Day bef...|  Copago|
|                    5|                 18|No Charge after d...|Coseguro|
|                    6|                 27|Coinsurance after...|Coseguro|
|                    7|                238|               Copay|  Copago|
|                    8|                207|No Charge after d...|  Copago|
|                    9|                 68|Copay per Stay af...|  Copago|
|                   10|                153|Copay after deduc...|  Copago|
+---------------------+---------------

In [13]:
# Cargue
guardar_db(grupo_connection_string, CondicionDePago,'Proyecto_G9_202315.CondicionDePago', db_user, db_psswd)

### Proveedor

In [6]:
#ETL Dimension Proveedor
sql_proveedor = """ (
SELECT 
    ROW_NUMBER() OVER () AS IdProveedor_DWH,
    p.IdProveedor_T
FROM RaSaTransaccional_ETL.FuentePlanesBeneficio_ETL AS p  
GROUP BY 
    p.IdProveedor_T
) AS Temp_proveed"""
Proveedor = obtener_dataframe_de_bd (grupo_connection_string, sql_proveedor, db_user, db_psswd)
Proveedor.show(10)
#Este etl solo tiene en transformación la eliminación de duplicados y la creación de IDS

+---------------+-------------+
|IdProveedor_DWH|IdProveedor_T|
+---------------+-------------+
|              1|        16842|
|              2|        14002|
|              3|        19722|
|              4|        81413|
|              5|        52697|
|              6|        28162|
|              7|        20129|
|              8|        40572|
|              9|        38166|
|             10|        70893|
+---------------+-------------+
only showing top 10 rows



In [7]:
# Cargue
guardar_db(grupo_connection_string, Proveedor,'Proyecto_G9_202315.Proveedor', db_user, db_psswd)

### Tipos Beneficio

In [7]:
#ETL Dimension TiposBeneficio
sql_tbenef = """ (
SELECT 
    ROW_NUMBER() OVER () AS IdTipoBeneficio_DWH,
    tbenef.IdTipoBeneficio_T,
    tbenef.Nombre
FROM RaSaTransaccional_ETL.FuenteTiposBeneficio_ETL AS tbenef
GROUP BY 
    tbenef.IdTipoBeneficio_T,
    tbenef.Nombre
) AS Temp_tbenef"""


TiposBeneficio = obtener_dataframe_de_bd (source_db_connection_string, sql_tbenef, db_user, db_psswd)
TiposBeneficio.show(10)
#Este etl solo tiene en transformación la eliminación de duplicados y la creación de IDS

+-------------------+-----------------+--------------------+
|IdTipoBeneficio_DWH|IdTipoBeneficio_T|              Nombre|
+-------------------+-----------------+--------------------+
|                  1|                5|Abortion For Whic...|
|                  2|               10|   Accidental Dental|
|                  3|               20|Adult Frames And ...|
|                  4|               25|  Allergy Injections|
|                  5|               30|     Allergy Testing|
|                  6|               40|          Anesthesia|
|                  7|               45|Anesthesia Servic...|
|                  8|               50|Applied Behavior ...|
|                  9|               60|Autism Spectrum D...|
|                 10|               65|Autism Spectrum D...|
+-------------------+-----------------+--------------------+
only showing top 10 rows



In [8]:
# Cargue
guardar_db(grupo_connection_string, TiposBeneficio,'Proyecto_G9_202315.TiposBeneficio', db_user, db_psswd)

### Mini Condiciones Tipo Beneficio 

In [9]:
#ETL Dimension MINITiposBeneficio
sql_tbenefmini = """ (
SELECT 
    ROW_NUMBER() OVER () AS IdCondicionesBeneficios_DWH,
    tbenefmini.EstaCubiertaPorSeguro,
    CASE
        WHEN tbenefmini.EsEHB = 'True' THEN 'Yes'
        WHEN tbenefmini.EsEHB = 'False' THEN 'No'
        WHEN tbenefmini.EsEHB = 'Si' THEN 'Yes'
        ELSE tbenefmini.EsEHB
    END AS EsEHB,
    CASE
        WHEN tbenefmini.TieneLimiteCuantitativo = 'True' THEN 'Yes'
        WHEN tbenefmini.TieneLimiteCuantitativo = 'False' THEN 'No'
        WHEN tbenefmini.TieneLimiteCuantitativo = 'Si' THEN 'Yes'
        ELSE tbenefmini.TieneLimiteCuantitativo
    END AS TieneLimiteCuantitativo,
    CASE
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'True' THEN 'Yes'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'False' THEN 'No'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'Si' THEN 'Yes'
        ELSE tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed
    END AS ExcluidoDelDesembolsoMaximoDentroDeLaRed,
    CASE
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'True' THEN 'Yes'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'False' THEN 'No'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'Si' THEN 'Yes'
        ELSE tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed
    END AS ExcluidoDelDesembolsoMaximoFueraDeLaRed,
    tbenefmini.UnidadDelLimite
FROM RaSaTransaccional_ETL.FuenteTiposBeneficio_ETL AS tbenefmini 
GROUP BY 
    tbenefmini.EstaCubiertaPorSeguro,
    CASE
        WHEN tbenefmini.EsEHB = 'True' THEN 'Yes'
        WHEN tbenefmini.EsEHB = 'False' THEN 'No'
        WHEN tbenefmini.EsEHB = 'Si' THEN 'Yes'
        ELSE tbenefmini.EsEHB
    END,
    CASE
        WHEN tbenefmini.TieneLimiteCuantitativo = 'True' THEN 'Yes'
        WHEN tbenefmini.TieneLimiteCuantitativo = 'False' THEN 'No'
        WHEN tbenefmini.TieneLimiteCuantitativo = 'Si' THEN 'Yes'
        ELSE tbenefmini.TieneLimiteCuantitativo
    END,
    CASE
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'True' THEN 'Yes'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'False' THEN 'No'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'Si' THEN 'Yes'
        ELSE tbenefmini.ExcluidoDelDesembolsoMaximoDentroDeLaRed
    END,
    CASE
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'True' THEN 'Yes'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'False' THEN 'No'
        WHEN tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'Si' THEN 'Yes'
        ELSE tbenefmini.ExcluidoDelDesembolsoMaximoFueraDeLaRed
    END,
    tbenefmini.UnidadDelLimite
) AS Temp_tbenef"""
MiniCondicionesTipoBeneficio = obtener_dataframe_de_bd(grupo_connection_string, sql_tbenefmini, db_user, db_psswd)
MiniCondicionesTipoBeneficio.show(10)

+---------------------------+---------------------+-----+-----------------------+----------------------------------------+---------------------------------------+-------------------+
|IdCondicionesBeneficios_DWH|EstaCubiertaPorSeguro|EsEHB|TieneLimiteCuantitativo|ExcluidoDelDesembolsoMaximoDentroDeLaRed|ExcluidoDelDesembolsoMaximoFueraDeLaRed|    UnidadDelLimite|
+---------------------------+---------------------+-----+-----------------------+----------------------------------------+---------------------------------------+-------------------+
|                          1|                   No|   No|                     No|                                      No|                                     No|                   |
|                          2|                  Yes|   No|                     No|                                      No|                                    Yes|                   |
|                          3|                  Yes|  Yes|                    Yes|    

In [10]:
# Cargue
guardar_db(grupo_connection_string, MiniCondicionesTipoBeneficio,'Proyecto_G9_202315.MiniCondicionesTipoBeneficio', db_user, db_psswd)

### Asociación Áreas de Servicio con Geografía

In [19]:
#ETL Dimension AsociacionAreaServicioGeografia
sql_asociaArserGeo = """ (
SELECT 
area_ser.IDAreaDeServicio_DWH,
geo_base.IdGeografia_DWH
FROM 
Proyecto_G9_202315.Geografia AS geo_base 
LEFT JOIN RaSaTransaccional_ETL.FuenteAreasDeServicio_ETL AS fuente_geo ON geo_base.IdGeografia_T = fuente_geo.IdGeografia_T
LEFT JOIN Proyecto_G9_202315.AreasDeServicio AS area_ser ON area_ser.IdAreaDeServicio_T = fuente_geo.IdAreaDeServicio_T
) AS AsocAreaServGeo"""
AsociacionAreaServicioGeografia = obtener_dataframe_de_bd(grupo_connection_string, sql_asociaArserGeo, db_user_2, db_psswd)
AsociacionAreaServicioGeografia.show(10)

+--------------------+---------------+
|IDAreaDeServicio_DWH|IdGeografia_DWH|
+--------------------+---------------+
|                5029|              1|
|                 961|              1|
|                3683|              1|
|                3667|              1|
|                4602|              1|
|                 992|              1|
|                4172|              1|
|                4170|              1|
|                4166|              1|
|                4161|              1|
+--------------------+---------------+
only showing top 10 rows



In [20]:
# Cargue
guardar_db(grupo_connection_string, AsociacionAreaServicioGeografia,'Proyecto_G9_202315.AsociacionAreaServicioGeografia', db_user, db_psswd)

### Hecho Planes Tipos Beneficio

In [None]:
"""
    * En este query se formatea el campo de Fecha para traerlo acorde al idFecha 
    en formato YYYYMMDD. 
    * Se cruza la información con las demás dimensiones de DWH para obtener estos 
    campos identificadores. 
    * Se toma solo lo válido en la Tabla histórica 
"""

In [14]:
#ETL Dimension HechoPlanesTiposBeneficio
sql_hechoPlanTipoBen = """ (
SELECT  
prv.IdProveedor_DWH AS IdProveedor, 
CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y%m%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y%m%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y%m%d')
    WHEN Fecha LIKE '%-%' THEN 
      DATE_FORMAT(Fecha, '%Y%m%d')
	Else Fecha 
END AS IdFechaEmision, 
mov.IdPlan_T AS IdPlan, 
ars.IdAreaDeServicio_DWH, 
nls.IdNivelDeServicio_DWH, 
tbn.IdTipoBeneficio_DWH, 
hhctb.IdCondicionesBeneficios_DWH, 
cps.IdCondicionDePago_DWH AS IdCondicionDePagoCoseguro_DWH, 
cpc.IdCondicionDePago_DWH AS IdCondicionDePagoCopago_DWH, 
mov.valorCoseguro, 
mov.valorCopago, 
mov.cantidadLimite 
FROM RaSaTransaccional_ETL.FuentePlanesBeneficio_ETL AS mov 
LEFT JOIN Proyecto_G9_202315.Proveedor AS prv ON mov.IdProveedor_T = prv.IdProveedor_T 
LEFT JOIN Proyecto_G9_202315.AreasDeServicio AS ars ON mov.IdAreaDeServicio_T = ars.IDAreaDeServicio_T 
LEFT JOIN Proyecto_G9_202315.NivelesDeServicio AS nls ON mov.IdNivelServicio_T = nls.IdNivelDeServicio_T  
LEFT JOIN Proyecto_G9_202315.TiposBeneficio AS tbn ON mov.IdTipoBeneficio_T = tbn.IdTipoBeneficio_T  
LEFT JOIN Proyecto_G9_202315.CondicionDePago AS cpc ON mov.IdCondicionDePagoCopago_T = cpc.IdCondicionDePago_T AND cpc.Tipo = 'Copago' #
LEFT JOIN Proyecto_G9_202315.CondicionDePago AS cps ON mov.IdCondicionDePagoCoseguro_T = cps.IdCondicionDePago_T AND cps.Tipo = 'Coseguro'#
LEFT JOIN Proyecto_G9_202315.HechoHistCondicionesTiposBeneficio AS hhctb ON tbn.IdTipoBeneficio_DWH = hhctb.IdTipoBeneficio_DWH AND hhctb.IdFechaFin = (SELECT MAX(IdFechaFin) FROM Proyecto_G9_202315.HechoHistCondicionesTiposBeneficio) 
) AS hechoPlanTipoBen"""
HechoPlanesTiposBeneficio = obtener_dataframe_de_bd(grupo_connection_string, sql_hechoPlanTipoBen, db_user_2, db_psswd)
HechoPlanesTiposBeneficio.show(10)

+-----------+--------------+-----------------+--------------------+---------------------+-------------------+---------------------------+-----------------------------+---------------------------+-------------+-----------+--------------+
|IdProveedor|IdFechaEmision|           IdPlan|IdAreaDeServicio_DWH|IdNivelDeServicio_DWH|IdTipoBeneficio_DWH|IdCondicionesBeneficios_DWH|IdCondicionDePagoCoseguro_DWH|IdCondicionDePagoCopago_DWH|valorCoseguro|valorCopago|cantidadLimite|
+-----------+--------------+-----------------+--------------------+---------------------+-------------------+---------------------------+-----------------------------+---------------------------+-------------+-----------+--------------+
|          1|      20171231|16842FL0070128-03|                  31|                    3|                110|                         53|                            6|                         18|           50|          0|          35.0|
|          3|      20201231|19722NM0010001-02|      

In [15]:
# Cargue
guardar_db(grupo_connection_string, HechoPlanesTiposBeneficio,'Proyecto_G9_202315.HechoPlanesTiposBeneficio', db_user, db_psswd)

### Hecho Histórico Condiciones Tipos Beneficio 

#### Paso 1: Crear y cargar la Dimensión HechoHistoricoCondicionesTiposBeneficio

In [None]:
## Borrar la tabla HechoHistCondicionesTiposBeneficio para que cargue de manera automática. 
## Se borra y vuelve a cargar para alivianar la carga de CPU, es decir, se prioriza el 
## trabajo sobre la CPU y se decide cargar nuevamente los datos en vez de correr 
## procesos grandes de corroboración de cambio de entidades 
try:
    # Conexión a la base de datos
    conexion = mysql.connector.connect(
          host="157.253.236.116",
          port="8080",
        database='Proyecto_G9_202315',
        user=db_user,
        password=db_psswd
    )

    if conexion.is_connected():
        # Crear un cursor
        cursor = conexion.cursor()

        # Sentencia SQL para borrar la tabla
        sentencia_borrar_tabla = "DROP TABLE IF EXISTS Proyecto_G9_202315.HechoHistCondicionesTiposBeneficio"

        # Ejecutar la sentencia
        cursor.execute(sentencia_borrar_tabla)
        print("La tabla ha sido borrada exitosamente.")

        # Cerrar el cursor y la conexión
        cursor.close()
        conexion.close()

except Error as e:
    print("Error durante la conexión:", e)

In [70]:
sql_condiciones_Tipo_ben = """(
SELECT  
	Dim.IdTipoBeneficio_DWH, 
    Mini.IdCondicionesBeneficios_DWH, 
    CONCAT(Base.Fecha, '-01-01') AS IdFechaInicio, 
	1 AS Cambio 
FROM 
RaSaTransaccional_ETL.FuenteTiposBeneficio_ETL AS Base 
LEFT JOIN Proyecto_G9_202315.TiposBeneficio AS Dim ON Base.IdTipoBeneficio_T = Dim.IdTipoBeneficio_T 
LEFT JOIN 
	Proyecto_G9_202315.MiniCondicionesTipoBeneficio AS Mini 
    ON 
		Mini.EsEHB = CASE
						WHEN Base.EsEHB = 'True' THEN 'Yes'
						WHEN Base.EsEHB = 'False' THEN 'No'
						WHEN Base.EsEHB = 'Si' THEN 'Yes'
						ELSE Base.EsEHB
						END
        AND Mini.EstaCubiertaPorSeguro = Base.EstaCubiertaPorSeguro 
        AND Mini.ExcluidoDelDesembolsoMaximoDentroDeLaRed = CASE
															WHEN Base.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'True' THEN 'Yes'
															WHEN Base.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'False' THEN 'No'
															WHEN Base.ExcluidoDelDesembolsoMaximoDentroDeLaRed = 'Si' THEN 'Yes'
															ELSE Base.ExcluidoDelDesembolsoMaximoDentroDeLaRed
														END 
        AND Mini.ExcluidoDelDesembolsoMaximoFueraDeLaRed = CASE
															WHEN Base.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'True' THEN 'Yes'
															WHEN Base.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'False' THEN 'No'
															WHEN Base.ExcluidoDelDesembolsoMaximoFueraDeLaRed = 'Si' THEN 'Yes'
															ELSE Base.ExcluidoDelDesembolsoMaximoFueraDeLaRed
														END
        AND Mini.TieneLimiteCuantitativo = CASE
												WHEN Base.TieneLimiteCuantitativo = 'True' THEN 'Yes'
												WHEN Base.TieneLimiteCuantitativo = 'False' THEN 'No'
												WHEN Base.TieneLimiteCuantitativo = 'Si' THEN 'Yes'
												ELSE Base.TieneLimiteCuantitativo
											END
        AND Mini.UnidadDelLimite = Base.UnidadDelLimite 
GROUP BY 
	Dim.IdTipoBeneficio_DWH, 
    Mini.IdCondicionesBeneficios_DWH, 
    CONCAT(Base.Fecha, '-01-01')  
) AS Condiciones_Tipo_ben"""
HechoHistoricoCondicionesTiposBeneficio = obtener_dataframe_de_bd(grupo_connection_string, sql_condiciones_Tipo_ben, db_user, db_psswd)

In [71]:
HechoHistoricoCondicionesTiposBeneficio.show()

+-------------------+---------------------------+-------------+------+
|IdTipoBeneficio_DWH|IdCondicionesBeneficios_DWH|IdFechaInicio|Cambio|
+-------------------+---------------------------+-------------+------+
|                  1|                          1|   2017-01-01|     1|
|                  1|                          2|   2020-01-01|     1|
|                  2|                          3|   2017-01-01|     1|
|                  2|                          4|   2019-01-01|     1|
|                  3|                          5|   2019-01-01|     1|
|                  4|                          2|   2018-01-01|     1|
|                  5|                          1|   2020-01-01|     1|
|                  6|                          6|   2017-01-01|     1|
|                  7|                          6|   2018-01-01|     1|
|                  8|                          7|   2017-01-01|     1|
|                  9|                          8|   2017-01-01|     1|
|     

In [72]:
# Convertir las fechas de string a date para poder operar con ellas
#HechoHistoricoCondicionesTiposBeneficio = HechoHistoricoCondicionesTiposBeneficio.withColumn("IdFechaInicio", f.to_date(col("IdFechaInicio"), "dd/MM/yyyy"))

# Crear especificación de ventana ordenada por IdFechaInicio de forma descendente
windowSpecDesc = Window.partitionBy("IdTipoBeneficio_DWH").orderBy(col("IdFechaInicio").desc()) 

In [73]:
# Crear especificación de ventana ordenada por IdFechaInicio de forma ascendente para obtener la fecha 'siguiente'
windowSpecAsc = Window.partitionBy("IdTipoBeneficio_DWH").orderBy(col("IdFechaInicio").asc())

In [74]:
# Usar la función lead para obtener la 'siguiente' fecha más alta dentro del mismo grupo de IdTipoBeneficio_DWH
HechoHistoricoCondicionesTiposBeneficio = HechoHistoricoCondicionesTiposBeneficio.withColumn("SiguienteFechaMax", f.lead("IdFechaInicio", 1).over(windowSpecAsc))

In [75]:
# Calcular la columna 'IdFechaFin' basada en la lógica descrita
HechoHistoricoCondicionesTiposBeneficio = HechoHistoricoCondicionesTiposBeneficio.withColumn(
    "IdFechaFin",
    f.when(
        f.col("SiguienteFechaMax").isNull(), 
        f.lit('2999-12-31')  # Si es la última fecha
    ).otherwise(
        f.expr("concat(year(SiguienteFechaMax) - 1, '-12-31')")  # Construir la fecha '31/12/yyyy' para el año siguiente
    )
)

In [76]:
# Seleccionar las columnas en el orden correcto y el formato de fecha adecuado
HechoHistoricoCondicionesTiposBeneficio = HechoHistoricoCondicionesTiposBeneficio.select(
    col("IdTipoBeneficio_DWH"),
    col("IdCondicionesBeneficios_DWH"),
    f.date_format(col("IdFechaInicio"), "yyyy-MM-dd").alias("IdFechaInicio"),
    col("IdFechaFin"), 
    col("Cambio")
)

# Ordenar y mostrar el resultado
HechoHistoricoCondicionesTiposBeneficio.orderBy("IdTipoBeneficio_DWH", "IdFechaInicio").show(truncate=False)

+-------------------+---------------------------+-------------+----------+------+
|IdTipoBeneficio_DWH|IdCondicionesBeneficios_DWH|IdFechaInicio|IdFechaFin|Cambio|
+-------------------+---------------------------+-------------+----------+------+
|1                  |1                          |2017-01-01   |2017-12-31|1     |
|1                  |10                         |2018-01-01   |2019-12-31|1     |
|1                  |2                          |2020-01-01   |2999-12-31|1     |
|2                  |3                          |2017-01-01   |2017-12-31|1     |
|2                  |92                         |2018-01-01   |2018-12-31|1     |
|2                  |4                          |2019-01-01   |2020-12-31|1     |
|2                  |26                         |2021-01-01   |2999-12-31|1     |
|3                  |5                          |2019-01-01   |2999-12-31|1     |
|4                  |2                          |2018-01-01   |2999-12-31|1     |
|5              

#### Carga inicial de la dimensión HechoHistoricoCondicionesTiposBeneficio
El cargue inicial solo se realiza una vez, mientras que el incremental se realiza en cada reporte de negocio de datos en un periodo de tiempo

In [77]:
# Cargue
guardar_db(grupo_connection_string, HechoHistoricoCondicionesTiposBeneficio,'Proyecto_G9_202315.HechoHistCondicionesTiposBeneficio', db_user, db_psswd)

### Fecha

In [21]:
## Borrar la tabla Fecha para que cargue de manera automática 
try:
    # Conexión a la base de datos
    conexion = mysql.connector.connect(
          host="157.253.236.116",
          port="8080",
        database='Proyecto_G9_202315',
        user=db_user,
        password=db_psswd
    )

    if conexion.is_connected():
        # Crear un cursor
        cursor = conexion.cursor()

        # Sentencia SQL para borrar la tabla
        sentencia_borrar_tabla = "DROP TABLE IF EXISTS Proyecto_G9_202315.Fecha"

        # Ejecutar la sentencia
        cursor.execute(sentencia_borrar_tabla)
        print("La tabla ha sido borrada exitosamente.")

        # Cerrar el cursor y la conexión
        cursor.close()
        conexion.close()

except Error as e:
    print("Error durante la conexión:", e)

La tabla ha sido borrada exitosamente.


In [22]:
sql_fecha = """(
SELECT 
CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y%m%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y%m%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y%m%d')
    WHEN Fecha LIKE '%-%' THEN 
      DATE_FORMAT(Fecha, '%Y%m%d')
	Else Fecha 
  END AS ID_Fecha, 
CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d') 
    WHEN Fecha LIKE '%-%' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d') 
	Else Fecha 
  END AS Fecha, 
  DAY(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END) AS Dia, 
  MONTH(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END) AS Mes, 
  YEAR(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END) AS Anioa, 
  WEEK(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END, 3) AS Numero_semana_ISO    
FROM (
SELECT DISTINCT Fecha FROM RaSaTransaccional_ETL.FuentePlanesBeneficio_ETL 
UNION ALL 
SELECT DISTINCT IdFechaInicio FROM Proyecto_G9_202315.HechoHistCondicionesTiposBeneficio 
UNION ALL 
SELECT DISTINCT IdFechaFin FROM Proyecto_G9_202315.HechoHistCondicionesTiposBeneficio 
) AS Fechas 
GROUP BY 
CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y%m%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y%m%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y%m%d') 
    WHEN Fecha LIKE '%-%' THEN 
      DATE_FORMAT(Fecha, '%Y%m%d')
	Else Fecha 
  END, 
CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d') 
    WHEN Fecha LIKE '%-%' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d') 
	Else Fecha 
  END, 
  DAY(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END), 
  MONTH(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END), 
  YEAR(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END), 
  WEEK(CASE 
	WHEN Fecha LIKE '%,%' THEN
      DATE_FORMAT(STR_TO_DATE(Fecha, '%b %d,%Y'), '%Y/%m/%d')
    WHEN Fecha LIKE '%/%' THEN 
      DATE_FORMAT(STR_TO_DATE(Fecha, '%d/%m/%Y %H:%i:%s'), '%Y/%m/%d')
    WHEN Fecha LIKE '% %' THEN 
      DATE_FORMAT(Fecha, '%Y/%m/%d')
	Else Fecha 
  END, 3)  
) AS Temp_fechas"""
fechas = obtener_dataframe_de_bd(grupo_connection_string, sql_fecha, db_user, db_psswd)
fechas.show(10)

+--------+----------+---+---+-----+-----------------+
|ID_Fecha|     Fecha|Dia|Mes|Anioa|Numero_semana_ISO|
+--------+----------+---+---+-----+-----------------+
|20171231|2017/12/31| 31| 12| 2017|               52|
|20191231|2019/12/31| 31| 12| 2019|                1|
|20201231|2020/12/31| 31| 12| 2020|               53|
|20211231|2021/12/31| 31| 12| 2021|               52|
|20181231|2018/12/31| 31| 12| 2018|                1|
|20170101|2017/01/01|  1|  1| 2017|               52|
|20180101|2018/01/01|  1|  1| 2018|                1|
|20200101|2020/01/01|  1|  1| 2020|                1|
|20190101|2019/01/01|  1|  1| 2019|                1|
|20210101|2021/01/01|  1|  1| 2021|               53|
+--------+----------+---+---+-----+-----------------+
only showing top 10 rows



In [23]:
# CARGUE
guardar_db(grupo_connection_string, fechas,'Proyecto_G9_202315.Fecha', db_user, db_psswd)

## 4. Cierre
Completado este tutorial, sabe cómo configurar y realizar ETLs con historia en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

Para saber más sobre las técnicas de manejo de historia, consulte el libro <i>The  Data Warehouse Toolkit</i> de Ralph Kimball y Margy Ross,que podrá encontrar en la biblioteca de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.