### ETL para TipoVuelo y HechoHistoriaCambios
_Roberto Carreño_

In [1]:
db_user = 'Estudiante_11'
db_psswd = 'RASKV6238W'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/ProyectoTransaccional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_11'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType
from pyspark.sql.functions import udf, col, lit, length, isnan, when, count, year, date_format, to_date, unix_timestamp, from_unixtime
import pyspark.sql.functions as f
import os 
from datetime import datetime
from pyspark.sql import types as t
from pandas_profiling import ProfileReport
#import matplotlib.pyplot as plt
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



In [4]:
def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

## Dimensión TipoVuelo

#### Extracción
Para obtener los tipos de vuelos existentes, extraemos la información de la tabla **vuelos** haciendo un _distinct_ en la consulta sobre la columna **_tipo_vuelo_**

In [5]:
sql_tipovuelos = '''(SELECT DISTINCT tipo_vuelo FROM ProyectoTransaccional.vuelos) AS Temp_tipovuelos'''
tipovuelos = obtener_dataframe_de_bd(source_db_connection_string, sql_tipovuelos, db_user, db_psswd)
tipovuelos.show()

+----------+
|tipo_vuelo|
+----------+
|         R|
|         C|
|         T|
|         A|
+----------+



#### Transfomación
Una vez obtenida la información, procedemos a generar el id autoincremental de DWH y hacemos la conversión de los tipos a los nombres según indicó el negocio en las aclaraciones para esta tabla de la siguiente manera: **R**= regular, **T**= taxi, **C**= chárter, **A**= adicionales

In [6]:
tipovuelos = tipovuelos.withColumn('idTipoVuelo_DWH', f.monotonically_increasing_id() + 1)
tipovuelos = tipovuelos.withColumn("NombreTipo", when(col("tipo_vuelo") == 'C', lit("Charter")).otherwise(when(col("tipo_vuelo") == 'R', lit("Regular")).otherwise(when(col("tipo_vuelo") == 'T', lit("Taxi")).otherwise(when(col("tipo_vuelo") == 'A', lit("Adicionales")).otherwise(tipovuelos.tipo_vuelo)))))
tipovuelos.show()

+----------+---------------+-----------+
|tipo_vuelo|idTipoVuelo_DWH| NombreTipo|
+----------+---------------+-----------+
|         R|              1|    Regular|
|         C|              2|    Charter|
|         T|              3|       Taxi|
|         A|              4|Adicionales|
+----------+---------------+-----------+



Ahora, dado que el modelo propuesto tiene las culumnas _idTipoVuelo_DWH_ y _NombreTipo_ para la dimensión **TipoVuelo**, lo siguiente a realizar será eliminar el campo tipo_vuelo del Data Frame y con ello obtenemos lo solicitado:

In [7]:
tipovuelos = tipovuelos.drop('tipo_vuelo')
tipovuelos.show()

+---------------+-----------+
|idTipoVuelo_DWH| NombreTipo|
+---------------+-----------+
|              1|    Regular|
|              2|    Charter|
|              3|       Taxi|
|              4|Adicionales|
+---------------+-----------+



#### Cargue
Por ultimo, procedemos a cargar la dimensión.

In [8]:
guardar_db(dest_db_connection_string, tipovuelos,'Estudiante_11.TipoVuelo', db_user, db_psswd)

## HechoHistoriaCambios

Como se aclara en el enunciado, se procede a efectuar el ETL para registrar la historia de cambios a aeropuertos por medio de la tabla HistoriaCambios. Esto se hará por medio de los siguientes pasos:

### Crear y cargar la Dimensión HistoriaCambios
**Extracción**

In [9]:
sql_aeropuertos = '''(SELECT * FROM Estudiante_11.Aeropuerto) AS Temp_productos'''
aeropuertos = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuertos, db_user, db_psswd)

**Transformación**

In [10]:
aeropuertos = aeropuertos.withColumn('idMini_DWH',lit(1))
aeropuertos = aeropuertos.withColumn('fechaInicio',lit('2004-01-01'))
aeropuertos = aeropuertos.withColumn('fechaFin',lit('2199-12-31'))
aeropuertos = aeropuertos.withColumn('cambio', lit(1))
aeropuertos.show()

+-----+--------------+---------+----------------+----------+-----------+----------+------+
|sigla|        nombre|elevacion|idAeropuerto_DWH|idMini_DWH|fechaInicio|  fechaFin|cambio|
+-----+--------------+---------+----------------+----------+-----------+----------+------+
|  ypp|german alberto|    735.0|               1|         1| 2004-01-01|2199-12-31|     1|
|  7hs| castillo - ca|     20.0|               2|         1| 2004-01-01|2199-12-31|     1|
|  9ax|    santa cruz|    659.0|               3|         1| 2004-01-01|2199-12-31|     1|
|  a03|        fortul|    730.0|               4|         1| 2004-01-01|2199-12-31|     1|
|  aci| el monasterio|    948.0|               5|         1| 2004-01-01|2199-12-31|     1|
+-----+--------------+---------+----------------+----------+-----------+----------+------+



Eliminamos duplicados:

In [11]:
aeropuertos = aeropuertos.drop_duplicates()

Postoriormente, eliminamos las columnas no requeridas en la dimensión:

In [12]:
aeropuertos = aeropuertos.drop('sigla')
aeropuertos = aeropuertos.drop('nombre')
aeropuertos = aeropuertos.drop('elevacion')
aeropuertos.show()

+----------------+----------+-----------+----------+------+
|idAeropuerto_DWH|idMini_DWH|fechaInicio|  fechaFin|cambio|
+----------------+----------+-----------+----------+------+
|               2|         1| 2004-01-01|2199-12-31|     1|
|               3|         1| 2004-01-01|2199-12-31|     1|
|               1|         1| 2004-01-01|2199-12-31|     1|
|               4|         1| 2004-01-01|2199-12-31|     1|
|               5|         1| 2004-01-01|2199-12-31|     1|
+----------------+----------+-----------+----------+------+



**Cargue inicial de historia aeropuertos**

In [13]:
# CARGUE
guardar_db(dest_db_connection_string, aeropuertos,'Estudiante_11.HechoHistoriaCambios', db_user, db_psswd)

Como el negocio no ha compartido reporte mensual dejamos el cargue inicial de HechoHistoriaCambios con la información original de la dimensión aeropuertos sin simular cambios en la tabla factless HechoHistoriaCambios.