### Qualidade
Antes de realizarmos transformações nos dados, é importante estabelecermos processos que apontem
erros de qualidade nos dados que estamos trabalhado, dessa forma é possível ter clareza das
inconsistências comuns e assim criar formas de melhorar a qualidade dos dados. Pensando nisso, a
primeira atividade planejada é criarmos uma coluna adicional reportando o tipo de inconsistência que
encontramos nos datasets.

### Dicionário
* <b>faa (string):</b> Identificador do aeroporto determinado pela Federal Aviation Administration. Formato: 3-
5 caracteres alfanuméricos.<br>
* <b> name (string)</b>: Nome do aeroporto.<br>
* <b> lat (float):</b> Latitude do aeroporto. Intervalo de valores [−180, 180]. <br>
* <b> lon (float):</b> Longitude do aeroporto Intervalo de valores . [−180, 180] <br>
* <b> alt (int):</b> Altitude do aeroporto. Unidade de medida em pés. Intervalo de valores [0,+∞) . <br>
* <b> tz (float):</b> Fuso horário baseado no deslocamento de horas a partir de UTC/GMT. Intervalo de valores [−11,+14] 
. Pode ser fuso fracionário [1] <br>
* <b> dst (category):</b> Horário de verão. Descrição dos possíveis valores [2]: <br>
*  E (Europe)
*  A (US/Canada)
*  S (South America) 
*  O (Australia) 
*  Z (New Zealand)
*  N (None) 
*  U (Unknown) 

In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark



In [2]:
# Locate spark
import findspark
findspark.init()

In [3]:
# Importando pacotes
import re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # Importando todas as funções 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType,BooleanType

In [43]:
# Expressoes regulares comuns
# Boas práticas (variáveis constantes em maiusculo)
REGEX_ALPHA    = r'[a-zA-Z]+'
REGEX_INTEGER  = r'[0-9]+'
REGEX_FLOAT    = r'[0-9]+\.[0-9]+'
REGEX_ALPHANUM = r'[0-9a-zA-Z]+'
REGEX_EMPTY_STR= r'[\t ]+$'
REGEX_SPECIAL  = r'[!@#$%&*\(\)_]+'
REGEX_NNUMBER  = r'^N[1-9][0-9]{2,3}([ABCDEFGHJKLMNPRSTUVXWYZ]{1,2})'
REGEX_NNUMBER_INVALID = r'(N0.*$)|(.*[IO].*)'
REGEX_TIME_FMT = r'^(([0-1]?[0-9])|(2[0-3]))([0-5][0-9])$'
REGEX_NNUMBER_NEGATIVE = r'^-\d*\.?\d+$'
REGEX_PARENTESE = r'[(\)]+'

In [37]:
# Funcoes auxiliares 
def split_csv(line):
    return tuple(map(lambda x: x.replace('"',''), line.split(",")))

def check_empty_column(col):
    return (F.col(col).isNull() | (F.col(col) =='') | F.col(col).rlike(REGEX_EMPTY_STR))

def create_regex_from_list(_list):
    return r'|'.join(map(lambda x : f".*({x}).*", _list))


def format_year_date(col):
    return (F.date_format(F.current_date(),"yyyy") - col).cast('int')

In [6]:
# Criar o contexto do spark
sc = SparkContext()

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master("local[7]")
                     .appName("Aceleração PySpark - Capgemini"))

In [7]:
# Criando Structure (Schema)

schema_airports = StructType([
    StructField("faa",  StringType(),  True),
    StructField("name", StringType(),  True),
    StructField("lat",  FloatType(),   True),
    StructField("lon",  FloatType(),   True),
    StructField("alt",  IntegerType(), True),
    StructField("tz",   IntegerType(), True),
    StructField("dst",  StringType(),  True)
])

schema_planes = StructType([
    StructField("tailnum",      StringType(),  True),
    StructField("year",         IntegerType(), True),
    StructField("type",         StringType(),  True),
    StructField("manufacturer", StringType(),  True),
    StructField("model",        StringType(),  True),
    StructField("engines",      IntegerType(), True),
    StructField("seats",        IntegerType(), True),
    StructField("speed",        IntegerType(), True),
    StructField("engine",       StringType(),  True)
])

schema_flights = StructType([
    StructField("year",      IntegerType(), True),
    StructField("month",     IntegerType(), True),
    StructField("day",       IntegerType(), True),
    StructField("dep_time",  StringType(),  True),
    StructField("dep_delay", IntegerType(), True),
    StructField("arr_time",  StringType(),  True),
    StructField("arr_delay", IntegerType(), True),
    StructField("carrier",   StringType(),  True),
    StructField("tailnum",   StringType(),  True),
    StructField("flight",    StringType(),  True),
    StructField("origin",    StringType(),  True),
    StructField("dest",      StringType(),  True),
    StructField("air_time",  IntegerType(), True),
    StructField("distance",  IntegerType(), True),
    StructField("hour",      IntegerType(), True),
    StructField("minute",    IntegerType(), True),
])

In [8]:
# Lendo Datasets 
df_airports = (spark.getOrCreate().read
                   .format("csv")
                   .option("header","True")
                   .schema(schema_airports)
                   .load("../../pyspark-capgemini/data/airports.csv"))


