In [1]:
# Instalação dos pacotes necessários
!pip install pyspark
!pip install findspark



In [2]:
#importando o findSpark


import findspark
findspark.init()

In [3]:
## importando bibliotecas necessárias

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col,when,count,lit,greatest,struct,expr,length,trim, ltrim, rtrim,udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import numpy as np

In [None]:
### funções que irão auxiliar o tratamento de dados

def higher_than(col, condition):
    return (F.col(col) > condition )

def between(col, lower_condition, higher_condition):
    return ( F.col(col).between(lower_condition, higher_condition) )

def lower_than(col, condition):
    return (F.col(col) < condition )

def equal(col, condition):
    return (F.col(col) == condition )

In [None]:
# Criação do contexto do spark
sc = SparkContext()

# Instância do criador de sessão do spark
spark = (SparkSession.builder
                     .master("local[*]")
                     .appName("Aceleração PySpark - Transformation - Capgemini"))

##  Dataset airports.csv :

In [None]:
# Leitura/Carga do dataset

df_airports = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_airports)
                  .load("C:/Users/danisant/Codes/airports.csv"))

In [None]:
#Visões Temporárias - não altera a estrutura original

df_airports.createOrReplaceTempView('airports')

#### Pergunta 1

In [9]:
# Atendendo os requisitos


df_airports = df_airports.withColumn('alt', (
                 F.when(lower_than('alt', 0), 0 )
                 .otherwise(F.col('alt'))
))


+---+----+----+----+----+----+----+------+
|faa|name| lat| lon| alt|  tz| dst|qa_faa|
+---+----+----+----+----+----+----+------+
|  0| 04G|null|null|null|null|null|     F|
|  1| 06A|null|null|null|null|null|     F|
|  2| 06C|null|null|null|null|null|     F|
|  3| 06N|null|null|null|null|null|     F|
|  4| 09J|null|null|null|null|null|     F|
|  5| 0A9|null|null|null|null|null|     F|
|  6| 0G6|null|null|null|null|null|     F|
|  7| 0G7|null|null|null|null|null|     F|
|  8| 0P2|null|null|null|null|null|     F|
|  9| 0S9|null|null|null|null|null|     F|
| 10| 0W3|null|null|null|null|null|     F|
| 11| 10C|null|null|null|null|null|     F|
| 12| 17G|null|null|null|null|null|     F|
| 13| 19A|null|null|null|null|null|     F|
| 14| 1A3|null|null|null|null|null|     F|
| 15| 1B9|null|null|null|null|null|     F|
| 16| 1C9|null|null|null|null|null|     F|
| 17| 1CS|null|null|null|null|null|     F|
| 18| 1G3|null|null|null|null|null|     F|
| 19| 1OH|null|null|null|null|null|     F|
+---+----+-

In [None]:
### verificando a coluna alt após o tratamento realizado

df_airports.where(df_airports.alt < 0).show()

#### Pergunta 2

In [10]:
# Atendendo os requisitos

df_airports = df_airports.withColumn('dst', F.when(F.col('tz').between(-7,-5), 'A').otherwise(F.col('dst')))



+---+----+----+----+----+----+----+-------+
|faa|name| lat| lon| alt|  tz| dst|qa_name|
+---+----+----+----+----+----+----+-------+
|  0| 04G|null|null|null|null|null|   null|
|  1| 06A|null|null|null|null|null|   null|
|  2| 06C|null|null|null|null|null|   null|
|  3| 06N|null|null|null|null|null|   null|
|  4| 09J|null|null|null|null|null|   null|
|  5| 0A9|null|null|null|null|null|   null|
|  6| 0G6|null|null|null|null|null|   null|
|  7| 0G7|null|null|null|null|null|   null|
|  8| 0P2|null|null|null|null|null|   null|
|  9| 0S9|null|null|null|null|null|   null|
| 10| 0W3|null|null|null|null|null|   null|
| 11| 10C|null|null|null|null|null|   null|
| 12| 17G|null|null|null|null|null|   null|
| 13| 19A|null|null|null|null|null|   null|
| 14| 1A3|null|null|null|null|null|   null|
| 15| 1B9|null|null|null|null|null|   null|
| 16| 1C9|null|null|null|null|null|   null|
| 17| 1CS|null|null|null|null|null|   null|
| 18| 1G3|null|null|null|null|null|   null|
| 19| 1OH|null|null|null|null|nu

