# MIAD modelado de datos y ETL - Primer ETL

In [38]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, DoubleType
from pyspark.sql.functions import col,isnan, when, count
from datetime import datetime
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_style()
pd.set_option("display.max_columns", None)

In [2]:
import findspark
findspark.init()
findspark.find()

'D:\\spark\\spark-3.1.2-bin-hadoop2.7'

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[2] pyspark-shell'

In [4]:
#Configuración de la sesión

appName = "PySpark SQL/MYSQL - via JDBC"
master = "local"
conf = SparkConf()\
    .setAppName(appName)\
    .setMaster(master)\
    .set("spark.driver.extraClassPath","C:\dev\sqljdbc_9.4\enu\mssql-jdbc-9.4.0.jre8.jar;C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.26.jar")
spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

In [5]:
spark

In [8]:
# Configuración servidor base de datos multidimensional
user_md = 'user'
psswd_md = 'rw,.12a'
db_multidimensional_connection_string = "jdbc:sqlserver://LAPTOP-RRMH1QSM:1433;databaseName=infra_visible_2"

### ETL
Funciones para la extracción, transformación y guardado de datos.

In [89]:
# Extraer data from CSV
PATH = "./data/"

def extract_aeropuertos(spark):
    return spark.read.load(PATH + "aeropuertos_cambios_infraestructura.csv",format="csv", sep=",", inferSchema="true", header="true")

def extract_vuelos(spark):
    return spark.read.load(PATH + "vuelosEtapa2.csv",format="csv", sep=",", inferSchema="true", header="true")

def extract_otros(spark):
    df = spark.read.load(PATH + "vuelosEtapa2.csv",format="csv", sep=",", inferSchema="true", header="true")
    empresa_df = df.select("empresa").distinct()
    equipo_df = df.select("tipo_equipo").distinct().withColumnRenamed("tipo_equipo", "equipo")
    tipo_vuelotrafico_df = df.select('trafico','tipo_vuelo').distinct().withColumnRenamed("tipo_vuelo", "vuelo")
    return empresa_df, equipo_df, tipo_vuelotrafico_df

def extract_cobertura(spark):
    return spark.read.load(PATH + "cobertura_centro.csv",format="csv", sep=",", inferSchema="true", header="true")


# SQL server
# Necesito leer las dimensiones fecha y aeropuertos para extraer sus llaves primarias y asignarlas a la tabla de hechos.
def extract_aeropuertos_from_multidimensional(spark):
    return  spark.read.format("jdbc")\
        .option("url", db_multidimensional_connection_string)\
        .option("dbtable", "aeropuertos")\
        .option("user", user_md)\
        .option("password", psswd_md)\
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
        .load().select("id", "sigla", "fecha_inicio", "fecha_fin", "vigente")

def extract_fecha_from_multidimensional(spark):
    df = spark.read.format("jdbc")\
        .option("url", db_multidimensional_connection_string)\
        .option("dbtable", "fecha")\
        .option("user", user_md)\
        .option("password", psswd_md)\
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
        .load()
    return df.select("id", "year", "month", "day")


def extract_empresa_from_multidimensional(spark):
    df = spark.read.format("jdbc")\
        .option("url", db_multidimensional_connection_string)\
        .option("dbtable", "empresa")\
        .option("user", user_md)\
        .option("password", psswd_md)\
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
        .load()
    return df

def extract_equipo_from_multidimensional(spark):
    df = spark.read.format("jdbc")\
        .option("url", db_multidimensional_connection_string)\
        .option("dbtable", "tipo_equipo")\
        .option("user", user_md)\
        .option("password", psswd_md)\
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
        .load()
    return df

def extract_tipo_from_multidimensional(spark):
    df = spark.read.format("jdbc")\
        .option("url", db_multidimensional_connection_string)\
        .option("dbtable", "tipo_vuelo_trafico")\
        .option("user", user_md)\
        .option("password", psswd_md)\
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
        .load()
    return df




In [90]:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType


def transform_aeropuertos(df_aeropuerto):
    dim_aeropuerto = df_aeropuerto.filter(df_aeropuerto.nombre != 'CAMILO DAZA No.2')
    modificaciones = dim_aeropuerto.groupBy('sigla').count()

    dim_aeropuerto = dim_aeropuerto.join(modificaciones, how='inner', on=['sigla'])
    dim_aeropuerto = dim_aeropuerto.withColumn('fecha_inicio',
                                               F.when((F.col('Ano') == 2014), '2009-01-01')
                                               .when((F.col('Ano') == 2015) & (F.col('count') == 2), '2009-01-01')
                                               .when((F.col('Ano') == 2015) & (F.col('count') == 3), '2015-01-01')
                                               .when((F.col('Ano') == 2016), '2016-01-01')
                                               .otherwise(0)
                                               )
    dim_aeropuerto = dim_aeropuerto.withColumn('fecha_fin',
                                               F.when((F.col('Ano') == 2014), '2014-12-31')
                                               .when((F.col('Ano') == 2015), '2015-12-31')
                                               .when((F.col('Ano') == 2016), '2100-12-31')
                                               .otherwise(0)
                                               )

    dim_aeropuerto = dim_aeropuerto.withColumn('vigente',
                                               F.when((F.col('Ano') == 2014), 'N')
                                               .when((F.col('Ano') == 2015), 'N')
                                               .when((F.col('Ano') == 2016), 'S')
                                               .otherwise(0)
                                               )
    return dim_aeropuerto.withColumn("iata", F.col("sigla")).withColumnRenamed("Ano", "year").drop("count")


