## Modelagem
A partir da tabela voos iremos montar nosso esquema estrela. O esquema será composto de 5 dimensões e um fato:

#### Dimensão aeronave:
- **id_equipamento (chave primária)**
- sg_equipamento_icao
- ds_modelo
- ds_matricula

#### Dimensão aeroporto:
- **id_aerodromo (chave primária)**
- id_aerodromo
- sg_icao
- sg_iata
- nm_aerodromo
- nm_municipio
- sg_uf
- nm_regiao
- nm_pais
- nm_continente

#### Dimensão empresa:
- **id_empresa (chave primária)**
- sg_empresa_icao
- sg_empresa_iata
- nm_empresa
- nm_pais
- ds_tipo_empresa

#### Dimensão linha:
- **id_tipo_linha (chave primária)**
- cd_tipo_linha
- ds_tipo_linha
- ds_natureza_tipo_linha
- ds_servico_tipo_linha

#### Dimensão tempo:
- **dt_voo (chave primária)**
- nr_dia
- nr_mes
- nr_ano
- nr_semestre
- nr_trimestre intege

## Aplicando filtros e critérios de qualidade

Para entender melhor o motivo dessas operações, consulte esta planilha [aqui](https://docs.google.com/spreadsheets/d/1_4_-XutVG2gyGrHMkfTlin1-UbQRA27tdRVC9ZI0fmk/edit?usp=sharing). 
1. Selecionar colunas necessárias (consultar motivo para colunas serem descartadas na planilha a cima);
2. Aplicar filtros e correção de valores inválidos;
3. Aplicar tipagem correta nas colunas;
4. Montar entidades dimensões;
5. Montar entidade fato;


In [0]:
%sql
USE CATALOG mvp;
USE SCHEMA silver;

In [0]:
import pyspark.sql.functions as F

In [0]:
df = spark.table("mvp.bronze.voos")

Selecionar colunas necessárias:

In [0]:
df_colunas = df.select([
  "id_equipamento",
  "sg_equipamento_icao",
  "ds_modelo",
  "ds_matricula",
  "id_aerodromo_origem",
  "sg_icao_origem",
  "sg_iata_origem",
  "nm_aerodromo_origem",
  "nm_municipio_origem",
  "sg_uf_origem",
  "nm_regiao_origem",
  "nm_pais_origem",
  "nm_continente_origem",
  "id_aerodromo_destino",
  "sg_icao_destino",
  "sg_iata_destino",
  "nm_aerodromo_destino",
  "nm_municipio_destino",
  "sg_uf_destino",
  "nm_regiao_destino",
  "nm_pais_destino",
  "nm_continente_destino",
  "dt_partida_real",
  "id_empresa",
  "sg_empresa_icao",
  "sg_empresa_iata",
  "nm_empresa",
  "nm_pais",
  "id_tipo_linha",
  "cd_tipo_linha",
  "ds_tipo_linha",
  "ds_natureza_tipo_linha",
  "ds_servico_tipo_linha",
  "id_basica",
  "nr_voo",
  "nr_singular",
  "ds_natureza_etapa",
  "hr_partida_real",
  "hr_chegada_real",
  "dt_chegada_real",
  "lt_combustivel",
  "nr_assentos_ofertados",
  "kg_payload",
  "km_distancia",
  "nr_passag_pagos",
  "nr_passag_gratis",
  "kg_bagagem_livre",
  "kg_bagagem_excesso",
  "kg_carga_paga",
  "kg_carga_gratis",
  "kg_correio",
  "nr_horas_voadas",
  "kg_peso",
  "nr_velocidade_media",
  "nr_pax_gratis_km",
  "nr_carga_paga_km",
  "nr_carga_gratis_km",
  "nr_correio_km",
  "nr_bagagem_paga_km",
  "nr_bagagem_gratis_km",
  "nr_ask",
  "nr_rpk",
  "nr_atk"
])

Aplicar filtros e correção de valores inválidos:

In [0]:
print("Excluindo linhas nulas:")
print("="*20)
colunas = [
  "id_equipamento",
  "id_aerodromo_origem", 
  "id_aerodromo_destino",
  "id_empresa",
  "id_tipo_linha", 
  "id_basica", 
]
df_nao_nulos = df_colunas.selectExpr("*")
for coluna in colunas:
  print("Total de linhas antes da exclusão: ", df_nao_nulos.count())
  
  total_registros_nulos = df_nao_nulos.where(F.col(coluna).isNull()).count()
  print("Total de linhas nulas na coluna ", coluna, ": ", total_registros_nulos)
  
  df_nao_nulos = df_nao_nulos.filter(F.col(coluna).isNotNull())

  print("Total de linhas depois da exclusão: ", df_nao_nulos.count())
  print("-"*20)

In [0]:
print("Convertendo strings vazias em null:")
print("="*20)
colunas = [
  "ds_modelo",
  "ds_matricula",
  "nm_aerodromo_origem",
  "nm_municipio_origem",
  "nm_regiao_origem",
  "nm_pais_origem",
  "nm_continente_origem",
  "nm_aerodromo_destino",
  "nm_municipio_destino",
  "nm_regiao_destino",
  "nm_pais_destino",
  "nm_continente_destino",
  "nm_empresa",
  "nm_pais",
  "ds_tipo_linha",
  "ds_natureza_tipo_linha",
  "nr_singular"
]
df_sem_vazios = df_nao_nulos.selectExpr("*")
for coluna in colunas:
  print(f"Total de linhas da coluna {coluna} com strings vazias ANTES do processamento: ", df_sem_vazios.where(F.trim(F.col(coluna)) == "").count())
  df_sem_vazios = df_sem_vazios.withColumn(coluna, F.when((F.trim(F.col(coluna)) == ""), F.lit(None)).otherwise(F.trim(F.col(coluna))))
  print(f"Total de linhas da coluna {coluna} com strings vazias DEPOIS do processamento: ", df_sem_vazios.where(F.trim(F.col(coluna)) == "").count())
  print("-"*20)


In [0]:
print("Removendo valores inteiros menores que 0:")
print("="*20)
from pyspark.sql.types import IntegerType
colunas = [
    "nr_voo",
    "lt_combustivel",
    "nr_assentos_ofertados",
    "kg_payload",
    "km_distancia",
    "nr_passag_pagos",
    "nr_passag_gratis",
    "kg_bagagem_livre",
    "kg_bagagem_excesso",
    "kg_carga_paga",
    "kg_carga_gratis",
    "kg_correio",
    "kg_peso",
    "nr_pax_gratis_km",
    "nr_carga_paga_km",
    "nr_carga_gratis_km",
    "nr_correio_km",
    "nr_bagagem_paga_km",
    "nr_bagagem_gratis_km",
    "nr_ask",
    "nr_rpk",
    "nr_atk"
]
df_sem_inteiros_menores_zero = df_sem_vazios.selectExpr("*")
for coluna in colunas:
    df_sem_inteiros_menores_zero = df_sem_inteiros_menores_zero.withColumn(coluna, F.col(coluna).try_cast(IntegerType()))
    print(f"Total de valores menores que zero na coluna {coluna} ANTES do processamento: ", df_sem_inteiros_menores_zero.where(F.col(coluna) < 0 ).count())
    df_sem_inteiros_menores_zero = df_sem_inteiros_menores_zero.withColumn(coluna, F.when((F.col(coluna) < 0), F.lit(None)).otherwise(F.col(coluna)))
    print(f"Total de valores menores que zero na coluna {coluna} ANTES do processamento: ", df_sem_inteiros_menores_zero.where(F.col(coluna) < 0 ).count())
    print("-"*20)

In [0]:
print("Removendo valores float menores que 0:")
print("="*20)
from pyspark.sql.types import FloatType
colunas = [
    "nr_horas_voadas",
    "nr_velocidade_media"
]
df_sem_float_menores_zero = df_sem_vazios.selectExpr("*")
for coluna in colunas:
    df_sem_float_menores_zero = df_sem_float_menores_zero.withColumn(coluna, F.col(coluna).try_cast(FloatType()))
    print(f"Total de valores menores que zero na coluna {coluna} ANTES do processamento: ", df_sem_float_menores_zero.where(F.col(coluna) < 0 ).count())
    df_sem_float_menores_zero = df_sem_float_menores_zero.withColumn(coluna, F.when((F.col(coluna) < 0), F.lit(None)).otherwise(F.col(coluna)))
    print(f"Total de valores menores que zero na coluna {coluna} ANTES do processamento: ", df_sem_float_menores_zero.where(F.col(coluna) < 0 ).count())
    print("-"*20)

In [0]:
df_valores_validos = df_sem_float_menores_zero.selectExpr("*")
#Valores válidos:
coluna = "cd_tipo_linha"
print("Processamento da coluna:", coluna)
valores_validos = ["N", "C", "I", "G", "X"]
print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())