In [None]:
### verificando a coluna dst após tratamento

spark.getOrCreate().sql("select distinct(dst) from airports where tz between -7 and -5").show()

#### Pergunta 3

In [11]:
# Atendendo os requisitos


df_airports = df_airports.withColumn('dst',(
                    F.when(equal('dst', 'U'), 'A')
                    .otherwise(F.col('dst'))
))

+---+----+----+----+----+----+----+------+
|faa|name| lat| lon| alt|  tz| dst|qa_lat|
+---+----+----+----+----+----+----+------+
|  0| 04G|null|null|null|null|null|  null|
|  1| 06A|null|null|null|null|null|  null|
|  2| 06C|null|null|null|null|null|  null|
|  3| 06N|null|null|null|null|null|  null|
|  4| 09J|null|null|null|null|null|  null|
|  5| 0A9|null|null|null|null|null|  null|
|  6| 0G6|null|null|null|null|null|  null|
|  7| 0G7|null|null|null|null|null|  null|
|  8| 0P2|null|null|null|null|null|  null|
|  9| 0S9|null|null|null|null|null|  null|
| 10| 0W3|null|null|null|null|null|  null|
| 11| 10C|null|null|null|null|null|  null|
| 12| 17G|null|null|null|null|null|  null|
| 13| 19A|null|null|null|null|null|  null|
| 14| 1A3|null|null|null|null|null|  null|
| 15| 1B9|null|null|null|null|null|  null|
| 16| 1C9|null|null|null|null|null|  null|
| 17| 1CS|null|null|null|null|null|  null|
| 18| 1G3|null|null|null|null|null|  null|
| 19| 1OH|null|null|null|null|null|  null|
+---+----+-

In [None]:
### verificando a coluna dst após o tratamento realizado

df_airports.where(df_airports.dst == 'U').show()

#### Pergunta 4

In [12]:
# Atendendo os requisitos

df_airports = df_airports.withColumn('region', (
                    F.when(lower_than('lon', -124), 'ALASKA')
                     .when(higher_than('lon', -50) | lower_than('lat',  24), 'OFFSHORE')
                     .when(lower_than('lon', -95) | equal('lon', -95), 'MAINLAND-WEST')
                     .when(higher_than('lon', -95), 'MAINLAND-EAST')
                     .otherwise(None)
))
                                    
    

+---+----+----+----+----+----+----+------+
|faa|name| lat| lon| alt|  tz| dst|qa_lon|
+---+----+----+----+----+----+----+------+
|  0| 04G|null|null|null|null|null|  null|
|  1| 06A|null|null|null|null|null|  null|
|  2| 06C|null|null|null|null|null|  null|
|  3| 06N|null|null|null|null|null|  null|
|  4| 09J|null|null|null|null|null|  null|
|  5| 0A9|null|null|null|null|null|  null|
|  6| 0G6|null|null|null|null|null|  null|
|  7| 0G7|null|null|null|null|null|  null|
|  8| 0P2|null|null|null|null|null|  null|
|  9| 0S9|null|null|null|null|null|  null|
| 10| 0W3|null|null|null|null|null|  null|
| 11| 10C|null|null|null|null|null|  null|
| 12| 17G|null|null|null|null|null|  null|
| 13| 19A|null|null|null|null|null|  null|
| 14| 1A3|null|null|null|null|null|  null|
| 15| 1B9|null|null|null|null|null|  null|
| 16| 1C9|null|null|null|null|null|  null|
| 17| 1CS|null|null|null|null|null|  null|
| 18| 1G3|null|null|null|null|null|  null|
| 19| 1OH|null|null|null|null|null|  null|
+---+----+-

