# ETL: Carga Areas de Servicio

In [74]:
# Imports 
from pyspark.sql.types import IntegerType, StringType, DateType, LongType, DoubleType
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import col, length, when, floor, monotonically_increasing_id, lit, round



In [4]:
class MySQLConnector:
    def __init__(self, spark: SparkSession, connection_properties: dict, url: str):
        self.spark = spark
        self.properties = connection_properties
        self.url = url

    def get_dataframe(self, sql_query: str):        
        df = self.spark.read.jdbc(
            url=self.url,
            table=sql_query,
            properties=self.properties
        )
        return df
    
    def save_db(self, df, tabla):
        df.write.jdbc(
            url=self.url,
            table=tabla,
            mode='append',
            properties=self.properties
        )
        
def create_spark_session(path_jar_driver):    
    conf = SparkConf().set('spark.driver.extraClassPath', path_jar_driver)
    spark_context = SparkContext(conf=conf)
    sql_context = SQLContext(spark_context)
    return sql_context.sparkSession    

def get_dataframe_from_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

In [5]:

db_user = 'Estudiante_65_202415'
db_psswd = 'Estudiante_202010409'

connection_properties = {
    "user": db_user,
    "password": db_psswd,
    "driver": "com.mysql.cj.jdbc.Driver"
}

source_db_string_connection = 'jdbc:mysql://157.253.236.120:8080/RaSaTransaccional_ETL'
destination_db_string_connection = f'jdbc:mysql://157.253.236.120:8080/{db_user}'

# Driver de conexion
# LINUX
path_jar_driver = '/opt/mysql/lib/mysql-connector-java-8.0.28.jar'
# WINDOWS
#path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [6]:
spark = create_spark_session(path_jar_driver)

24/11/18 10:39:47 WARN Utils: Your hostname, willp resolves to a loopback address: 127.0.1.1; using 192.168.1.113 instead (on interface wlp9s0)
24/11/18 10:39:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/18 10:39:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
conn_orig = MySQLConnector(spark=spark, connection_properties=connection_properties, url=source_db_string_connection)
conn_dest = MySQLConnector(spark=spark, connection_properties=connection_properties, url=destination_db_string_connection)

## Proceso de ETL para una dimensión.

![Modelo Movimientos](./images/AreasDeServicio.png)

## Cargar FuenteAreasDeServicio_ETL

# Areas de Servicio

### Extracción de datos de AreasDeServicio

Con el distinct se evitan los duplicados

In [99]:
# Extracción de todos los campos de FuenteAreasDeServicio_ETL
sql_areas_servicio_all_fields = '''
(
SELECT 
    DISTINCT IdAreaDeServicio_T AS IdAreaDeServicio_T, 
    NombreAreaDeServicio AS Nombre,
    CONCAT(Fecha, '-01-01') AS AnnioCreacion
 FROM FuenteAreasDeServicio_ETL
 ORDER BY IdAreaDeServicio_T) AS AreasDeServicio
'''
df_areas_servicio = conn_orig.get_dataframe(sql_areas_servicio_all_fields)
df_areas_servicio.printSchema()

root
 |-- IdAreaDeServicio_T: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- AnnioCreacion: string (nullable = true)



### Transformación 

#### Cast types 

In [100]:
df_areas_servicio = df_areas_servicio \
    .withColumn('IdAreaDeServicio_T', col('IdAreaDeServicio_T').cast(IntegerType())) \
    .withColumn('Nombre', col('Nombre').cast(StringType())) \
    .withColumn('AnnioCreacion', col('AnnioCreacion').cast(DateType()))
df_areas_servicio.show(3)


[Stage 323:>                                                        (0 + 1) / 1]

+------------------+--------------------+-------------+
|IdAreaDeServicio_T|              Nombre|AnnioCreacion|
+------------------+--------------------+-------------+
|             12018|NoVa10207VA038000...|   2018-01-01|
|             22019|NoVA10207VA038000...|   2019-01-01|
|             32019|NoVA10207VA038000...|   2019-01-01|
+------------------+--------------------+-------------+
only showing top 3 rows



                                                                                

