In [0]:
df = spark.read.parquet("dbfs:/mnt/Anac/Silver/anac_silver.parquet/")

from pyspark.sql.functions import current_timestamp, date_format, from_utc_timestamp

# 1° item - colunas solicitadas
colunas = ['Aerodromo_de_Destino', 'Aerodromo_de_Origem', 'Classificacao_da_Ocorrência', 'Danos_a_Aeronave', 'Data_da_Ocorrencia', 'Municipio', 'UF', 'Regiao', 'Tipo_de_Aerodromo', 'Tipo_de_Ocorrencia', 'Lesoes_Desconhecidas_Passageiros', 'Lesoes_Desconhecidas_Terceiros', 'Lesoes_Desconhecidas_Tripulantes', 'Lesoes_Fatais_Passageiros', 'Lesoes_Fatais_Terceiros', 'Lesoes_Fatais_Tripulantes', 'Lesoes_Graves_Passageiros', 'Lesoes_Graves_Terceiros', 'Lesoes_Graves_Tripulantes', 'Lesoes_Leves_Passageiros', 'Lesoes_Leves_Terceiros', 'Lesoes_Leves_Tripulantes']

df = df.select(colunas)

In [0]:
# 2º item - coluna somando as lesoes
colunas_somar = [
    'Lesoes_Desconhecidas_Passageiros',
    'Lesoes_Desconhecidas_Terceiros',
    'Lesoes_Desconhecidas_Tripulantes',
    'Lesoes_Fatais_Passageiros',
    'Lesoes_Fatais_Terceiros',
    'Lesoes_Fatais_Tripulantes',
    'Lesoes_Graves_Passageiros',
    'Lesoes_Graves_Terceiros',
    'Lesoes_Graves_Tripulantes',
    'Lesoes_Leves_Passageiros',
    'Lesoes_Leves_Terceiros',
    'Lesoes_Leves_Tripulantes'
    ]

df = df.withColumn('Total_Lesoes', sum(df[i] for i in colunas_somar))

In [0]:
# 3º item - renomear colunas
df = df\
    .withColumnRenamed('Aerodromo_de_Destino', 'Destino')\
    .withColumnRenamed('Aerodromo_de_Origem', 'Origem')\
    .withColumnRenamed('Classificacao_da_Ocorrência', 'Classificacao')\
    .withColumnRenamed('Danos_a_Aeronave', 'Danos')\
    .withColumnRenamed('Data_da_Ocorrencia', 'Data')\
    .withColumnRenamed('UF', 'Estado')

# 4º item - excluir dados de estados que tenham a classificação [Indeterminado, Sem Registro, Exterior]
classific = ['Indeterminado', 'Sem Registro', 'Exterior']
df = df.filter(~df.Estado.isin(classific))

# 5º item - inserir colnuna com nome de atualização para o usuario ver quando os dados foram atualizados
df = df.withColumn("Atualizacao",\
     date_format(from_utc_timestamp(current_timestamp(), "America/Sao_Paulo"), "yyyy/MM/dd HH:mm:ss"))
display(df)

In [0]:
# 6° item - salvar a camada Gold particionada por UF = 'Estado'

df.write.mode("overwrite")\
    .format("parquet")\
    .partitionBy("Estado")\
    .save("dbfs:/mnt/Anac/Gold/anac_gold_partic/")