In [None]:
### verificando a coluna region após o tratamento


df_airports.where(df_airports.region =='ALASKA').show()

#### Pergunta 5

In [13]:
# Atendendo os requisitos

df_airports = df_airports.withColumn('type',(
                    F.when(F.col('name').rlike('Airport|Tradeport|Heliport|Airpor|Arpt' ), 'AP')
                     .when(F.col('name').rlike('Aerodrome'), 'AD')
                     .when(F.col('name').rlike('Airpark|Aero Park' ), 'AK')
                     .when(F.col('name').rlike('Station|Air Station'), 'AS')
                     .when(F.col('name').rlike('Field|Fld'), 'FL')
                     .otherwise(None)
))

+---+----+----+----+----+----+----+------+
|faa|name| lat| lon| alt|  tz| dst|qa_alt|
+---+----+----+----+----+----+----+------+
|  0| 04G|null|null|null|null|null|  null|
|  1| 06A|null|null|null|null|null|  null|
|  2| 06C|null|null|null|null|null|  null|
|  3| 06N|null|null|null|null|null|  null|
|  4| 09J|null|null|null|null|null|  null|
|  5| 0A9|null|null|null|null|null|  null|
|  6| 0G6|null|null|null|null|null|  null|
|  7| 0G7|null|null|null|null|null|  null|
|  8| 0P2|null|null|null|null|null|  null|
|  9| 0S9|null|null|null|null|null|  null|
| 10| 0W3|null|null|null|null|null|  null|
| 11| 10C|null|null|null|null|null|  null|
| 12| 17G|null|null|null|null|null|  null|
| 13| 19A|null|null|null|null|null|  null|
| 14| 1A3|null|null|null|null|null|  null|
| 15| 1B9|null|null|null|null|null|  null|
| 16| 1C9|null|null|null|null|null|  null|
| 17| 1CS|null|null|null|null|null|  null|
| 18| 1G3|null|null|null|null|null|  null|
| 19| 1OH|null|null|null|null|null|  null|
+---+----+-

In [None]:
### verificando a coluna type após o tratamento realizado

df_airports.where((F.col('type') != 'AP') &
              (F.col('type') != 'AD') &
              (F.col('type') != 'AK') &
              (F.col('type') != 'AS') &
              (F.col('type') == 'FL') 
             ).show()

#### Pergunta 6

In [14]:
### atendendo os requisitos

MILITARY = ["Base", "Aaf", "AFs", "Ahp", "Afb", "LRRS", "Lrrs", "Arb", "Naf", "NAS", "Nas", "Jrb", "Ns", "As", "Cgas", "Angb"]

REGEX_MILITARY = r'|'.join(map(lambda x : f".*({x}).*", MILITARY))

df_airports = df_airports.withColumn('military', (
                    F.when(F.col('name').rlike(REGEX_MILITARY), True)
                     .otherwise(False)))  

+---+----+----+----+----+----+----+-----+
|faa|name| lat| lon| alt|  tz| dst|qa_tz|
+---+----+----+----+----+----+----+-----+
|  0| 04G|null|null|null|null|null|    M|
|  1| 06A|null|null|null|null|null|    M|
|  2| 06C|null|null|null|null|null|    M|
|  3| 06N|null|null|null|null|null|    M|
|  4| 09J|null|null|null|null|null|    M|
|  5| 0A9|null|null|null|null|null|    M|
|  6| 0G6|null|null|null|null|null|    M|
|  7| 0G7|null|null|null|null|null|    M|
|  8| 0P2|null|null|null|null|null|    M|
|  9| 0S9|null|null|null|null|null|    M|
| 10| 0W3|null|null|null|null|null|    M|
| 11| 10C|null|null|null|null|null|    M|
| 12| 17G|null|null|null|null|null|    M|
| 13| 19A|null|null|null|null|null|    M|
| 14| 1A3|null|null|null|null|null|    M|
| 15| 1B9|null|null|null|null|null|    M|
| 16| 1C9|null|null|null|null|null|    M|
| 17| 1CS|null|null|null|null|null|    M|
| 18| 1G3|null|null|null|null|null|    M|
| 19| 1OH|null|null|null|null|null|    M|
+---+----+----+----+----+----+----