df_planes = (spark.getOrCreate().read
                   .format("csv")
                   .option("header","True")
                   .schema(schema_planes)
                   .load("../../pyspark-capgemini/data/planes.csv"))


df_flights = (spark.getOrCreate().read
                   .format("csv")
                   .option("header","True")
                   .schema(schema_flights)
                   .load("../../pyspark-capgemini/data/flights.csv"))

In [9]:
df_airports.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



In [10]:
# Criacao das visões temporarias
df_airports.createOrReplaceTempView('airports')

In [11]:
# Leitura dos dados como RDD
rdd_airports = sc.textFile("../../pyspark-capgemini/data/airports.csv")

In [12]:
# armazena a primeira linha (colunas) do arquivo como referencia.
header_airports = rdd_airports.first()

# Remove a primeira linha (colunas) do rdd
rdd_airports = rdd_airports.filter(lambda line: line != header_airports).map(split_csv)


### Criando função para adicionar linhas e testar o código. 

In [15]:
def add_test_rows_for_airports(data,_format): #convenção para usar _ para diferenciar de nomes reservados
        # Adiciona linhas para testar regras de negócio
        values = [
        #faa    name  lat       lon               alt         tz            dst
        (0 ,     0,    0    ,   0            ,    '-20'     ,  0,             0),
        (''   , ''  , ''      , ''              , ''        ,   ''  ,        ''),
        ('   ', None, '12O.12', '-80.Aa6195833' , '-50'  ,   '5',          'U'),
        ('AAA', None, '12O.12', '-80.Aa6195833' , '-50'  ,   '4',         'U' ),
        ('222', None, None    , '-80.Aa6195833' , '-100'    ,        '-14', 'U'),
        ('__!', None, None    , '-80.Aa6195833' , '-100'    ,        '-14', 'U')
    ]
        if _format =="df":
            return spark.getOrCreate().createDataFrame(values,data.columns).union(data)
        elif _format == "rdd":
            return sc.parallelize(tuple(values)).union(data)
        
        
    
    
    

In [16]:
# Adicionando linhas ao Dataframe
df_airports  = add_test_rows_for_airports(df_airports,  "df")

TypeError: field faa: Can not merge type <class 'pyspark.sql.types.LongType'> and <class 'pyspark.sql.types.StringType'>

In [17]:

rdd_airports.take(10)

[('04G', 'Lansdowne Airport', '41.1304722', '-80.6195833', '1044', '-5', 'A'),
 ('06A',
  'Moton Field Municipal Airport',
  '32.4605722',
  '-85.6800278',
  '264',
  '-5',
  'A'),
 ('06C', 'Schaumburg Regional', '41.9893408', '-88.1012428', '801', '-6', 'A'),
 ('06N', 'Randall Airport', '41.431912', '-74.3915611', '523', '-5', 'A'),
 ('09J',
  'Jekyll Island Airport',
  '31.0744722',
  '-81.4277778',
  '11',
  '-4',
  'A'),
 ('0A9',
  'Elizabethton Municipal Airport',
  '36.3712222',
  '-82.1734167',
  '1593',
  '-4',
  'A'),
 ('0G6',
  'Williams County Airport',
  '41.4673056',
  '-84.5067778',
  '730',
  '-5',
  'A'),
 ('0G7',
  'Finger Lakes Regional Airport',
  '42.8835647',
  '-76.7812318',
  '492',
  '-5',
  'A'),
 ('0P2',
  'Shoestring Aviation Airfield',
  '39.7948244',
  '-76.6471914',
  '1000',
  '-5',
  'U'),
 ('0S9',
  'Jefferson County Intl',
  '48.0538086',
  '-122.8106436',
  '108',
  '-8',
  'A')]

# Dataset airports

### 1 pergunta<br>


In [14]:
# Usando Dataframe

df_airports.filter(F.col('alt') < 0 ).show()

df_airports = df_airports.withColumn('alt',
                                      F.when(F.col('alt') < 0 , 0)
                                       .otherwise(F.col('alt'))
                                    )
                                                                                 

df_airports.filter(F.col('alt') < 0 ).show()


+---+-------------+---------+----------+---+---+---+
|faa|         name|      lat|       lon|alt| tz|dst|
+---+-------------+---------+----------+---+---+---+
|IPL|  Imperial Co| 32.83422|-115.57874|-54| -8|  A|
|NJK|El Centro Naf|32.829224|-115.67167|-42| -8|  A|
+---+-------------+---------+----------+---+---+---+

+---+----+---+---+---+---+---+
|faa|name|lat|lon|alt| tz|dst|
+---+----+---+---+---+---+---+
+---+----+---+---+---+---+---+



In [15]:
df_airports.select('alt').distinct().toPandas()

Unnamed: 0,alt
0,833
1,1580
2,148
3,463
4,243
...,...
882,89
883,5431
884,4173
885,401


In [None]:
### Usando SQL


In [None]:
### Usando RDD 


### 2 pergunta


