In [0]:
%run "../utils/common_functions"

### 1. Create schemas and read the bronze CSV files

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, BooleanType

In [0]:
# Explicit read schema for the occurrences CSV (V_OCORRENCIA_AMPLA.csv).
# Each entry is (column name, Spark type class, nullable); column order must
# follow the column order of the file because the schema is applied positionally
# by the CSV reader.
occurrences_schema = StructType(fields=[
    StructField(field_name, field_type(), is_nullable)
    for field_name, field_type, is_nullable in (
        # Occurrence identification
        ("Numero_da_Ocorrencia", IntegerType, False),
        ("Numero_da_Ficha", StringType, False),
        ("Operador_Padronizado", StringType, True),
        ("Classificacao_da_Ocorrencia", StringType, True),
        ("Data_da_Ocorrencia", DateType, True),
        ("Hora_da_Ocorrencia", StringType, True),
        # Location
        ("Municipio", StringType, True),
        ("UF", StringType, True),
        ("Regiao", StringType, True),
        ("Descricao_do_Tipo", StringType, True),
        ("ICAO", StringType, True),
        ("Latitude", StringType, True),
        ("Longitude", StringType, True),
        ("Tipo_de_Aerodromo", StringType, True),
        ("Historico", StringType, True),
        # Aircraft / operation
        ("Matricula", StringType, True),
        ("Categoria_da_Aeronave", StringType, True),
        ("Operador", StringType, True),
        ("Tipo_de_Ocorrencia", StringType, True),
        ("Fase_da_Operacao", StringType, True),
        ("Operacao", StringType, True),
        ("Danos_a_Aeronave", StringType, True),
        ("Aerodromo_de_Destino", StringType, True),
        ("Aerodromo_de_Origem", StringType, True),
        # Injury counters
        ("Lesoes_Fatais_Tripulantes", IntegerType, True),
        ("Lesoes_Fatais_Passageiros", IntegerType, True),
        ("Lesoes_Fatais_Terceiros", IntegerType, True),
        ("Lesoes_Graves_Tripulantes", IntegerType, True),
        ("Lesoes_Graves_Passageiros", IntegerType, True),
        ("Lesoes_Graves_Terceiros", IntegerType, True),
        ("Lesoes_Leves_Tripulantes", IntegerType, True),
        ("Lesoes_Leves_Passageiros", IntegerType, True),
        ("Lesoes_Leves_Terceiros", IntegerType, True),
        ("Ilesos_Tripulantes", IntegerType, True),
        ("Ilesos_Passageiros", IntegerType, True),
        ("Lesoes_Desconhecidas_Tripulantes", IntegerType, True),
        ("Lesoes_Desconhecidas_Passageiros", IntegerType, True),
        ("Lesoes_Desconhecidas_Terceiros", IntegerType, True),
        # Aircraft model details
        ("Modelo", StringType, True),
        ("CLS", StringType, True),
        ("Tipo_ICAO", StringType, True),
        ("PMD", IntegerType, True),
        ("Numero_de_Assentos", IntegerType, True),
        ("Nome_do_Fabricante", StringType, True),
        ("PSSO", StringType, True),
    )
])

In [0]:
# Explicit read schema for icao_aircraft_types.csv; every column is declared
# non-nullable. Built incrementally with StructType.add, one column per call.
aircraft_type_schema = (
    StructType()
    .add("ModelFullName", StringType(), False)
    .add("Description", StringType(), False)
    .add("WTC", StringType(), False)
    .add("WTG", StringType(), False)
    .add("Designator", StringType(), False)
    .add("ManufacturerCode", StringType(), False)
    .add("ShowInPart3Only", BooleanType(), False)
    .add("AircraftDescription", StringType(), False)
    .add("EngineCount", IntegerType(), False)
    .add("EngineType", StringType(), False)
)


In [0]:
# Explicit read schema for special_designators_type.csv: two non-nullable
# string columns. Note that the second header contains a space.
aircraft_special_type_schema = (
    StructType()
    .add("Model", StringType(), False)
    .add("Type Designator", StringType(), False)
)

In [0]:
# Load the raw occurrences file from the bronze container using the explicit
# schema. The file is ';'-delimited, has a header row, may contain records that
# span several lines (multiLine) and encodes missing values as the text "null".
# Any remaining empty strings are turned into nulls after the read.
# NOTE(review): no dateFormat option is set, so Data_da_Ocorrencia is parsed with
# Spark's default date pattern — confirm this matches the format in the file.
occurrences_df = (
    spark.read
    .options(header=True, multiLine=True, delimiter=";", nullValue="null")
    .schema(occurrences_schema)
    .csv("abfss://bronze@ocorrenciasanacdl.dfs.core.windows.net/V_OCORRENCIA_AMPLA.csv")
    .replace("", None)
)