In [None]:
### verificando a coluna military pós tratamento

print(df_airports.where(F.col('military') == True).count())
print(df_airports.where(F.col('military') == False).count())

#### Pergunta 7

In [15]:
# Atendendo os requisitos


df_airports = df_airports.withColumn('administration', (
                    F.when(F.col('name').rlike('International|Intl|Intercontinental'), 'I')
                     .when(F.col('name').rlike('National|Natl'), 'N')
                     .when(F.col('name').rlike('Regional|Reigonal|Rgnl|County|Metro|Metropolitan'), 'R')
                     .when(F.col('name').rlike('Municipal|Muni|City'), 'M')
                     .otherwise(None)))  

+---+----+----+----+----+----+----+------+
|faa|name| lat| lon| alt|  tz| dst|qa_dst|
+---+----+----+----+----+----+----+------+
|  0| 04G|null|null|null|null|null|     M|
|  1| 06A|null|null|null|null|null|     M|
|  2| 06C|null|null|null|null|null|     M|
|  3| 06N|null|null|null|null|null|     M|
|  4| 09J|null|null|null|null|null|     M|
|  5| 0A9|null|null|null|null|null|     M|
|  6| 0G6|null|null|null|null|null|     M|
|  7| 0G7|null|null|null|null|null|     M|
|  8| 0P2|null|null|null|null|null|     M|
|  9| 0S9|null|null|null|null|null|     M|
| 10| 0W3|null|null|null|null|null|     M|
| 11| 10C|null|null|null|null|null|     M|
| 12| 17G|null|null|null|null|null|     M|
| 13| 19A|null|null|null|null|null|     M|
| 14| 1A3|null|null|null|null|null|     M|
| 15| 1B9|null|null|null|null|null|     M|
| 16| 1C9|null|null|null|null|null|     M|
| 17| 1CS|null|null|null|null|null|     M|
| 18| 1G3|null|null|null|null|null|     M|
| 19| 1OH|null|null|null|null|null|     M|
+---+----+-

In [None]:
### verificando a coluna administration pós tratamento

df_airports.where(df_airports.administration.isNull()).show(10)

### Gerando arquivo .parquet

In [16]:
(df_airports.repartition(1) 
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
.save("/data/workspace_files/airport_tf.parquet")
)

## Dataset planes.csv:

In [None]:
# Leitura/Carga do dataset


df_planes = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_planes)
                  .load("/data/workspace_files/planes.csv"))


In [None]:
#Visão Temporária - não altera a estrutura original


df_planes.createOrReplaceTempView('planes')

#### Pergunta 1

In [None]:
# Atendendo os requisitos



df_planes = df_planes.withColumn ('tailchar', 
               F.regexp_replace(F.col("tailnum"), '[0-9]|^N', "")
                
                    )

In [None]:
 #leitura do dataframe tratado


df_planes.show()

#### Pergunta 2

In [None]:
# Atendendo os requisitos

## - comentar linha desejada para testagem em seperado

df_planes = df_planes.withColumn('year', 
                   F.when(F.col('year') ==0, 1996).otherwise(F.col('year'))
)


In [None]:
#leitura do dataframe tratado

df_planes.filter(df_planes["year"]==1996
                 ).show(5)

#### Pergunta 3