In [16]:
Horario_verao ={
    '(Europe)': "E",
    '(US/Canada)':"A", 
    '(South America)':"S", 
    '(Australia)':"O", 
    '(New Zealand)':"Z", 
    '(None)': "N", 
    '(Unknown)':'U'}

In [17]:
print(Horario_verao)

{'(Europe)': 'E', '(US/Canada)': 'A', '(South America)': 'S', '(Australia)': 'O', '(New Zealand)': 'Z', '(None)': 'N', '(Unknown)': 'U'}


In [18]:
# Usando Dataframe

df_airports.filter(F.col('tz').between(-7,-5)).show()

df_airports = df_airports.withColumn('dst',
                                     F.when(F.col('tz').between(-7,-5),Horario_verao['(US/Canada)'])
                                     .otherwise(F.col('dst'))
                                    )

+---+--------------------+---------+----------+----+---+---+
|faa|                name|      lat|       lon| alt| tz|dst|
+---+--------------------+---------+----------+----+---+---+
|04G|   Lansdowne Airport|41.130474| -80.61958|1044| -5|  A|
|06A|Moton Field Munic...| 32.46057| -85.68003| 264| -5|  A|
|06C| Schaumburg Regional| 41.98934| -88.10124| 801| -6|  A|
|06N|     Randall Airport| 41.43191| -74.39156| 523| -5|  A|
|0G6|Williams County A...|41.467304|-84.506775| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.883564|-76.781235| 492| -5|  A|
|0P2|Shoestring Aviati...|39.794823|-76.647194|1000| -5|  U|
|0W3|Harford County Ai...|39.566837|  -76.2024| 409| -5|  A|
|10C|  Galt Field Airport| 42.40289|-88.375114| 875| -6|  U|
|17G|Port Bucyrus-Craw...|40.781555| -82.97481|1003| -5|  A|
|1B9| Mansfield Municipal|42.000134| -71.19677| 122| -5|  A|
|1CS|Clow Internationa...|41.695976|-88.129234| 670| -6|  U|
|1OH|     Fortman Airport|40.555325| -84.38662| 885| -5|  U|
|1RL|Point Roberts Air..

In [None]:
### Usando SQL

In [None]:
### Usando RDD 


### 3 pergunta 

In [19]:
# Usando Dataframe
df_airports.filter(F.col('dst') == 'U').show()

df_airports = df_airports.withColumn('dst',
                                     F.when(F.col('dst') =='U', Horario_verao['(US/Canada)'])
                                     .otherwise(F.col('dst')))
                                     

+---+--------------------+---------+----------+----+---+---+
|faa|                name|      lat|       lon| alt| tz|dst|
+---+--------------------+---------+----------+----+---+---+
|19A|Jackson County Ai...|34.175865|  -83.5616| 951| -4|  U|
|BLD|Boulder City Muni...|  35.5651|  -114.514|2201| -8|  U|
|GCW|Grand Canyon West...|  35.5925| -113.4859|4825| -8|  U|
|MXY|    McCarthy Airport| 61.43706|-142.90308|1531| -8|  U|
|NGZ|         NAS Alameda|  37.7861| -122.3186|  10| -9|  U|
|SWD|      Seward Airport|60.126938|-149.41881|  22| -8|  U|
|SXQ|    Soldotna Airport|60.474957|-151.03824| 113| -8|  U|
|WWT|      Newtok Airport|60.939167|-164.64111|  25|-10|  U|
+---+--------------------+---------+----------+----+---+---+



In [20]:
df_airports.where(F.col('dst') == "U").show()

+---+----+---+---+---+---+---+
|faa|name|lat|lon|alt| tz|dst|
+---+----+---+---+---+---+---+
+---+----+---+---+---+---+---+



In [None]:
### Usando SQL

In [None]:
### Usando RDD 

### 4.pergunta

In [21]:
region = ['EUA', 'ALASKA', 'OFFSHORE','MAINLAND-WEST','MAINLAND-EAST']

In [22]:
# Usando Dataframe

df_airports = df_airports.withColumn("region", 
                                    F.when(F.col("lon") < -124, 'ALASKA')
                                     .when((F.col("lon") > -50) | (F.col("lat") < 24),'OFFSHORE')
                                     .when( F.col("lon").between(-124,-95), 'MAINLAND-WEST')
                                     .when( F.col("lon").between(-95, -50), 'MAINLAND-EAST')
                                     .otherwise('NaN'))

df_airports.groupBy('region').count().show()

+-------------+-----+
|       region|count|
+-------------+-----+
|       ALASKA|  261|
|     OFFSHORE|    4|
|MAINLAND-EAST|  696|
|MAINLAND-WEST|  436|
+-------------+-----+



In [None]:
### Usando SQL

In [None]:
### Usando RDD

###  5. pergunta


In [23]:
# Usando dataframe

REGEX_AP_CATEGORY = '.*Airport*|.*Tradeport*|.*Heliport*|.*Airpor*|.*Arpt*'
REGEX_AD_CATEGORY = '.*Aerodrome*'
REGEX_AK_CATEGORY = '.*Airpark*|.*Aero Park*'
REGEX_AS_CATEGORY = '.*Station*|.*Air Station*'
REGEX_FL_CATEGORY = '.*Field*|.*Fld*'