In [0]:
# Load the ICAO aircraft type reference file (comma-delimited, with header)
# from the bronze container using the explicit schema.
aircraft_type_df = (
    spark.read
    .options(header=True, delimiter=",")
    .schema(aircraft_type_schema)
    .csv("abfss://bronze@ocorrenciasanacdl.dfs.core.windows.net/icao_aircraft_types.csv")
)

In [0]:
# Load the special type designators reference file (comma-delimited, with
# header) from the bronze container using the explicit schema.
aircraft_special_type_df = (
    spark.read
    .options(header=True, delimiter=",")
    .schema(aircraft_special_type_schema)
    .csv("abfss://bronze@ocorrenciasanacdl.dfs.core.windows.net/" + "special_designators_type.csv")
)

### 2. Filter and rename columns

In [0]:
from pyspark.sql.functions import col, date_format

In [0]:
# Keep only the occurrence columns used downstream and rename them from the
# Portuguese source headers to English snake_case names.
# Each pair is (source column, output column); output order follows this list.
occurrences_filtered_columns = occurrences_df.select(*[
    col(source_name).alias(output_name)
    for source_name, output_name in (
        ("Numero_da_Ocorrencia", "occurrence_number"),
        ("Operador_Padronizado", "operator"),
        ("Classificacao_da_Ocorrencia", "occurrence_classification"),
        ("Data_da_Ocorrencia", "occurrence_date"),
        ("Hora_da_Ocorrencia", "occurrence_time"),
        ("Municipio", "city"),
        ("UF", "state"),
        ("Regiao", "region"),
        ("Descricao_do_Tipo", "occurrence_description"),
        ("ICAO", "icao_airport_code"),
        ("Latitude", "latitude"),
        ("Longitude", "longitude"),
        ("Historico", "occurrence_details"),
        ("Matricula", "aircraft_call_sign"),
        ("Fase_da_Operacao", "operation_phase"),
        ("Operacao", "operation"),
        ("Danos_a_Aeronave", "aircraft_damage"),
        ("Aerodromo_de_Destino", "arrival_airport"),
        ("Aerodromo_de_Origem", "departure_airport"),
        ("Lesoes_Fatais_Tripulantes", "casualties_crew"),
        ("Lesoes_Fatais_Passageiros", "casualties_pax"),
        ("Lesoes_Fatais_Terceiros", "casualties_ground"),
        ("Lesoes_Graves_Tripulantes", "severe_inj_crew"),
        ("Lesoes_Graves_Passageiros", "severe_inj_pax"),
        ("Lesoes_Graves_Terceiros", "severe_inj_ground"),
        ("Lesoes_Leves_Tripulantes", "minor_inj_crew"),
        ("Lesoes_Leves_Passageiros", "minor_inj_pax"),
        ("Lesoes_Leves_Terceiros", "minor_inj_ground"),
        ("Ilesos_Tripulantes", "unharmed_crew"),
        ("Ilesos_Passageiros", "unharmed_pax"),
        ("Modelo", "acft_model"),
        ("Tipo_ICAO", "acft_icao_type"),
        ("PMD", "acft_max_gross_weight"),
        ("Numero_de_Assentos", "acft_seats"),
        ("Nome_do_Fabricante", "acft_manufacturer"),
    )
])

In [0]:
# Project the ICAO aircraft type reference data onto the columns used
# downstream (ShowInPart3Only is dropped) and rename them to snake_case.
# Each pair is (source column, output column); output order follows this list.
acft_type_filtered_df = aircraft_type_df.select(*[
    col(source_name).alias(output_name)
    for source_name, output_name in (
        ("ModelFullName", "acft_icao_model"),
        ("Description", "acft_description"),
        ("WTC", "wake_turbulence_category"),
        ("WTG", "wake_turbulence_group"),
        ("Designator", "icao_aircraft_type"),
        ("ManufacturerCode", "manufacturer_code"),
        ("AircraftDescription", "acft_classification"),
        ("EngineCount", "number_of_engines"),
        ("EngineType", "engine_type"),
    )
])

In [0]:
# Rename the two special-designator columns to snake_case names.
special_type_filtered_df = aircraft_special_type_df.select(*[
    col(source_name).alias(output_name)
    for source_name, output_name in (
        ("Model", "acft_special_model_description"),
        ("Type Designator", "acft_special_type"),
    )
])

In [0]:
from pyspark.sql.functions import regexp_replace

In [0]:
# Strip any run of digits at the end of the special type designator, leaving
# the rest of the value untouched. The operation is idempotent, so re-running
# this cell does not change the result further.
special_type_filtered_df = special_type_filtered_df.withColumn(
    "acft_special_type",
    regexp_replace(col("acft_special_type"), r"\d+$", ""),
)

