## Import's

In [1]:
import os
import re
import boto3
import json
import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import (
    ArrayType,
    FloatType,
    TimestampType,
    DateType,
    StringType,
    IntegerType,
    LongType,
    StructType,
    StructField
)
import decamelize
from unidecode import unidecode
import http.client

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"

In [3]:
from pyspark import SparkContext,SQLContext,SparkConf,StorageLevel
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())

<pyspark.sql.session.SparkSession.Builder at 0x7f83b322a070>

## Criando sessão spark

In [4]:
spark = (
    SparkSession.builder.appName('teste').master('local[*]').getOrCreate()
)

22/04/09 21:19:48 WARN Utils: Your hostname, DESKTOP-68F50BH resolves to a loopback address: 127.0.1.1; using 172.20.217.166 instead (on interface eth0)
22/04/09 21:19:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/04/09 21:19:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Configurações de visualização Spark

In [5]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows",200)

## Lendo os dados VRA

In [6]:
df_vra = spark.read.option("multiline","true") \
      .json("VRA/*.json")

                                                                                

In [7]:
df_vra = df_vra.drop_duplicates()

### Tratamento do cabeçalho

In [8]:
def vra_cols(rename_df):
    for column in rename_df.columns:
        col = decamelize.convert(column)
        new_col = unidecode(col)
        rename_df = rename_df.withColumnRenamed(column, new_col)
    return rename_df 

In [9]:
df_vra_tratado = vra_cols(df_vra)

### Salvando dados VRA

In [11]:
df_vra_tratado.coalesce(1).write.mode('overwrite').parquet('VRA_TRAT/')

                                                                                

## Lendo os dados AIR CIA

In [12]:
df_air_cia = spark.read.options(
    delimiter=';',
    header=True,
    encoding="utf-8"
).csv('AIR_CIA/*.csv')

In [13]:
df_air_cia= df_air_cia.drop_duplicates()

In [14]:
F.udf(StringType())
def air_cols(rename_df):
    for column in rename_df.columns:
        col = decamelize.convert(column)
        n_col = col.replace('-', "").replace(" ", "")
        new_col = unidecode(n_col)
        rename_df = rename_df.withColumnRenamed(column, new_col)
    return rename_df

In [15]:
df_air_cia_tratado = air_cols(df_air_cia)

## Separação de colunas ICAO e IATA

In [16]:
df_air_cia_tratado = ( df_air_cia_tratado
                      .withColumn('icao', F.split(df_air_cia_tratado['icaoiata'], ' ').getItem(0))
                      .withColumn('iata', F.split(df_air_cia_tratado['icaoiata'], ' ').getItem(1))
                     )

### Savando os dados

In [17]:
df_air_cia_tratado.coalesce(1).write.mode('overwrite').parquet('AIR_CIA_TRAD/')

[Stage 5:>                                                          (0 + 1) / 1]                                                                                

## Get dados API

### listando os aeroportos de origem e destino 

In [18]:
df_list_ori = df_vra_tratado.select(F.col("icao_aerodromo_origem").alias("icao")).distinct()

In [19]:
df_list_dest = df_vra_tratado.select(F.col("icao_aerodromo_destino").alias("icao")).distinct()

### Fazendo union e pegando o distinct

In [20]:
df_list = df_list_ori.union(df_list_dest)

In [21]:
list_distinct = df_list.distinct()

### Requisição na API e recuperação dos dados  usando Python

In [22]:
list_dist = list_distinct.toPandas()

                                                                                

In [23]:
list_aero = []

for index, row in list_dist.iterrows():
    conn = http.client.HTTPSConnection("airport-info.p.rapidapi.com")
    headers = {
        'x-rapidapi-key': "62d74b28f7msh522d3fa17190630p166a28jsn06312678dd42",
        'x-rapidapi-host': "airport-info.p.rapidapi.com"
        }
    conn.request("GET", "/airport?icao="+row['icao']+"", headers=headers)
    res = conn.getresponse()
    data = res.read()
    aero = data.decode("utf-8")
    a_json = json.loads(aero)
    list_aero.append(a_json)

In [24]:
df_aero = pd.DataFrame(list_aero)

### Transformando dataframe para Spark

In [25]:
schema_aero = StructType([ StructField("id", FloatType(), True)\
                       ,StructField("iata", StringType(), True)\
                       ,StructField("icao", StringType(), True)\
                       ,StructField("name", StringType(), True)\
                       ,StructField("location", StringType(), True)\
                       ,StructField("street_number", StringType(), True)\
                       ,StructField("street", StringType(), True)\
                       ,StructField("city", StringType(), True)\
                       ,StructField("county", StringType(), True)\
                       ,StructField("state", StringType(), True)\
                       ,StructField("country_iso", StringType(), True)\
                       ,StructField("country", StringType(), True)\
                       ,StructField("postal_code", StringType(), True)\
                       ,StructField("phone", StringType(), True)\
                       ,StructField("latitude", FloatType(), True)\
                       ,StructField("longitude", FloatType(), True)\
                       ,StructField("uct", FloatType(), True)\
                       ,StructField("website", StringType(), True)\
                       ,StructField("error", StringType(), True)])