df_airports = df_airports.withColumn('type', (
                                        F.when(F.col('name').rlike(REGEX_AP_CATEGORY), 'AP')
                                         .when(F.col('name').rlike(REGEX_AD_CATEGORY), 'AD')
                                         .when(F.col('name').rlike(REGEX_AK_CATEGORY), 'AK')
                                         .when(F.col('name').rlike(REGEX_AS_CATEGORY), 'AS')
                                         .when(F.col('name').rlike(REGEX_FL_CATEGORY), 'FL')
                                        ).otherwise('NaN'))

df_airports.groupBy('type').count().show()
df_airports.filter(F.col('type') == 'NaN').show()




+----+-----+
|type|count|
+----+-----+
|  AD|    1|
| NaN|  657|
|  AS|   19|
|  FL|   84|
|  AK|   12|
|  AP|  624|
+----+-----+

+---+--------------------+---------+-----------+----+---+---+-------------+----+
|faa|                name|      lat|        lon| alt| tz|dst|       region|type|
+---+--------------------+---------+-----------+----+---+---+-------------+----+
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|MAINLAND-EAST| NaN|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000| -5|  A|MAINLAND-EAST| NaN|
|0S9|Jefferson County ...| 48.05381|-122.810646| 108| -8|  A|MAINLAND-WEST| NaN|
|1B9| Mansfield Municipal|42.000134|  -71.19677| 122| -5|  A|MAINLAND-EAST| NaN|
|3G3| Wadsworth Municipal| 41.00316|  -81.75644| 974| -5|  A|MAINLAND-EAST| NaN|
|49X|   Chemehuevi Valley| 34.52889| -114.43197| 638| -8|  A|MAINLAND-WEST| NaN|
|ABE|  Lehigh Valley Intl|40.652084|   -75.4408| 393| -5|  A|MAINLAND-EAST| NaN|
|ABI|        Abilene Rgnl| 32.41132|   -99.6819|1791| -6|  

In [29]:
### Usando DataFrame




                                     
                                         


In [31]:
df_airports.where(F.col('type (category)')=='AP').show()

+---+--------------------+---------+----------+----+---+----+---------------+
|faa|                name|      lat|       lon| alt| tz| dst|type (category)|
+---+--------------------+---------+----------+----+---+----+---------------+
|04G|   Lansdowne Airport|41.130474| -80.61958|null| -5|   A|             AP|
|06A|Moton Field Munic...| 32.46057| -85.68003|null| -5|   A|             AP|
|06N|     Randall Airport| 41.43191| -74.39156|null| -5|   A|             AP|
|09J|Jekyll Island Air...|31.074472| -81.42778|null| -4|   A|             AP|
|0A9|Elizabethton Muni...|36.371223|-82.173416|null| -4|   A|             AP|
|0G6|Williams County A...|41.467304|-84.506775|null| -5|   A|             AP|
|0G7|Finger Lakes Regi...|42.883564|-76.781235|null| -5|   A|             AP|
|0W3|Harford County Ai...|39.566837|  -76.2024|null| -5|   A|             AP|
|10C|  Galt Field Airport| 42.40289|-88.375114|null| -6|null|             AP|
|17G|Port Bucyrus-Craw...|40.781555| -82.97481|null| -5|   A|   

In [None]:
### Usando SQL

In [None]:
### Usando RDD

### 6.pergunta


In [24]:
LIST_TRUE = ["Base", "Aaf", "AFs", "Ahp", "Afb", "LRRS", "Lrrs", "Arb",
             "Naf", "NAS", "Nas", "Jrb", "Ns", "As", "Cgas", "Angb"]

## TRUE_REGEX = r'|'.join(map(lambda x : f".*({x}).*", LIST_TRUE))


In [25]:
df_airports = df_airports.withColumn('military',
                                    F.when(F.col('name').rlike(create_regex_from_list(LIST_TRUE)), True)
                                    .otherwise(False))

df_airports.groupBy('military').count().show()


+--------+-----+
|military|count|
+--------+-----+
|    true|  166|
|   false| 1231|
+--------+-----+



In [None]:
### Usando DataFrame

In [None]:
### Usando SQL

In [None]:
### Usando RDD

### 7 pergunta


In [26]:
#lists
I_TYPES = ["International", "Intl", "Intercontinental"]
N_TYPES = ["National", "Natl"]
R_TYPES = ["Regional", "Reigonal", "Rgnl", "County", "Metro" "Metropolitan"]
M_TYPES = ["Municipal", "Muni", 'City']

# Usando dataframe
df_airports = df_airports.withColumn('administration',(
        F.when(
            F.col('name').rlike(create_regex_from_list(I_TYPES)), 'I'
        ).when(
            F.col('name').rlike(create_regex_from_list(N_TYPES)), 'N'
        ).when(
            F.col('name').rlike(create_regex_from_list(R_TYPES)), 'R'
        ).when(
            F.col('name').rlike(create_regex_from_list(M_TYPES)), 'M'
        ).otherwise('NaN')
    ))

In [27]:
df_airports.filter(F.col('administration') =='I').show()