In [None]:
#Atendendo aos requisitos
#criando df auxiliar com o ano mais recente para cada manufacturer e model

df_aux1 = df_planes.select('manufacturer', 'model', 'year').groupBy('manufacturer', 'model').agg(F.min('year').alias('aux_year_1')).orderBy(F.col('manufacturer'), F.col('model'))

df_aux2 = df_planes.select('manufacturer', 'year').groupBy('manufacturer').agg(F.min('year').alias('aux_year_2')).orderBy(F.col('manufacturer'))

df_aux1 = df_aux1.withColumnRenamed('manufacturer', 'manufacturer_aux1').withColumnRenamed('model', 'model_aux1')

df_aux2 = df_aux2.withColumnRenamed('manufacturer', 'manufacturer_aux2')

condition = [df_planes.model == df_aux1.model_aux1, df_planes.manufacturer == df_aux1.manufacturer_aux1]

df_aux3 = df_planes.join(df_aux1, condition, 'left').withColumnRenamed('aux_year_1', 'aux_year_first_condition')

df_aux3 = df_aux3.drop('manufacturer_aux1', 'model_aux1')
  
condition1 = [df_aux3.manufacturer == df_aux2.manufacturer_aux2]

df_aux4 = df_aux3.join(df_aux2, condition1, 'left').withColumnRenamed('aux_year_2', 'aux_year_second_condition')

df_aux4 = df_aux4.drop('manufacturer_aux2') 

In [None]:
### seguindo com o tratamento e verificando se as condições foram atendidas

df = df_aux4.withColumn('year', F.when(F.col('year').isNull(), F.col('aux_year_first_condition'))
                                 .otherwise(F.col('aux_year_first_condition'))
                       )

df = df_aux4.withColumn('year', F.when(F.col('aux_year_first_condition').isNull(), F.col('aux_year_second_condition'))
                                 .otherwise(F.col('aux_year_first_condition'))
                       )
                               
df = df.select('year', 'manufacturer', 'model', 'aux_year_first_condition', 'aux_year_second_condition')

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select * from planes where year is Null").show()

#### Pergunta 4

In [None]:
## Atendendo os requisitos

df_planes = df_planes.withColumn('age', (F.year(F.current_date()))-F.col('year'))

### exibindo a coluna age de acordo com o tratamento realizado

df_planes.select("age").distinct().show(5)

#### Pergunta 5

In [None]:
# Atendendo os requisitos



df_planes = df_planes.withColumn("type",
                                when(df_planes["type"]=="Fixed wing multi engine", "MULTI_ENG")
                                .when(df_planes["type"]=="Fixed wing single engine", "SINGLE_ENG")
                                .when(df_planes["type"]=="Rotorcraft", "ROTORCRAFT")


)
df_planes.show(50)

In [None]:
### exibindo a coluna type de acordo com o tratamento realizado


df_planes.filter(df_planes['type']!= 'MULTI_ENG').show()

#### Pergunta 6

In [None]:
# Atendendo os requisitos

df_planes = df_planes.withColumn('manufacturer', F.when(F.col('manufacturer') == 'AIRBUS INDUSTRIE', 'AIRBUS')
                                           .when(F.col('manufacturer') == 'BOMBARDIER INC', 'BOMBARDIER') 
                                           .when(F.col('manufacturer').rlike('MCDONNELL DOUGLAS...'), 'MCDONNELL DOUGLAS')
                                           .when(F.col('manufacturer') == 'CIRRUS DESIGN CORP', 'CIRRUS')
                                           .when(F.col('manufacturer') == 'BARKER JACK L', 'BARKER JACK')
                                           .when(F.col('manufacturer').rlike('ROBINSON HELICOPT...'), 'ROBINSON HELICOPTER')
                                           .when(F.col('manufacturer') == 'GULFSTREAM AEROSPACE', 'GULFSTREAM')
                                           .otherwise(F.col('manufacturer'))
                         )