df_valores_validos = df_valores_validos.withColumn(coluna, F.when(~(F.trim(F.col(coluna)).isin(valores_validos)), F.lit("X")).otherwise(F.trim(F.col(coluna))))

print("Total de valores inválidos DEPOIS do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())


In [0]:
coluna = "ds_tipo_linha"
print("Processamento da coluna:", coluna)
valores_validos = [
    "DOMÉSTICA MISTA",
    "DOMÉSTICA CARGUEIRA",
    "INTERNACIONAL CARGUEIRA",
    "INTERNACIONAL MISTA",
    "NÃO IDENTIFICADA"
]
print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())

df_valores_validos = df_valores_validos.withColumn(coluna, F.when(~(F.trim(F.col(coluna)).isin(valores_validos)), F.lit("NÃO IDENTIFICADA")).otherwise(F.trim(F.col(coluna))))

print("Total de valores inválidos DEPOIS do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())


In [0]:
coluna = "ds_natureza_etapa"
print("Processamento da coluna:", coluna)
valores_validos = [
    "DOMÉSTICA",
    "INTERNACIONAL",
    "NÃO IDENTIFICADA"
]
print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())

df_valores_validos = df_valores_validos.withColumn(coluna, F.when(~(F.trim(F.col(coluna)).isin(valores_validos)), F.lit("NÃO IDENTIFICADA")).otherwise(F.trim(F.col(coluna))))