+---+--------------------+---------+-----------+----+---+---+-------------+----+--------+--------------+
|faa|                name|      lat|        lon| alt| tz|dst|       region|type|military|administration|
+---+--------------------+---------+-----------+----+---+---+-------------+----+--------+--------------+
|0S9|Jefferson County ...| 48.05381|-122.810646| 108| -8|  A|MAINLAND-WEST| NaN|   false|             I|
|1CS|Clow Internationa...|41.695976| -88.129234| 670| -6|  A|MAINLAND-EAST|  AP|   false|             I|
|ABE|  Lehigh Valley Intl|40.652084|   -75.4408| 393| -5|  A|MAINLAND-EAST| NaN|   false|             I|
|ABQ|Albuquerque Inter...|35.040222| -106.60919|5355| -7|  A|MAINLAND-WEST| NaN|   false|             I|
|ACY|  Atlantic City Intl|39.457584|  -74.57716|  75| -5|  A|MAINLAND-EAST| NaN|   false|             I|
|AEX|     Alexandria Intl|  31.3274| -92.549835|  89| -6|  A|MAINLAND-EAST| NaN|   false|             I|
|AKC|   Akron Fulton Intl|  41.0375|  -81.46692|1067| -

In [28]:
df_airports.groupBy('administration').count().show()

+--------------+-----+
|administration|count|
+--------------+-----+
|             M|  180|
|             N|    5|
|             R|  276|
|           NaN|  772|
|             I|  164|
+--------------+-----+



# Dataset Planes

### Pergunta 1 

In [31]:
df_planes = df_planes.withColumn('tailchar',                                 
                                 F.regexp_replace(F.col("tailnum"),"N",""))


In [32]:
df_planes.select('tailnum','tailchar').filter(F.col('tailnum').contains('N')).show()

+-------+--------+
|tailnum|tailchar|
+-------+--------+
| N102UW|   102UW|
| N103US|   103US|
| N104UW|   104UW|
| N105UW|   105UW|
| N107US|   107US|
| N108UW|   108UW|
| N109UW|   109UW|
| N110UW|   110UW|
| N111US|   111US|
| N11206|   11206|
| N112US|   112US|
| N113UW|   113UW|
| N114UW|   114UW|
| N117UW|   117UW|
| N118US|   118US|
| N119US|   119US|
| N1200K|   1200K|
| N1201P|   1201P|
| N12114|   12114|
| N121DE|   121DE|
+-------+--------+
only showing top 20 rows



### Pergunta 2

In [33]:
# Usando Dataframe
df_planes.filter(F.col("year") == 0).count()

df_planes = df_planes.withColumn('year', (
    F.when(F.col('year') == 0, 1996)
     .otherwise(F.col('year'))
))


In [34]:
df_planes.filter(F.col("year") == 0).count()

0

### Pergunta 3

In [35]:
# Criando tabela ordenada por Manufacturer e Model
df_ordered = df_planes.groupBy("manufacturer",'model').min('year').orderBy('manufacturer','model')

# Renomeando colunas
df_ordered = df_ordered.withColumnRenamed('manufacturer', 'manufacturer1').withColumnRenamed('model', 'model1') \
                                                                          .withColumnRenamed('min(year)', 'year1')

# Criando tabela ordenada somente por manufacturer
df_ordered2 = df_planes.groupBy('manufacturer').min('year').orderBy('manufacturer')


# Renomeando colunas
df_ordered2 = df_ordered2.withColumnRenamed('manufacturer', 'manufacturer2') \
.withColumnRenamed('min(year)', 'year2')


df_final = df_planes.join(df_ordered, (df_planes.manufacturer == df_ordered.manufacturer1) &
(df_planes.model == df_ordered.model1),'left')

# Fazendo left join da tabela df_final com a df_ordered2

df_final = df_final.join(df_ordered2, df_final.manufacturer == df_ordered2.manufacturer2 , 'left')



# Modificando a coluna year para inputar os anos

df_final = df_final.withColumn('year',
F.when((F.col('year').isNull()) & (F.col('year1').isNull()) & (F.col('year2').isNull()), "NaN")
.when((F.col('year').isNull()) & (F.col('year1').isNull()), F.col('year2'))
.when(F.col('year').isNull(), F.col('year1'))
.otherwise(F.col('year'))
)
# Dropando as colunas criadas pelo join 
df_planes = df_final.drop('manufacturer1','manufacturer2','model1','year1','year2')

df_planes.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+--------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|tailchar|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+--------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|   102UW|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|   103US|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|   104UW|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|   105UW|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|   107US|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|   108UW|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|

### Pergunta 4

In [38]:
# coluna criada
df_planes = df_planes.withColumn('age',(
                F.when(
                   check_empty_column('year'), None
                ).otherwise(format_year_date(F.col("year")))
            ))

df_planes.show(vertical=True)

-RECORD 0----------------------------
 tailnum      | N102UW               
 year         | 1998                 
 type         | Fixed wing multi ... 
 manufacturer | AIRBUS INDUSTRIE     
 model        | A320-214             
 engines      | 2                    
 seats        | 182                  
 speed        | null                 
 engine       | Turbo-fan            
 tailchar     | 102UW                
 age          | 24                   