In [None]:
### Consultando a coluna manufacturer de acordo com o tratamento realizado

spark.getOrCreate().sql("select distinct(manufacturer) from planes order by manufacturer").show()

#### Pergunta 7

In [None]:
# Atendendo os requisitos

df_planes = df_planes.withColumn('model',(
                    F.when( df_planes.model.contains("("), F.regexp_replace(df_planes.model, '\([^()]*\)', '' ))
                    .otherwise(df_planes.model)
))
df_planes = df_planes.withColumn('model',(
            F.trim(df_planes.model)
))



In [None]:
### verificando a coluna model de acordo com o tratamento realizado


df_planes.select('model').distinct().sort('model').show(5,truncate = False)

#### Pergunta 8

In [None]:
### atendendo aos requisitos

df_planes = df_planes.withColumn('speed',
                              F.when(F.col('speed').isNull(), F.ceil(F.col('seats') / 0.36)))

In [None]:
### atendendo aos requisitos

df_planes = df_planes.withColumn('speed',
                              F.when(F.col('speed').isNull(), F.ceil(F.col('seats') / 0.36)))

#### Pergunta 9

In [None]:
### atendendo aos requisitos

df_planes = df_planes.withColumn('engine_split',(
                 F.split(df_planes.engine, '-')[1]
))

df_planes.select('engine_split').distinct().show()

In [None]:
df_planes = df_planes.withColumn('engine_type',(
                    F.when(df_planes.engine_split == 'jet', 'JET')
                    .when(df_planes.engine_split == 'fan', 'FAN')
                    .when(df_planes.engine_split == 'prop', 'PROP')
                    .when(df_planes.engine_split == 'shaft', 'SHAFT')
                    .when(df_planes.engine.endswith('Cycle'), 'CYCLE')
                    .otherwise(df_planes.engine)
))

df_planes = df_planes.drop('engine_split')

In [None]:
### verificando a nova coluna engine_type de acordo com o tratamento realizado


df_planes.select('engine', 'engine_type').distinct().show()

### Gerando arquivo .parquet

In [None]:
(df_planes.repartition(1) 
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
.save("/data/workspace_files/planes_tf.parquet")
)

## Dataset flights.csv :

In [None]:
# Leitura/Carga do dataset


df_flights = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_flights)
                  .load("C:/Users/danisant/Codes/flights.csv"))

In [None]:
#Gerar a tabela de acordo com sua estrutura de colunas

df_flights = df_flights[['year', 'month', 'day','hour', 'minute','dep_time','arr_time', 'dep_delay', 'arr_delay', 'carrier', 'tailnum', 'flight',
        'origin','dest', 'air_time', 'distance']]

In [None]:
#Visões Temporárias - não altera a estrutura original


df_flights.createOrReplaceTempView('flights')

#### Pergunta 1:

In [None]:
# Verificando se há e a quantidade de valores nulos nas colunas solicitadas:

df_flights.groupBy().agg(F.count(F.when(((F.col("hour").isNull()) | (F.col("minute").isNull())),True))).show()


In [None]:
# Atendendo os requisitos



df_flights = df_flights.withColumn('hour', when(F.col('hour').isNull(), "0")
                                    .otherwise(F.col('hour'))
                          ) \
               .withColumn('minute', F.when(F.col('minute').isNull(), "0")
                                      .otherwise(F.col('minute'))
                          )

In [None]:
# Confirmando se há e a quantidade de valores nulos nas colunas solicitadas:

df_flights.groupBy().agg(F.count(F.when(((F.col("hour").isNull()) | (F.col("minute").isNull())),True))).show()

#### Pergunta 2:

In [None]:
# Verificando se há e a quantidade de valores iguais a 24 nas coluna solicitada:


df_flights.select('hour').where((df_flights.hour==24)).count()

In [None]:
# Atendendo os requisitos