def transform_vuelos(vuelos_df, aeropuertos_df, fecha_df, empresa_df, equipo_df, tipo_df):
    df = vuelos_df.selectExpr('ano', 'mes', 'origen', 'destino', 'sillas as puestos_disponibles', 'carga_ofrecida as carga_ofrecida', 'pasajeros as pasajeros_transportado', 'carga_bordo as carga_transportado', 'vuelos', 'empresa', 'tipo_equipo', 'tipo_vuelo', 'trafico')

    df = df.withColumn('date', F.concat(F.col('ano'),F.lit('-'), F.col('mes'), F.lit('-01')))
    df = df.withColumn('date', df['date'].cast(DateType()))

    df = df.withColumn("pasajeros_desaprovechado", F.col("puestos_disponibles") - F.col("pasajeros_transportado"))
    df = df.withColumn("carga_desaprovechado", F.col("carga_ofrecida") - F.col("carga_transportado"))

    tmp_df = fecha_df.withColumnRenamed("id", "id_fecha")
    df = df.join(tmp_df, (df['ano'] == tmp_df['year']) & (df['mes']==tmp_df['month']) & (1==tmp_df['day']))

    tmp_df = aeropuertos_df.withColumnRenamed("id", "id_origen").withColumnRenamed("sigla", "sigla_origen").select("id_origen", "sigla_origen", "fecha_inicio", "fecha_fin")
    tmp_df = tmp_df.withColumn('fecha_inicio', tmp_df['fecha_inicio'].cast(DateType()))
    tmp_df = tmp_df.withColumn('fecha_fin', tmp_df['fecha_fin'].cast(DateType()))
    df = df.join(tmp_df, (df['origen'] == tmp_df['sigla_origen']) & (tmp_df['fecha_inicio'] <= df['date']) & (df['date'] <= tmp_df['fecha_fin']))


    tmp_df = aeropuertos_df.withColumnRenamed("id", "id_destino").withColumnRenamed("sigla", "sigla_destino").select("id_destino", "sigla_destino", "fecha_inicio", "fecha_fin")
    tmp_df = tmp_df.withColumn('fecha_inicio', tmp_df['fecha_inicio'].cast(DateType()))
    tmp_df = tmp_df.withColumn('fecha_fin', tmp_df['fecha_fin'].cast(DateType()))
    df = df.join(tmp_df, (df['destino'] == tmp_df['sigla_destino']) & (tmp_df['fecha_inicio'] <= df['date']) & (df['date'] <= tmp_df['fecha_fin']))

    tmp_df = empresa_df.withColumnRenamed("id", "id_empresa")
    df = df.join(tmp_df, df['empresa'] == tmp_df['empresa'])

    tmp_df = equipo_df.withColumnRenamed("id", "id_equipo")
    df = df.join(tmp_df, df['tipo_equipo'] == tmp_df['equipo'])

    tmp_df = tipo_df.withColumnRenamed("id", "id_tipo")
    df = df.join(tmp_df, (df['tipo_vuelo'] == tmp_df['vuelo']) & (df['trafico'] == tmp_df['trafico']))

    return df.select('id_fecha', 'id_origen', 'id_destino','id_equipo', 'id_empresa', 'id_tipo', 'puestos_disponibles','carga_ofrecida','pasajeros_transportado', 'carga_transportado', 'pasajeros_desaprovechado', 'carga_desaprovechado', 'vuelos')


### Ejecución del proceso ETL.

In [91]:
# ETL aeropuertos
aeropuertos_df = extract_aeropuertos(spark)
aeropuertos_df = transform_aeropuertos(aeropuertos_df)

aeropuertos_df.select('*').write.format('jdbc')\
    .mode('append')\
    .option("url", db_multidimensional_connection_string)\
    .option("dbtable", "aeropuertos")\
    .option("user", user_md)\
    .option("password", psswd_md)\
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .save()


In [92]:
empresa_df, equipo_df, tipo_vuelotrafico_df = extract_otros(spark)

empresa_df.select('*').write.format('jdbc')\
    .mode('append')\
    .option("url", db_multidimensional_connection_string)\
    .option("dbtable", "empresa")\
    .option("user", user_md)\
    .option("password", psswd_md)\
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .save()

equipo_df.select('*').write.format('jdbc')\
    .mode('append')\
    .option("url", db_multidimensional_connection_string)\
    .option("dbtable", "tipo_equipo")\
    .option("user", user_md)\
    .option("password", psswd_md)\
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .save()


tipo_vuelotrafico_df.select('*').write.format('jdbc')\
    .mode('append')\
    .option("url", db_multidimensional_connection_string)\
    .option("dbtable", "tipo_vuelo_trafico")\
    .option("user", user_md)\
    .option("password", psswd_md)\
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .save()

In [93]:
# ETL tabla hechos
vuelos_df = extract_vuelos(spark)
fecha_df = extract_fecha_from_multidimensional(spark)
aeropuertos_df = extract_aeropuertos_from_multidimensional(spark)
empresa_df = extract_empresa_from_multidimensional(spark)
equipo_df = extract_equipo_from_multidimensional(spark)
tipo_vuelotrafico_df = extract_tipo_from_multidimensional(spark)
vuelos_df = transform_vuelos(vuelos_df, aeropuertos_df, fecha_df, empresa_df, equipo_df, tipo_vuelotrafico_df)

vuelos_df.toPandas().shape

(82559, 13)

In [94]:
vuelos_df.select('*').write.format('jdbc')\
    .mode('append')\
    .option("url", db_multidimensional_connection_string)\
    .option("dbtable", "hecho_vuelos")\
    .option("user", user_md)\
    .option("password", psswd_md)\
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .save()