#### Crear un identificador único y consecutivo 

In [101]:
df_areas_servicio = df_areas_servicio.withColumn("IdAreaDeServicio_DWH", monotonically_increasing_id() + 1)
df_areas_servicio.show(3)


+------------------+--------------------+-------------+--------------------+
|IdAreaDeServicio_T|              Nombre|AnnioCreacion|IdAreaDeServicio_DWH|
+------------------+--------------------+-------------+--------------------+
|             12018|NoVa10207VA038000...|   2018-01-01|                   1|
|             22019|NoVA10207VA038000...|   2019-01-01|                   2|
|             32019|NoVA10207VA038000...|   2019-01-01|                   3|
+------------------+--------------------+-------------+--------------------+
only showing top 3 rows



                                                                                

#### Adicionar comodin para evitar referencias nulas.

In [102]:
data = [(0, "Missing", "3000-01-01", 0)]
columns = ["IdAreaDeServicio_T", "Nombre", "AnnioCreacion", "IdAreaDeServicio_DWH"]
dummy_areas_servicio = spark.createDataFrame(data, columns)

df_areas_servicio = dummy_areas_servicio.union(df_areas_servicio)
df_areas_servicio.show(3)




+------------------+--------------------+-------------+--------------------+
|IdAreaDeServicio_T|              Nombre|AnnioCreacion|IdAreaDeServicio_DWH|
+------------------+--------------------+-------------+--------------------+
|                 0|             Missing|   3000-01-01|                   0|
|             12018|NoVa10207VA038000...|   2018-01-01|                   1|
|             22019|NoVA10207VA038000...|   2019-01-01|                   2|
+------------------+--------------------+-------------+--------------------+
only showing top 3 rows



                                                                                

In [61]:
df_areas_servicio.show(3)



+------------------+--------------------+-------------+--------------------+
|IdAreaDeServicio_T|              Nombre|AnnioCreacion|IdAreaDeServicio_DWH|
+------------------+--------------------+-------------+--------------------+
|                 0|             Missing|   3000-01-01|                   0|
|             12018|NoVa10207VA038000...|   2018-01-01|                   1|
|             22019|NoVA10207VA038000...|   2019-01-01|                   2|
+------------------+--------------------+-------------+--------------------+
only showing top 3 rows



                                                                                

In [103]:
df_areas_servicio.count()

                                                                                

11621

# Cargar los datos transformados

In [65]:
# Carga de los datos transformados
conn_dest.save_db(df_areas_servicio, 'Rs_AreasDeServicio')

                                                                                

# Geografía 

### Extracción de datos de Geografia

Con SQL se ajusta los valores de la poblacion altos.

In [78]:
# Extracción de datos de Geografia
sql_geografia = '''
(
SELECT DISTINCT IdGeografia_T,
                Estado,
                Condado,
                ABS(Area) AS AreaAct,
                CASE
                    WHEN PoblacionAct % 10000 = 1 THEN FLOOR(ABS(PoblacionAct) / 10000)
                    ELSE PoblacionAct
                    END AS PoblacionAct
FROM FuenteAreasDeServicio_ETL
ORDER BY IdGeografia_T) AS Geografia
'''
df_geografia = conn_orig.get_dataframe(sql_geografia)
df_geografia = df_geografia.withColumn('IdGeografia_T', col('IdGeografia_T').cast(IntegerType())) \
    .withColumn('PoblacionAct', col('PoblacionAct').cast(DoubleType()))
 
df_geografia.printSchema()