df_flights = df_flights.withColumn('hour', F.when(F.col('hour') == 24 , 0)
                                    .otherwise(F.col('hour'))  
                          )

In [None]:
# Confirmando se há e a quantidade de valores iguais a 24 nas coluna solicitada:


df_flights.select('hour').where((df_flights.hour==24)).count()


#### Pergunta 3:

In [None]:
# Atendendo aos requisitos


df_flights = df_flights.withColumn('dep_datetime', 
F.concat_ws(':',
            F.concat_ws(' ',
                        F.concat_ws('-',F.col('year'), F.col('month'), F.col('day')),
            F.col('hour')), 
F.col('minute')).cast('timestamp'))

In [None]:
# Confirmar se a coluna foi criada e se está formatada de acordo com a necessidade solicitada

df_flights.select('dep_datetime').show() # somente a nova coluna
df_flights.select('*').show()  # todo o dataset já com a nova coluna

#### Pergunta 4:

In [None]:
## Verificando se há e quantos valores nulos existem nas colunas solicitadas

df_flights.select('dep_time').where((df_flights.dep_time.isNull())).count()

In [None]:
# Atendendo os requisitos

df_flights = (df_flights.withColumn('dep_time',
F.when(F.col('dep_time') == 'NA',
F.concat(F.col('hour'),
F.lpad(F.col('minute'), 2, '0')))
.otherwise(F.col('dep_time').cast('timestamp'))
))

#### Pergunta 5:

In [None]:
## Verificando se há e quantos valores nulos existem nas colunas solicitadas


df_flights.select('dep_delay').where((df_flights.dep_delay.isNull())).count()

In [None]:
# Atendendo aos requisitos


df_flights = df_flights.withColumn('dep_delay', F.when(F.col('dep_delay').isNull() , 0)
                                    .otherwise(F.col('dep_delay'))  
                          )



In [None]:
## Confirmando se há e quantos valores nulos existem nas colunas solicitadas


df_flights.select('dep_delay').where((df_flights.dep_delay.isNull())).count()

#### Pergunta 6:

In [None]:
## Verificando se há e quantos valores nulos existem nas colunas solicitadas


df_flights.select('arr_delay').where((df_flights.arr_delay.isNull())).count()

In [None]:
# Atendendo os requisistos

df_flights = df_flights.withColumn('arr_delay', F.when(F.col('arr_delay').isNull() , 0)
                                    .otherwise(F.col('arr_delay'))  
                          )



In [None]:
## Confirmando se há e quantos valores nulos existem nas colunas solicitadas


df_flights.select('arr_delay').where((df_flights.arr_delay.isNull())).count()

#### Pergunta 7:

In [None]:
### Verificando se as colunas existem no dataset

listColumns = df_flights.columns
"year","month","day","hour","minute"  in listColumns

In [None]:
### Atendendo aos requisitos

columns_to_drop = ['year','month','day','hour','minute']

df_flights = df_flights.drop(*columns_to_drop)

In [None]:
### Verificando se as colunas foram excluidas 

listColumns = df_flights.columns
"year","month","day","hour","minute"  in listColumns


In [None]:
### Exibindo o dataset sem as colunas do requisito anterior

df_flights.show()

#### Pergunta 8:

In [None]:
### Atendendo os requisitos


df_flights = df_flights.withColumn('air_time_projected', ((F.col('distance') * 0.1) + 20).cast(IntegerType()))

In [None]:
### Verificando se a coluna foi criada com o tipo solicitado

df_flights.select(col("air_time_projected")).schema

In [None]:
### Exibindo o dataset com a nova coluna criada

df_flights.show(5)

#### Pergunta 9:

In [None]:
### Atendendo os requisitos

## criando coluna auxiliar
dfaux = df_flights.groupBy('origin', 'dest').avg('air_time').withColumnRenamed('avg(air_time)','air_time_expected').withColumnRenamed('origin', 'origin1').withColumnRenamed('dest', 'dest1')

