In [0]:
"""
Propostos:

1°) Selecionar somente algumas colunas
2°) Criar uma nova coluna que faz a soma de todas as lesões
3°) Renomear colunas para ficar melhor intuitivas
4°) Excluir dados que tenham a classificação [Indeterminado, Sem Registro, Exterior]
5°) Criar coluna com nome atualização, para saber quando os dados foram atualizados
6°) Salvar a camada Gold particionado por estado
7°) Criar consultas com os dados particionados do estado de SP

"""

In [0]:
df = spark.read.parquet("dbfs:/mnt/Anac/Silver/anac_silver.parquet")
display(df)

In [0]:
print(df.columns)

In [0]:
#Selecionando algumas colunas
colunas = ['Aerodromo_de_Destino', 'Aerodromo_de_Origem', 'Classificacao_da_Ocorrência', 'Danos_a_Aeronave', 'Data_da_Ocorrencia', 'Regiao', 'UF', 'Tipo_de_Aerodromo', 'Tipo_de_Ocorrencia', 'Lesoes_Desconhecidas_Passageiros', 'Lesoes_Desconhecidas_Terceiros', 'Lesoes_Desconhecidas_Tripulantes', 'Lesoes_Fatais_Passageiros', 'Lesoes_Fatais_Terceiros', 'Lesoes_Fatais_Tripulantes', 'Lesoes_Graves_Passageiros', 'Lesoes_Graves_Terceiros', 'Lesoes_Graves_Tripulantes', 'Lesoes_Leves_Passageiros', 'Lesoes_Leves_Terceiros', 'Lesoes_Leves_Tripulantes']

df = df.select(colunas)
display(df)

In [0]:
# Criando coluna que soma as lesões
somando_colunas = [
    'Lesoes_Desconhecidas_Passageiros', 
    'Lesoes_Desconhecidas_Terceiros', 
    'Lesoes_Desconhecidas_Tripulantes', 
    'Lesoes_Fatais_Passageiros', 
    'Lesoes_Fatais_Terceiros', 
    'Lesoes_Fatais_Tripulantes', 
    'Lesoes_Graves_Passageiros', 
    'Lesoes_Graves_Terceiros', 
    'Lesoes_Graves_Tripulantes', 
    'Lesoes_Leves_Passageiros', 
    'Lesoes_Leves_Terceiros', 
    'Lesoes_Leves_Tripulantes'
    ]
df = df.withColumn("Total_Lesoes", sum(df[somartudo] for somartudo in somando_colunas))
display(df)

In [0]:
# Renomeando para ficar mais intuitivo

df = df\
    .withColumnRenamed("Aerodromo_de_Destino", "Destino")\
    .withColumnRenamed("Aerodromo_de_Origem", "Origem")\
    .withColumnRenamed("Classificacao_da_Ocorrência", "Classificacao")\
    .withColumnRenamed("Danos_a_Aeronave", "Danos")\
    .withColumnRenamed("Data_da_Ocorrencia", "Data")\
    .withColumnRenamed("UF", "Estado")
display(df)

In [0]:
#4°) Excluir dados que estados tenham a classificação [Indeterminado, Sem Registro, Exterior]

filtro = ["Indeterminado", "Sem Registro", "Exterior"]
df = df.filter(~df['Estado'].isin(filtro))
display(df)

In [0]:
#Criar coluna com nome atualização, para saber quando os dados foram atualizados

from pyspark.sql.functions import current_timestamp, date_format, from_utc_timestamp
df = df.withColumn("Atualizacao",\
    date_format(from_utc_timestamp(current_timestamp(), "America/Sao_Paulo"), "yyyy-MM-dd HH:mm:ss"))
display(df)

In [0]:
#Salvar a camada Gold particionado por estado
df.write\
    .mode("overwrite")\
    .format("parquet")\
    .partitionBy("Estado")\
    .save("dbfs:/mnt/Anac/Gold/anac_gold_por_estado")

In [0]:
#Visualizando arquivos particionados de cada estado
%fs ls dbfs:/mnt/Anac/Gold/anac_gold_por_estado

In [0]:
#Visualizando tabelas do estado de SP
display(spark.read.parquet("dbfs:/mnt/Anac/Gold/anac_gold_por_estado/Estado=SP/"))

In [0]:
# Criando consultas SQL para ajudar o time de análise de dados a criar dashboards


In [0]:
%sql
create or replace temp view anac_sp
using parquet
options (path "/mnt/Anac/Gold/anac_gold_por_estado/Estado=SP/");

In [0]:
%sql

--Ocorrências na ultima década
select * 
from anac_sp 
where Data between '2010-01-01' and '2020-12-31'

In [0]:
%sql

-- Ocorrências leves do tipo SCF-NP
select 
Danos,
Tipo_de_Ocorrencia
from anac_sp
where Danos = 'Leve' and Tipo_de_Ocorrencia = 'SCF-NP'

In [0]:
%sql

--Datas em que ocorreram morte de passageiros
select Data,
count(Lesoes_Fatais_Passageiros) as mortes
from anac_sp
group by Data
order by mortes desc