### 3. Null values handling

In [0]:
# Replace nulls with explicit placeholders:
#   * descriptive text columns get a Portuguese "unknown / not informed" label,
#   * occurrence_time defaults to midnight,
#   * the selected injury / unharmed counters default to 0.
# Columns not listed here (e.g. state, latitude, longitude, occurrence_date,
# acft_max_gross_weight, acft_seats) keep their nulls.
occurrences_nulls_handled = (
    occurrences_filtered_columns
    .fillna({
        "operator": "Desconhecido",
        "city": "Não informado",
        "icao_airport_code": "Indeterminado",
        "acft_model": "Modelo desconhecido",
        "acft_icao_type": "Tipo desconhecido",
        "occurrence_time": "00:00:00",
        "region": "Região desconhecida",
        "occurrence_description": "Não informado",
        "occurrence_details": "Detalhes não informado",
        "operation_phase": "Não informado",
        "aircraft_damage": "Não informado",
        "arrival_airport": "Não informado",
        "departure_airport": "Não informado",
        "operation": "Não informado",
        "acft_manufacturer": "Desconhecido",
        "aircraft_call_sign": "Não informado",
    })
    .fillna(
        0,
        subset=[
            "casualties_crew",
            "casualties_pax",
            "casualties_ground",
            "severe_inj_crew",
            "severe_inj_pax",
            "severe_inj_ground",
            "minor_inj_crew",
            "minor_inj_pax",
            "minor_inj_ground",
            "unharmed_crew",
            "unharmed_pax",
        ],
    )
)

### 4. Duplicate rows check

In [0]:
# De-duplicate occurrences with the shared helper loaded via %run from
# ../utils/common_functions. The key columns passed here are the same pair
# used as the merge key when the data is written to the silver table.
# NOTE(review): presumably the helper keeps one row per key combination —
# confirm against its definition.
occurrences_droped_df = drop_duplicates(
    occurrences_nulls_handled,
    ["occurrence_number", "occurrence_description"],
)


In [0]:
# De-duplicate the aircraft type reference data on the column that is later
# used as the merge key ("tgt.acft_icao_model == src.acft_icao_model").
# The previous key, engine_type, is only the partition column of the target
# table: de-duplicating on it would keep a single aircraft per engine type and
# silently discard the rest of the reference data.
# NOTE(review): assumes drop_duplicates (from ../utils/common_functions) keeps
# one row per combination of the listed columns — confirm against the helper.
aircraft_type_filtered_dropped_df = drop_duplicates(
    acft_type_filtered_df,
    ["acft_icao_model"],
)

In [0]:
# De-duplicate the special designators on the (digit-stripped) type code using
# the shared helper loaded via %run from ../utils/common_functions.
# NOTE(review): the later merge keys on acft_special_model_description, not on
# acft_special_type — confirm the two columns are one-to-one in this file.
aircraft_special_type_dropped_df = drop_duplicates(
    special_type_filtered_df,
    ["acft_special_type"],
)

### 5. Enrich the data and merge into the silver tables


In [0]:
from pyspark.sql.functions import year, hour

In [0]:
# Add the calendar year of the occurrence; it is used as one of the partition
# columns when the occurrences are merged into the silver table. Rows with a
# null occurrence_date get a null year.
occurrences_year_partitioned = occurrences_droped_df.withColumn(
    "occurrence_year",
    year(col("occurrence_date")),
)

In [0]:
occurrence_hour_extracted_df = occurrences_year_partitioned.withColumn("occurrence_hour", occurrences_year_partitioned.occurrence_time)

In [0]:
merge_condition = "tgt.occurrence_number == src.occurrence_number AND tgt.occurrence_description == src.occurrence_description"
merge_delta_data("anac_ocorrencias_dev","silver","processed_occurrences",occurrences_year_partitioned,merge_condition,["state", "occurrence_year"])

In [0]:
# Upsert the aircraft type reference data into the silver table, matching
# target and source rows on acft_icao_model and passing engine_type as the
# partition column list.
merge_condition = "tgt.acft_icao_model == src.acft_icao_model"
merge_delta_data(
    "anac_ocorrencias_dev",
    "silver",
    "processed_aircraft_type",
    aircraft_type_filtered_dropped_df,
    merge_condition,
    ["engine_type"],
)

In [0]:
# Upsert the special type designators into the silver table, matching target
# and source rows on acft_special_model_description. No partition columns are
# passed for this table.
merge_condition = (
    "tgt.acft_special_model_description "
    " == src.acft_special_model_description"
)
merge_delta_data(
    "anac_ocorrencias_dev",
    "silver",
    "processed_aircraft_special_type",
    aircraft_special_type_dropped_df,
    merge_condition,
)