dfaux.show(3)

In [None]:
### criando a condição necessária para atender o requisito solicitado
### unindo tabela auxiliar à condição e em seguida excluindo-a
### gerando nova coluna atendendo o requisito necessário e também com o tipo necessário

condition = [df_flights.origin == dfaux.origin1, df_flights.dest == dfaux.dest1]

df_flights = df_flights.join(dfaux, condition, 'left')

df_flights = df_flights.drop('origin1', 'dest1')

df_flights = df_flights.withColumn('air_time_expected', F.col('air_time_expected').cast(IntegerType()))

In [None]:
### Exibindo a coluna com o tratamento realizado


df_flights.select('origin','dest','air_time_expected').show(20)


#### Pergunta 10:

In [None]:
### Atendendo aos requisitos

df_flights = (df_flights.withColumn('air_time',
F.when(F.col('air_time').isNull(),
F.greatest(F.col('air_time_projected'), F.col('air_time_expected')))
.otherwise(F.col('air_time'))

))

df_flights.show(5)

#### Pergunta 11:

In [None]:
### Verificando se há e quantos valores existem na coluna arr_time

df_flights.select('arr_time').where((df_flights.arr_time.isNull())).count()

In [None]:
## Atendendo os requisitos

df_flights = df_flights.withColumn('arr_time', F.when(F.col('arr_time') == 'NA', F.col('dep_time')+F.col('air_time'))
                                                .otherwise(F.col('arr_time'))  
                                  )

In [None]:
### Confirmando o tratamento realizado na coluna arr_time


df_flights.where(df_flights.arr_time> 60).show(5)

In [None]:
### Confirmando se há valores nulos na coluna arr_time

df_flights.select('arr_time').where((df_flights.arr_time == 'NA')).show()

#### Pergunta 12:

In [None]:
### Atendendo os requisitos

df_flights = df_flights.withColumn('haul_duration', F.when(F.col('air_time').between(20, 180), 'SHORT-HAUL')
                                                     .when(F.col('air_time').between(180, 300), 'MEDIUM-HAUL')
                                                     .when(F.col('air_time')>300 , 'LONG-HAUL') 
                                  )



In [None]:
### Verificando o tratamento na nova coluna

df_flights.select("haul_duration", "air_time").show(10)

#### Pergunta 13:

In [None]:
### Atendendo os requisitos

df_flights = df_flights.withColumn('dep_season',
                                    F.when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-03-20 15:33:00')),F.concat(F.year('dep_datetime'),F.lit('-06-21 10:14:00'))), 'SPRING')
                                    .when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-06-21 10:14:00')),F.concat(F.year('dep_datetime'),F.lit('-09-23 02:04:00'))), 'SUMMER')
                                    .when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-09-23 02:04:00')),F.concat(F.year('dep_datetime'),F.lit('-12-21 21:48:00'))), 'FALL')
                                    .otherwise('WINTER')
                                  )

In [None]:
### Consultando a coluna dep_datetime de acordo com o tratamento anterior

df_flights.select('dep_datetime','dep_season').filter(df_flights.dep_season != 'SPRING').show()

#### Pergunta 14:

In [None]:
### Atendendo os requisitos

df_flights = df_flights.withColumn('dep_delay_category', F.when(F.col('dep_delay')<0, 'ANTECIPATED')
                                                          .when(F.col('dep_delay')==0, 'INTIME')
                                                          .when(F.col('dep_delay').between(0,60), 'MINOR')
                                                          .when(F.col('dep_delay')>60, 'MAJOR')
                                  )

In [None]:
### Exibindo a nova coluna tratada

df_flights.select('dep_delay', 'dep_delay_category').show()

### Gerando arquivo .parquet

In [None]:
(df_flights.repartition(1) 
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
.save("/data/workspace_files/flights_tf.parquet")
)