In [26]:
df_aer = spark.createDataFrame(df_aero, schema=schema_aero) 

In [27]:
df_aer = df_aer.withColumn("id", F.col("id").cast(IntegerType()))

In [28]:
df_aer = df_aer.drop_duplicates()

### Salvando os dados

In [29]:
df_aer.coalesce(1).write.mode('overwrite').parquet('AERO/')

                                                                                

## Criar views SQL

In [30]:
df_vra_tratado.createOrReplaceTempView("df_vra_tratado")

In [31]:
df_air_cia_tratado.createOrReplaceTempView("df_air_cia_tratado")

In [32]:
df_aer.createOrReplaceTempView("df_aer")

### View rota mais utilizada por cada companhia

#### Classificação de rotas e seleção da mais utilizada

In [34]:
view_voos = spark.sql("""
    select * FROM (
        SELECT 
            v.icao_empresa_aerea, 
            v.icao_aerodromo_origem, 
            v.icao_aerodromo_destino, 
            RANK() OVER (PARTITION BY v.icao_empresa_aerea ORDER BY COUNT(*) DESC ) AS rn,
            COUNT(*) as qtd_voo
        FROM df_vra_tratado AS v
        GROUP BY v.icao_empresa_aerea, v.icao_aerodromo_origem, v.icao_aerodromo_destino
) WHERE rn = 1;
"""
)

#### criação da view para resposta final

In [35]:
view_voos.createOrReplaceTempView("view_voos")

#### Joins para obter todas as informações necessárias

In [36]:
view_companhia_rota = spark.sql("""
    SELECT
        c.razao_social, 
        o.name nome_aeroporto_origem, 
        icao_aerodromo_origem, 
        o.state estado_aeroporto_origem, 
        icao_aerodromo_destino, 
        d.name nome_aeroporto_destino, 
        d.state estado_aeroporto_destino
    FROM view_voos as v
    LEFT JOIN df_air_cia_tratado as c
        ON v.icao_empresa_aerea = c.icao
    LEFT JOIN df_aer as o
        ON v.icao_aerodromo_origem = o.icao
    LEFT JOIN df_aer as d
        ON v.icao_aerodromo_destino = d.icao
    WHERE c.razao_social IS NOT NULL;
          """)


### view de compnhias aereas com maior atuação em cada aeroporto

#### criação de views com contagem de atuação de cada empresa nos aeroportos como origem e destino

In [52]:
view_aeroporto_companhia_origem = spark.sql("""
            SELECT 
                icao_aerodromo_origem, icao_empresa_aerea, 
                count(*) as qtd_origem from df_vra_tratado
             GROUP BY icao_aerodromo_origem, icao_empresa_aerea
          """)

In [47]:
view_aeroporto_companhia_destino = spark.sql("""
            SELECT 
                icao_aerodromo_destino, icao_empresa_aerea, 
                count(*) as qtd_destino from df_vra_tratado
             GROUP BY icao_aerodromo_destino, icao_empresa_aerea
          """)

In [54]:
view_aeroporto_companhia_origem.createOrReplaceTempView("view_aeroporto_companhia_origem")
view_aeroporto_companhia_destino.createOrReplaceTempView("view_aeroporto_companhia_destino")

#### join das views para calculo de atuação total e classficação de atuação por companhia em cada aeroporto

In [111]:
view_aeroporto = spark.sql("""
    SELECT 
        *,
        RANK() OVER (PARTITION BY icao_aerodromo_origem ORDER BY qtd_total DESC ) AS rn
    FROM(
        SELECT 
            *, 
            qtd_origem + qtd_destino as qtd_total 
        FROM 
            (
            SELECT 
                o.icao_aerodromo_origem , 
                o.icao_empresa_aerea, 
                qtd_origem, 
                qtd_destino 
            FROM view_aeroporto_companhia_origem as o
            LEFT JOIN view_aeroporto_companhia_destino as d
            ON o.icao_aerodromo_origem = d.icao_aerodromo_destino AND o.icao_empresa_aerea = d.icao_empresa_aerea)
            ) 
    
"""
         )

In [112]:
view_aeroporto.createOrReplaceTempView("view_aeroporto")

#### Join para obter as informações necessárias 

In [126]:
view_aeroporto_companhia = spark.sql("""
    SELECT 
        b.name as nome_aeroporto, 
        a.icao_aerodromo_origem as icao_aeroporto, 
        c.razao_social as razao_social_companhia_aerea, 
        a.qtd_origem, 
        a.qtd_destino, 
        a.qtd_total
    FROM view_aeroporto as a
    LEFT JOIN df_aer as b
        ON a.icao_aerodromo_origem = b.icao
    LEFT JOIN df_air_cia_tratado as c
        ON a.icao_empresa_aerea = c.icao
    WHERE rn = 1 AND c.razao_social IS NOT NULL;
    
"""
         )