-RECORD 1----------------------------
 tailnum      | N103US               
 year         | 1999                 
 type         | Fixed wing multi ... 
 manufacturer | AIRBUS INDUSTRIE     
 model        | A320-214             
 engines      | 2                    
 seats        | 182                  
 speed        | null                 
 engine       | Turbo-fan            
 tailchar     | 103US                
 age          | 23                   
-RECORD 2----------------------------
 tailnum      | N104UW               
 year       

### Pergunta 5

In [39]:
# Usando data Frame

df_planes= df_planes.withColumn("type",(
        F.when(
            (F.col('type') =="Fixed wing multi engine"), 'MULTI_ENG')
         .when(
            (F.col('type') =="Fixed wing single engine"), 'SINGLE_ENG')
     .when(
            (F.col('type') =="Rotorcraft"), 'ROTORCRAFT')
))


df_planes.select(F.col("type")).distinct().show()

+----------+
|      type|
+----------+
|SINGLE_ENG|
| MULTI_ENG|
|ROTORCRAFT|
+----------+



### Pergunta 6

In [40]:
### Usando DataFrame
df_planes = df_planes.withColumn("manufacturer",(
    F.when(
        (F.col('manufacturer').contains('AIRBUS')),"AIRBUS")
     .when((F.col('manufacturer').contains('BOEING')),"BOEING")
     .when((F.col('manufacturer').contains('BOEING')),"BOEING")
     .when((F.col('manufacturer').contains('CESSNA')),"CESSNA")
     .when((F.col('manufacturer').contains('EMBRAER')),"EMBRAER")
     .when((F.col('manufacturer').contains('SIKORSKY')),"SIKORSKY")
     .when((F.col('manufacturer').contains('CANADAIR')),"CANADAIR")
     .when((F.col('manufacturer').contains('PIPER')),"PIPER")
     .when((F.col('manufacturer').contains('MCDONNELL DOUGLAS')),"MCDONNELL DOUGLAS")
     .when((F.col('manufacturer').contains('CIRRUS')),"CIRRUS")
     .when((F.col('manufacturer').contains('BELL')),"BELL")
     .when((F.col('manufacturer').contains('KILDALL GARY')),"KILDALL GARY")
     .when((F.col('manufacturer').contains('LAMBERT RICHARD')),"LAMBERT RICHARD")
     .when((F.col('manufacturer').contains('BARKER JACK')),"BARKER JACK")
     .when((F.col('manufacturer').contains('ROBINSON HELICOPTER')),"ROBINSON HELICOPTER")
     .when((F.col('manufacturer').contains('GULFSTREAM')),"GULFSTREAM")
     .when((F.col('manufacturer').contains('BOMBARDIER')),"BOMBARDIER")
     .when((F.col('manufacturer').contains('MARZ BARRY')),"MARZ BARRY")
     ))

In [41]:
df_planes.filter(F.col('manufacturer') == 'BARKER JACK L').show()

+-------+----+----+------------+-----+-------+-----+-----+------+--------+---+
|tailnum|year|type|manufacturer|model|engines|seats|speed|engine|tailchar|age|
+-------+----+----+------------+-----+-------+-----+-----+------+--------+---+
+-------+----+----+------------+-----+-------+-----+-----+------+--------+---+



### Pergunta 7

In [46]:
df_planes = df_planes.withColumn('model',(
          F.regexp_replace(F.col("model"),REGEX_PARENTESE,"")))

In [47]:
df_planes.filter((F.col('model').contains('(')) | (F.col('model').contains(')'))).show()
df_planes.filter(F.col('model').contains('DC-9-82')).show()
df_planes.filter(F.col("model").contains('(')).distinct().show()

+-------+----+----+------------+-----+-------+-----+-----+------+--------+---+
|tailnum|year|type|manufacturer|model|engines|seats|speed|engine|tailchar|age|
+-------+----+----+------------+-----+-------+-----+-----+------+--------+---+
+-------+----+----+------------+-----+-------+-----+-----+------+--------+---+

+-------+----+---------+-----------------+------------+-------+-----+-----+---------+--------+---+
|tailnum|year|     type|     manufacturer|       model|engines|seats|speed|   engine|tailchar|age|
+-------+----+---------+-----------------+------------+-------+-----+-----+---------+--------+---+
| N426AA|1986|MULTI_ENG|MCDONNELL DOUGLAS|DC-9-82MD-82|      2|  172| null|Turbo-fan|   426AA| 36|
| N456AA|1988|MULTI_ENG|MCDONNELL DOUGLAS|DC-9-82MD-82|      2|  172| null|Turbo-fan|   456AA| 34|
| N467AA|1988|MULTI_ENG|MCDONNELL DOUGLAS|DC-9-82MD-82|      2|  172| null|Turbo-fan|   467AA| 34|
| N468AA|1988|MULTI_ENG|MCDONNELL DOUGLAS|DC-9-82MD-82|      2|  172| null|Turbo-fan|   4

### Pergunta 8


In [48]:
df_planes = df_planes.withColumn('speed',(
                F.when(check_empty_column('speed'),  F.ceil(F.col('seats') / 0.36))
))

### Pergunta 9