root
 |-- IdGeografia_T: integer (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Condado: string (nullable = true)
 |-- AreaAct: double (nullable = true)
 |-- PoblacionAct: double (nullable = true)



## Transformation

### 1. Crear un identificador único y consecutivo 

In [79]:
df_geografia = df_geografia.withColumn("IdGeografia_DWH", monotonically_increasing_id() + 1)
df_geografia.show(3, truncate=False)


+-------------+-------+--------------+-------+------------+---------------+
|IdGeografia_T|Estado |Condado       |AreaAct|PoblacionAct|IdGeografia_DWH|
+-------------+-------+--------------+-------+------------+---------------+
|1001         |Alabama|Autauga County|594.0  |59095.0     |1              |
|1003         |Alabama|Baldwin County|1589.0 |239294.0    |2              |
|1005         |Alabama|Barbour County|884.0  |24964.0     |3              |
+-------------+-------+--------------+-------+------------+---------------+
only showing top 3 rows



                                                                                

### 3. Errores en la población 

Se valida de nuevo que no existan valores excesivamente altos

In [80]:
df_geografia.filter(col("PoblacionAct") > 10000000).show(10)

[Stage 234:>                                                        (0 + 1) / 1]

+-------------+------+-------+-------+------------+---------------+
|IdGeografia_T|Estado|Condado|AreaAct|PoblacionAct|IdGeografia_DWH|
+-------------+------+-------+-------+------------+---------------+
+-------------+------+-------+-------+------------+---------------+



                                                                                

In [81]:
df_invalid_poblacion = df_geografia.filter(col("PoblacionAct").like("%0001"))

df_invalid_poblacion.show(truncate=False)

count_invalid = df_invalid_poblacion.count()
if count_invalid > 0:
    print(f"Se encontraron {count_invalid} registros con PoblacionAct terminando en '0001'.")
else:
    print("No se encontraron registros con PoblacionAct terminando en '0001'.")


                                                                                

+-------------+------+-------+-------+------------+---------------+
|IdGeografia_T|Estado|Condado|AreaAct|PoblacionAct|IdGeografia_DWH|
+-------------+------+-------+-------+------------+---------------+
+-------------+------+-------+-------+------------+---------------+



[Stage 236:>                                                        (0 + 1) / 1]

No se encontraron registros con PoblacionAct terminando en '0001'.


                                                                                

### Adicionar el comodin 

In [82]:
data = [(0, "Missing", "Missing", 0.0, 0.0, 0)]
columns = ["IdGeografia_T", "Estado", "Condado", "AreaAct", "PoblacionAct", "IdGeografia_DWH"]
dummy_df_geografia = spark.createDataFrame(data, columns)

df_geografia = dummy_df_geografia.union(df_geografia)
df_geografia.show(3)

+-------------+-------+--------------+-------+------------+---------------+
|IdGeografia_T| Estado|       Condado|AreaAct|PoblacionAct|IdGeografia_DWH|
+-------------+-------+--------------+-------+------------+---------------+
|            0|Missing|       Missing|    0.0|         0.0|              0|
|         1001|Alabama|Autauga County|  594.0|     59095.0|              1|
|         1003|Alabama|Baldwin County| 1589.0|    239294.0|              2|
+-------------+-------+--------------+-------+------------+---------------+
only showing top 3 rows



                                                                                

Adicionar Densidad a partir del calculo de dos columnas para corroborar su valor


### 4. Completar los campos nulos de poblacion y area

Verificar si existen valores nulos en las columnas relevantes 

In [83]:

columns_to_check = ["IdGeografia_T", "Estado", "Condado", "AreaAct", "IdGeografia_DWH"]

for col_name in columns_to_check:
    null_count = df_geografia.filter(col(col_name).isNull()).count()
    if null_count > 0:
        print(f"Se encontraron {null_count} valores nulos en la columna {col_name}.")
    else:
        print(f"No se encontraron valores nulos en la columna {col_name}.")


                                                                                

No se encontraron valores nulos en la columna IdGeografia_T.
No se encontraron valores nulos en la columna Estado.
No se encontraron valores nulos en la columna Condado.
Se encontraron 31 valores nulos en la columna AreaAct.
No se encontraron valores nulos en la columna IdGeografia_DWH.


### Garantizar completitud de las poblaciones
Se garantiza la completitud de las poblaciones y areas nulas, primero se hace un query en la base de datos, para obtener los condados con dichos valores vacios, como son pocos se completa la informacion con wikipedia.

In [84]:
data = [
    (51510, "Virginia", "Alexandria City", 15.03, 159200),
    (51650, "Virginia", "Hampton City", 51.41, 137148),
    (51735, "Virginia", "Poquoson City", 15.5, 12150),
    (51830, "Virginia", "Williamsburg City", 9.1, 15000),
    (51710, "Virginia", "Norfolk City", 54.0, 238005),
    (51700, "Virginia", "Newport News City", 69.2, 179225),
    (51670, "Virginia", "Hopewell City", 10.2, 22529),
    (51570, "Virginia", "Colonial Heights City", 7.5, 17411),
    (51730, "Virginia", "Petersburg City", 23.2, 30587),
    (51760, "Virginia", "Richmond City", 62.5, 226610),
    (51810, "Virginia", "Virginia Beach City", 249.0, 450189),
    (51550, "Virginia", "Chesapeake City", 340.7, 249422),
    (51610, "Virginia", "Falls Church City", 2.0, 14617),
    (51685, "Virginia", "Manassas Park City", 2.5, 16800),
    (51600, "Virginia", "Fairfax City", 6.3, 24019),
    (51740, "Virginia", "Portsmouth City", 33.6, 94698),
    (51620, "Virginia", "Franklin City", 8.3, 8000),
    (51630, "Virginia", "Fredericksburg City", 10.5, 29036),
    (51678, "Virginia", "Lexington City", 2.5, 7200),
    (51683, "Virginia", "Manassas City", 10.0, 41764),
    (51540, "Virginia", "Charlottesville City", 10.3, 47146),
    (51790, "Virginia", "Staunton City", 20.0, 24932),
    (51800, "Virginia", "Suffolk City", 400.0, 94586),
    (51660, "Virginia", "Harrisonburg City", 17.4, 54033),
    (51515, "Virginia", "Bedford", 6.9, 6500),
    (51680, "Virginia", "Lynchburg City", 49.6, 82000),
    (51595, "Virginia", "Emporia City", 7.0, 5400),
    (51530, "Virginia", "Buena Vista City", 6.8, 6000),
    (51580, "Virginia", "Covington City", 5.7, 5700),
    (51820, "Virginia", "Waynesboro City", 15.4, 22000),
    (51840, "Virginia", "Winchester City", 9.3, 28078),
]

columns = ["IdGeografia_T", "State", "Country", "Area", "Population"]

df_missing_countries = spark.createDataFrame(data, columns)


In [85]:
df_geografia = df_geografia.join(df_missing_countries, how = 'left', on = 'IdGeografia_T')

# Probar para algunos IDs que fueron identificados como nulos
df_geografia.filter(col("IdGeografia_T").isin(51650, 51510, 51735, 51700)).show()

+-------------+--------+-----------------+-------+------------+---------------+--------+-----------------+-----+----------+
|IdGeografia_T|  Estado|          Condado|AreaAct|PoblacionAct|IdGeografia_DWH|   State|          Country| Area|Population|
+-------------+--------+-----------------+-------+------------+---------------+--------+-----------------+-----+----------+
|        51700|Virginia|Newport News City|   null|    179582.0|           2485|Virginia|Newport News City| 69.2|    179225|
|        51650|Virginia|     Hampton City|   null|    135169.0|           2478|Virginia|     Hampton City|51.41|    137148|
|        51510|Virginia|  Alexandria City|   null|    158309.0|           2466|Virginia|  Alexandria City|15.03|    159200|
|        51735|Virginia|    Poquoson City|   null|     12121.0|           2488|Virginia|    Poquoson City| 15.5|     12150|
+-------------+--------+-----------------+-------+------------+---------------+--------+-----------------+-----+----------+



                                                                                

In [86]:
df_geografia = df_geografia.withColumn("AreaAct", when(col("AreaAct").isNull(), 
                                                                 col("Area")).otherwise(col("AreaAct")))

df_geografia.filter(col("IdGeografia_T").isin(51650, 51510, 51735, 51700)).show()


+-------------+--------+-----------------+-------+------------+---------------+--------+-----------------+-----+----------+
|IdGeografia_T|  Estado|          Condado|AreaAct|PoblacionAct|IdGeografia_DWH|   State|          Country| Area|Population|
+-------------+--------+-----------------+-------+------------+---------------+--------+-----------------+-----+----------+
|        51700|Virginia|Newport News City|   69.2|    179582.0|           2485|Virginia|Newport News City| 69.2|    179225|
|        51650|Virginia|     Hampton City|  51.41|    135169.0|           2478|Virginia|     Hampton City|51.41|    137148|
|        51510|Virginia|  Alexandria City|  15.03|    158309.0|           2466|Virginia|  Alexandria City|15.03|    159200|
|        51735|Virginia|    Poquoson City|   15.5|     12121.0|           2488|Virginia|    Poquoson City| 15.5|     12150|
+-------------+--------+-----------------+-------+------------+---------------+--------+-----------------+-----+----------+



                                                                                

In [87]:
columns_to_drop = ["State", "Country", "Area", "Population"]

df_geografia = df_geografia.drop(*columns_to_drop)
df_geografia.show(3)


+-------------+-------+--------------+-------+------------+---------------+
|IdGeografia_T| Estado|       Condado|AreaAct|PoblacionAct|IdGeografia_DWH|
+-------------+-------+--------------+-------+------------+---------------+
|            0|Missing|       Missing|    0.0|         0.0|              0|
|         1007|Alabama|   Bibb County|  622.0|     22477.0|              4|
|         1005|Alabama|Barbour County|  884.0|     24964.0|              3|
+-------------+-------+--------------+-------+------------+---------------+
only showing top 3 rows



                                                                                

### Adicionar la Densidad
Al ser este un campo calculado , se recomienda al negocio no persistir estos datos, es una mala practica, se puede crear vista o otras alternativas.

In [88]:
df_geografia = df_geografia.withColumn("DensidadAct", round(col("PoblacionAct") / col("AreaAct"), 3))
df_geografia.show(3)


+-------------+--------+--------------+-------+------------+---------------+-----------+
|IdGeografia_T|  Estado|       Condado|AreaAct|PoblacionAct|IdGeografia_DWH|DensidadAct|
+-------------+--------+--------------+-------+------------+---------------+-----------+
|            0| Missing|       Missing|    0.0|         0.0|              0|       null|
|        17043|Illinois| DuPage County|  327.0|    924885.0|            432|   2828.394|
|        18147| Indiana|Spencer County|  397.0|     19798.0|            586|     49.869|
+-------------+--------+--------------+-------+------------+---------------+-----------+
only showing top 3 rows



                                                                                

Cambiar valor de la densidad del comodin 



In [89]:
df_geografia = df_geografia.withColumn("DensidadAct",
    when(col("IdGeografia_T") == 0, 0.0).otherwise(col("DensidadAct")))

df_geografia.show(3)



+-------------+--------+--------------+-------+------------+---------------+-----------+
|IdGeografia_T|  Estado|       Condado|AreaAct|PoblacionAct|IdGeografia_DWH|DensidadAct|
+-------------+--------+--------------+-------+------------+---------------+-----------+
|            0| Missing|       Missing|    0.0|         0.0|              0|        0.0|
|        17043|Illinois| DuPage County|  327.0|    924885.0|            432|   2828.394|
|        18147| Indiana|Spencer County|  397.0|     19798.0|            586|     49.869|
+-------------+--------+--------------+-------+------------+---------------+-----------+
only showing top 3 rows



                                                                                

## LOAD - Geografia

In [91]:
df_geografia = df_geografia.withColumn('IdGeografia_DWH',col('IdGeografia_DWH').cast('int')).orderBy(col('IdGeografia_DWH'))

df_geografia.printSchema()
df_geografia.show(5)
df_geografia.count()

root
 |-- IdGeografia_T: long (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Condado: string (nullable = true)
 |-- AreaAct: double (nullable = true)
 |-- PoblacionAct: double (nullable = true)
 |-- IdGeografia_DWH: integer (nullable = true)
 |-- DensidadAct: double (nullable = true)

+-------------+-------+--------------+-------+------------+---------------+-----------+
|IdGeografia_T| Estado|       Condado|AreaAct|PoblacionAct|IdGeografia_DWH|DensidadAct|
+-------------+-------+--------------+-------+------------+---------------+-----------+
|            0|Missing|       Missing|    0.0|         0.0|              0|        0.0|
|         1001|Alabama|Autauga County|  594.0|     59095.0|              1|     99.487|
|         1003|Alabama|Baldwin County| 1589.0|    239294.0|              2|    150.594|
|         1005|Alabama|Barbour County|  884.0|     24964.0|              3|      28.24|
|         1007|Alabama|   Bibb County|  622.0|     22477.0|              4|     36.1

2647

In [93]:
conn_dest.save_db(df_geografia, 'Rs_Geografia')

                                                                                

# AsociacionAreaServicioGeografia

### Extracción de datos de AsociacionAreaServicioGeografia

In [108]:
# Extracción desde Rs_AreasDeServicio
sql_areas_servicio_geografia = '''
(
SELECT DISTINCT IdAreaDeServicio_T,
                IdGeografia_T               
FROM FuenteAreasDeServicio_ETL
WHERE IdAreaDeServicio_T IS NOT NULL AND IdGeografia_T IS NOT NULL
GROUP BY IdAreaDeServicio_T, IdGeografia_T, NombreAreaDeServicio, Condado, Estado
ORDER BY IdAreaDeServicio_T, IdGeografia_T
) AS AreasDeServicioIds
'''
df_areas_servicio_source = conn_orig.get_dataframe(sql_areas_servicio_geografia)
df_areas_servicio_source = df_areas_servicio_source.withColumn('IdAreaDeServicio_T', col('IdAreaDeServicio_T').cast(IntegerType())) \
    .withColumn('IdGeografia_T', col('IdGeografia_T').cast(IntegerType()))
df_areas_servicio_source.printSchema()

root
 |-- IdAreaDeServicio_T: integer (nullable = true)
 |-- IdGeografia_T: integer (nullable = true)



In [107]:
# Filtrar por las dos tablas anteriores
# Correcting the selection of specific columns
df_geografia_filtered_ids = df_geografia.select("IdGeografia_T", "IdGeografia_DWH")
df_areas_servicio_filtered_ids = df_areas_servicio.select("IdAreaDeServicio_T", "IdAreaDeServicio_DWH")

# Display the results
df_geografia_filtered_ids.show(3)
df_areas_servicio_filtered_ids.show(3)

+-------------+---------------+
|IdGeografia_T|IdGeografia_DWH|
+-------------+---------------+
|            0|              0|
|         1001|              1|
|         1003|              2|
+-------------+---------------+
only showing top 3 rows





+------------------+--------------------+
|IdAreaDeServicio_T|IdAreaDeServicio_DWH|
+------------------+--------------------+
|                 0|                   0|
|             12018|                   1|
|             22019|                   2|
+------------------+--------------------+
only showing top 3 rows



                                                                                

In [118]:
areas_servicio_ids = (df_areas_servicio_source.join(df_geografia_filtered_ids, how = 'left', on = 'IdGeografia_T').join(df_areas_servicio_filtered_ids, how = 'left', on='IdAreaDeServicio_T')
                      .select([col('IdAreaDeServicio_DWH'),col('IdGeografia_DWH')])
                      .fillna({'IdAreaDeServicio_DWH': 0, 'IdGeografia_DWH': 0}))
areas_servicio_ids.count()


                                                                                

268444

In [119]:
areas_servicio_ids.printSchema()

root
 |-- IdAreaDeServicio_DWH: long (nullable = false)
 |-- IdGeografia_DWH: integer (nullable = false)



In [120]:
areas_servicio_ids = areas_servicio_ids.withColumn('IdAreaDeServicio_DWH', col('IdAreaDeServicio_DWH').cast(IntegerType())) \
    .withColumn('IdGeografia_DWH', col('IdGeografia_DWH').cast(IntegerType()))

In [121]:
areas_servicio_ids.printSchema()

root
 |-- IdAreaDeServicio_DWH: integer (nullable = false)
 |-- IdGeografia_DWH: integer (nullable = false)



In [123]:
areas_servicio_ids = areas_servicio_ids.repartition(4)
conn_dest.save_db(areas_servicio_ids, 'Rs_AsociacionAreaServicioGeografia')

                                                                                