print("Total de valores inválidos DEPOIS do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())


In [0]:
coluna = "ds_servico_tipo_linha"
print("Processamento da coluna:", coluna)
valores_validos = [
    "NÃO IDENTIFICADO",
    "PASSAGEIRO",
    "CARGUEIRO"
]
print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())

df_valores_validos = df_valores_validos.withColumn(coluna, F.when(~(F.trim(F.col(coluna)).isin(valores_validos)), F.lit("NÃO IDENTIFICADO")).otherwise(F.trim(F.col(coluna))))

print("Total de valores inválidos DEPOIS do processamento:", df_valores_validos.where(~(F.trim(F.col(coluna)).isin(valores_validos))).count())


In [0]:
coluna = "dt_partida_real"
print("Processamento da coluna:", coluna)

df_valores_validos = df_valores_validos.withColumn(coluna, F.try_to_date(F.col(coluna), "yyyy-MM-dd"))

print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(F.col(coluna).isNull()).count())

df_valores_validos = df_valores_validos.filter(F.col(coluna).isNotNull())

print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(F.col(coluna).isNull()).count())



In [0]:
coluna = "dt_chegada_real"
print("Processamento da coluna:", coluna)

df_valores_validos = df_valores_validos.withColumn(coluna, F.try_to_date(F.col(coluna), "yyyy-MM-dd"))

print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(F.col(coluna).isNull()).count())

df_valores_validos = df_valores_validos.filter(F.col(coluna).isNotNull())

print("Total de valores inválidos ANTES do processamento:", df_valores_validos.where(F.col(coluna).isNull()).count())



In [0]:
from pyspark.sql.types import StringType
df_valores_validos_len = df_valores_validos.selectExpr("*")
colunas_len = {
    "sg_uf_origem": 2,
    "sg_uf_destino": 2,
    "sg_empresa_iata": 2,
    "sg_iata_origem": 3,
    "sg_iata_destino": 3,
    "sg_empresa_icao": 3,
    "sg_icao_origem": 4,
    "sg_icao_destino": 4
}
for coluna, tamanho in colunas_len.items():
    print("Processamento da coluna:", coluna)
    print("Tamanho válido:", tamanho)

    print("Total de valores inválidos ANTES do processamento:", df_valores_validos_len.where(~(F.length(F.trim(F.col(coluna))) == tamanho)).count())

    df_valores_validos_len = df_valores_validos_len.withColumn(coluna, F.when(~(F.length(F.trim(F.col(coluna))) == tamanho), F.lit(None)).otherwise(F.trim(F.col(coluna))))

    print("Total de valores inválidos DEPOIS do processamento:", df_valores_validos_len.where(~(F.length(F.trim(F.col(coluna))) == tamanho)).count())

In [0]:
coluna = "sg_equipamento_icao"
print("Processamento da coluna:", coluna)
min = 2
max = 4


print("Total de valores inválidos ANTES do processamento:", df_valores_validos_len.where(~((F.length(F.trim(F.col(coluna))) >= min) & (F.length(F.trim(F.col(coluna))) <= max))).count())

df_valores_validos_len = df_valores_validos_len.withColumn(coluna, F.when(~((F.length(F.trim(F.col(coluna))) >= min) & (F.length(F.trim(F.col(coluna))) <= max)), F.lit(None)).otherwise(F.trim(F.col(coluna))))

print("Total de valores inválidos DEPOIS do processamento:", df_valores_validos_len.where(~((F.length(F.trim(F.col(coluna))) >= min) & (F.length(F.trim(F.col(coluna))) <= max))).count())