In [49]:
df_planes = df_planes.withColumn("engine_type",(
     F.when(
         (F.col('engine') =="Turbo-fan"), 'FAN')
     .when(
         (F.col('engine') =="Turbo-jet"), 'JET')
     .when(
         (F.col('engine') =="Turbo-shaft"), 'SHAFT')
     .when(
         (F.col('engine') =="4 Cycle"), 'CYCLE')
))

# Dataset Flights 

### Pergunta 1 

In [57]:
df_flights =df_flights.withColumn('hour',(
                            F.when(check_empty_column('hour'), 0)
                                 .otherwise(F.col('hour'))))\
                      .withColumn('minute',(
                              F.when(check_empty_column('minute'), 0)
                                 .otherwise(F.col('minute'))))

In [58]:
df_flights.where(F.col('minute')==0).count()

226

### Pergunta 2

In [59]:
df_flights =df_flights.withColumn('hour',(
                            F.when(
                                F.col('hour')==24,0)
                                 .otherwise(F.col('hour'))
))

In [60]:
df_flights.where(F.col('hour')==24).count()

0

### Pergunta 3

In [61]:
# Incluindo coluna 
df_flights = df_flights.withColumn("dep_datetime", F.expr("make_timestamp(year, month, day, hour, minute, 00)"))

df_flights.show(vertical = True, truncate = False)

-RECORD 0---------------------------
 year         | 2014                
 month        | 12                  
 day          | 8                   
 dep_time     | 658                 
 dep_delay    | -7                  
 arr_time     | 935                 
 arr_delay    | -5                  
 carrier      | VX                  
 tailnum      | N846VA              
 flight       | 1780                
 origin       | SEA                 
 dest         | LAX                 
 air_time     | 132                 
 distance     | 954                 
 hour         | 6                   
 minute       | 58                  
 dep_datetime | 2014-12-08 06:58:00 
-RECORD 1---------------------------
 year         | 2014                
 month        | 1                   
 day          | 22                  
 dep_time     | 1040                
 dep_delay    | 5                   
 arr_time     | 1505                
 arr_delay    | 5                   
 carrier      | AS                  
 

### Pergunta 4 

In [63]:
df_flights = df_flights.withColumn('dep_time', F.when(check_empty_column('hour'), F.concat(F.col('hour'), F.col('minute')))
                                                .otherwise(F.col('dep_time'))   
                                  )

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select dep_time, hour, minute from flights").show(5)

+--------+----+------+
|dep_time|hour|minute|
+--------+----+------+
|     658|   6|    58|
|    1040|  10|    40|
|    1443|  14|    43|
|    1705|  17|     5|
|     754|   7|    54|
+--------+----+------+
only showing top 5 rows



### Pergunta 5

In [64]:
df_flights =df_flights.withColumn('dep_delay',(
                            F.when(check_empty_column('dep_delay'), 0)
                             .otherwise(F.col('dep_delay'))))

In [65]:
df_flights.where(F.col('dep_delay').isNull()).count()

0

### Pergunta 6

In [66]:
df_flights =df_flights.withColumn('arr_delay',(
                            F.when(check_empty_column('arr_delay'), 0)
                             .otherwise(F.col('arr_delay'))))

### Pergunta 7

In [67]:
df_flights = df_flights.drop('year', 'month', 'day', 'hour', 'minute')

df_flights.show()

+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+
|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|       dep_datetime|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+
|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|2014-12-08 06:58:00|
|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|2014-01-22 10:40:00|
|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|2014-03-09 14:43:00|
|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|2014-04-09 17:05:00|
|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|2014-03-09 07:54:00|
|    1037|        7|    1352|        2|     WN| N646SW|    48|   PDX| DEN|     121|     991|2014

### Pergunta 8

In [81]:
df_flights = df_flights.select("*",
                               (F.col('distance') * 0.1 + 20).cast('int').alias('air_time_projected'))

### Pergunta 9

In [69]:
flights_mean = df_flights.groupBy(F.col('dest').alias('dest1'), F.col('origin').alias('origin1'))\
    .agg(F.round(F.avg('air_time'), 0).cast('int').alias('air_time_expected')) #round ou ceil?

In [70]:
df_flights = df_flights.join(flights_mean,
             (df_flights.dest == flights_mean.dest1) & 
             (df_flights.origin == flights_mean.origin1),
             'left')

In [71]:
df_flights = df_flights.drop('dest1','origin1')

In [72]:
df_flights.filter(F.col('tailnum') == 'N650SW').show(2,vertical=True)

-RECORD 0--------------------------------
 dep_time          | 704                 
 dep_delay         | 4                   
 arr_time          | 1033                
 arr_delay         | -2                  
 carrier           | WN                  
 tailnum           | N650SW              
 flight            | 397                 
 origin            | PDX                 
 dest              | ABQ                 
 air_time          | 133                 
 distance          | 1111                
 dep_datetime      | 2014-03-20 07:04:00 
 air_time_expected | 136                 
-RECORD 1--------------------------------
 dep_time          | 2207                
 dep_delay         | 72                  
 arr_time          | 2329                
 arr_delay         | 64                  
 carrier           | WN                  
 tailnum           | N650SW              
 flight            | 32                  
 origin            | PDX                 
 dest              | SMF          

