### Objetivos:
1. Trazer somente colunas relevantes;
2. Trazer coluna com soma das lesões;
3. Renomear colunas;
4. Excluir dados sem registros;
5. Coluna de Data de Atualização;
6. Salvar na Gold com dados particionados por UF > ESTADO;

In [0]:
%fs ls /mnt/Anac/Silver

In [0]:
df = spark.read\
    .format("parquet")\
    .option("header", True)\
    .option("inferSchema", True)\
    .option("compression", "gzip")\
    .load("dbfs:/mnt/Anac/Silver/anac_silver.parquet")

display(df)


In [0]:
display(df.select('Lesoes_Fatais_Passageiros'))

# 1. Trazer somente colunas relevantes

In [0]:
colunas_relevantes = [
    'Classificacao_da_Ocorrência',
    'Danos_a_Aeronave',
    # 'Data_da_Ocorrencia' + 'Hora_da_Ocorrência'
    'Descricao_do_Tipo',
    'Fase_da_Operacao',
    'Historico',
    'UF',
    # Total de Lesões,
    ##### Colunas abaixo necessárias para calcular o total de lesões
    "Lesoes_Desconhecidas_Passageiros",
    "Lesoes_Desconhecidas_Terceiros",
    "Lesoes_Desconhecidas_Tripulantes",
    "Lesoes_Fatais_Passageiros",
    "Lesoes_Fatais_Terceiros",
    "Lesoes_Fatais_Tripulantes",
    "Lesoes_Graves_Passageiros",
    "Lesoes_Graves_Terceiros",
    "Lesoes_Graves_Tripulantes",
    "Lesoes_Leves_Passageiros",
    "Lesoes_Leves_Terceiros",
    "Lesoes_Leves_Tripulantes",
]

gold_df = df.select(colunas_relevantes)

# 2. Total de Lesões

In [0]:
colunas_lesoes = [
    "Lesoes_Desconhecidas_Passageiros",
    "Lesoes_Desconhecidas_Terceiros",
    "Lesoes_Desconhecidas_Tripulantes",
    "Lesoes_Fatais_Passageiros",
    "Lesoes_Fatais_Terceiros",
    "Lesoes_Fatais_Tripulantes",
    "Lesoes_Graves_Passageiros",
    "Lesoes_Graves_Terceiros",
    "Lesoes_Graves_Tripulantes",
    "Lesoes_Leves_Passageiros",
    "Lesoes_Leves_Terceiros",
    "Lesoes_Leves_Tripulantes",
]
gold_df = gold_df\
    .withColumn("Total de Lesões", sum(gold_df[qtd_lesoes] for qtd_lesoes in colunas_lesoes))

display(gold_df.select("Total de Lesões"))

In [0]:
gold_df = gold_df.drop(*colunas_lesoes)
display(gold_df)

# 3. Renomear colunas

In [0]:
print(gold_df.columns)

In [0]:
gold_df = gold_df\
    .withColumnRenamed('Classificacao_da_Ocorrência', 'Classificação da Ocorrência')\
    .withColumnRenamed('Danos_a_Aeronave', 'Danos a Aeronave')\
    .withColumnRenamed('Descricao_do_Tipo', 'Descrição do Tipo')\
    .withColumnRenamed('Fase_da_Operacao', 'Fase da Operação')\
    .withColumnRenamed('Historico', 'Histórico')\
    .withColumnRenamed('UF', 'Estado')

display(gold_df)


# 4. Excluir Dados Sem Registros

In [0]:
# display(gold_df.select('Classificação da Ocorrência').distinct())
# Erase Results with:
# Indeterminado

gold_df = gold_df.where(gold_df['Classificação da Ocorrência'] != 'Indeterminado')

In [0]:
# display(gold_df.select('Danos a Aeronave').distinct())
# Erase Results with:
# Desconhecida
# Sem Registro

gold_df = gold_df.where(gold_df['Danos a Aeronave'] != 'Desconhecida').where(gold_df['Danos a Aeronave'] != 'Sem Registro')

In [0]:
# display(gold_df.select('Descrição do Tipo').distinct())
# Erase Results with:
# Sem Registro

gold_df = gold_df.where(gold_df['Descrição do Tipo'] != 'Sem Registro')

In [0]:
display(gold_df.select('Fase da Operação').distinct())
# Erase Results with:
# Indeterminada
# Sem Registro

gold_df = gold_df\
    .where(gold_df['Fase da Operação'] != 'Indeterminada')\
    .where(gold_df['Fase da Operação'] != 'Sem Registro')

In [0]:
gold_df = gold_df.where(gold_df['Histórico'] != 'Sem Registro')

display(gold_df.where(gold_df['Histórico'] == 'Sem Registro').count())

In [0]:
# display(gold_df.where(gold_df.Estado == 'Sem Registro').count()) # 12
# display(gold_df.where(gold_df.Estado == 'Indeterminado').count()) # 677
# display(gold_df.where(gold_df.Estado == 'Exterior').count()) # 56

gold_df = gold_df\
    .where(gold_df.Estado != 'Sem Registro')\
    .where(gold_df.Estado != 'Indeterminado')\
    .where(gold_df.Estado != 'Exterior')

In [0]:
display(gold_df)

# 5. Coluna de Data de Atualização

In [0]:
from pyspark.sql.functions import current_timestamp, date_format, from_utc_timestamp

gold_df = gold_df.withColumn("Atualizado em", 
    date_format(
        from_utc_timestamp(
            current_timestamp(),
            "America/Sao_Paulo"
        ), 
        "yyyy-MM-dd HH:mm:ss" 
    )
) 

In [0]:
display(gold_df.select('Atualizado em'))

# 6. Salvar na Gold com dados particionados por UF > ESTADO

In [0]:
gold_df.write\
    .mode("overwrite")\
    .format("parquet")\
    .option("header", True)\
    .option("inferSchema", True)\
    .partitionBy("Estado")\
    .save("dbfs:/mnt/Anac/Gold/anac_gold_particionado")

In [0]:
print("A")