### Pergunta 10

In [82]:
df_flights.withColumn('max', F.greatest(F.col('air_time_projected'), F.col('air_time_expected'))).toPandas()

Unnamed: 0,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,dep_datetime,air_time_expected,haul_duration,dep_season,dep_delay_category,air_time_projected,max
0,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,132.0,954,2014-12-08 06:58:00,127,SHORT-HAUL,FALL,ANTECIPATED,115,127
1,1040,5,1505,5,AS,N559AS,851,SEA,HNL,360.0,2677,2014-01-22 10:40:00,344,LONG-HAUL,WINTER,MINOR,287,344
2,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,111.0,679,2014-03-09 14:43:00,101,SHORT-HAUL,WINTER,ANTECIPATED,87,101
3,1705,45,1839,34,WN,N360SW,344,PDX,SJC,83.0,569,2014-04-09 17:05:00,86,SHORT-HAUL,SPRING,MINOR,76,86
4,754,-1,1015,1,AS,N612AS,522,SEA,BUR,127.0,937,2014-03-09 07:54:00,122,SHORT-HAUL,WINTER,ANTECIPATED,113,122
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,1806,-4,2104,-6,OO,N225AG,3458,SEA,SLC,89.0,689,2014-06-23 18:06:00,89,SHORT-HAUL,SUMER,ANTECIPATED,88,89
9996,2336,11,452,-13,AA,N3LEAA,1230,SEA,DFW,178.0,1660,2014-08-31 23:36:00,196,SHORT-HAUL,SUMER,MINOR,186,196
9997,904,-1,1042,-5,AS,N523AS,360,SEA,SMF,81.0,605,2014-08-08 09:04:00,83,SHORT-HAUL,SUMER,ANTECIPATED,80,83
9998,1441,26,1820,10,WN,N8647A,2857,SEA,ABQ,133.0,1180,2014-08-29 14:41:00,143,SHORT-HAUL,SUMER,MINOR,138,143


### Pergunta 11

In [83]:
# contando valores nulos!!!!
df_flights = df_flights.withColumn('air_time',(
                        F.when(check_empty_column('air_time'), F.col('dep_time') + F.col('air_time'))
                         .otherwise(F.col('air_time'))
))

### Pergunta 12

In [77]:
df_flights = df_flights.withColumn('haul_duration',(
                    F.when((df_flights.air_time >= 20) &
                           (df_flights.air_time < 180),
                           'SHORT-HAUL'
                          )
                     .when((df_flights.air_time >= 180) &
                           (df_flights.air_time < 360),
                           'MEDIUM-HAUL'
                          )
                     .when((df_flights.air_time >= 360), 
                           'LONG-HAUL'
                          )))

### Pergunta 13

In [79]:
df_flights = df_flights.withColumn('dep_season',
             F.when(F.col('dep_datetime').between('2013-12-21 21:48:00','2014-03-20 15:33:00'), 'WINTER')
              .when(F.col('dep_datetime').between('2014-03-20 15:33:00','2014-06-21 10:14:00'), 'SPRING')
              .when(F.col('dep_datetime').between('2014-06-21 10:14:00','2014-09-23 02:04:00'), 'SUMER')
              .when(F.col('dep_datetime').between('2014-09-23 02:04:00','2014-12-21 21:48:00'), 'FALL')
              .when(F.col('dep_datetime').between('2014-12-21 21:48:00','2015-03-20 15:33:00'), 'WINTER'))


df_flights.select('dep_season').groupBy('dep_season').count().show()

+----------+-----+
|dep_season|count|
+----------+-----+
|    WINTER| 2149|
|    SPRING| 2560|
|      FALL| 2373|
|     SUMER| 2918|
+----------+-----+



### Pergunta 14

In [80]:
df_flights = df_flights.withColumn('dep_delay_category', (
                      F.when((F.col('dep_delay') < 0), "ANTECIPATED")
                       .when((F.col('dep_delay') == 0), "INTIME")
                       .when((F.col('dep_delay') >= 60 ), "MAJOR")
                       .otherwise("MINOR")
))


In [56]:
### Usando SQL

In [None]:
### Usando RDD

### SALVANDO ARQUIVO  (DATAFRAME)

In [None]:
### SALVANDO ARQUIVO CSV
(df_airports.select(df_airports.colRegex("`^qa_.*`"))
            .repartition(1) # coalesce
            .write.format("CSV")
            .mode('overwrite')
            .option("header", "true")
            .save("Datasets/airports_qa.CSV"))


In [85]:
### SALVANDO ARQUIVO PARQUET
(df_airports.repartition(1) # coalesce
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
            .save("Datasets/transformacao/airports_proc.parquet"))
            

### SALVANDO ARQUIVO (RDD)

In [None]:
(rdd_airports.map(qa_faa)
             .map(qa_name)
             .map(qa_lat)
             .map(qa_lon)
             .map(qa_alt)
             .map(qa_tz)
             .map(qa_dst)
             .saveAsTextFile("Datasets/airports_.txt"))

In [73]:
pwd

'C:\\Academia_Pyspark\\exe_rev_douglas'