In [1]:
!pip install pyspark
!pip install findspark



In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from datetime import datetime

In [4]:
# Criar o contexto do spark
sc = SparkContext()

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master('local[2]')
                     .appName('Transformacao')
        )

In [5]:
schema_airports = StructType([
    StructField('faa',  StringType(),  True),
    StructField('name', StringType(),  True),
    StructField('lat',  FloatType(),   True),
    StructField('lon',  FloatType(),   True),
    StructField('alt',  IntegerType(), True),
    StructField('tz',   IntegerType(), True),
    StructField('dst',  StringType(),  True)
])

schema_planes = StructType([
    StructField('tailnum',      StringType(),  True),
    StructField('year',         IntegerType(), True),
    StructField('type',         StringType(),  True),
    StructField('manufacturer', StringType(),  True),
    StructField('model',        StringType(),  True),
    StructField('engines',      IntegerType(), True),
    StructField('seats',        IntegerType(), True),
    StructField('speed',        IntegerType(), True),
    StructField('engine',       StringType(),  True)
])

schema_flights = StructType([
    StructField('year',      IntegerType(), True),
    StructField('month',     IntegerType(), True),
    StructField('day',       IntegerType(), True),
    StructField('dep_time',  StringType(),  True),
    StructField('dep_delay', IntegerType(), True),
    StructField('arr_time',  StringType(),  True),
    StructField('arr_delay', IntegerType(), True),
    StructField('carrier',   StringType(),  True),
    StructField('tailnum',   StringType(),  True),
    StructField('flight',    StringType(),  True),
    StructField('origin',    StringType(),  True),
    StructField('dest',      StringType(),  True),
    StructField('air_time',  IntegerType(), True),
    StructField('distance',  IntegerType(), True),
    StructField('hour',      IntegerType(), True),
    StructField('minute',    IntegerType(), True),
])

In [6]:
df_airports = (spark.getOrCreate().read
                  .format('csv')
                  .option('header', 'true')
                  .schema(schema_airports)
                  .load('airports.csv'))

df_planes = (spark.getOrCreate().read
                  .format('csv')
                  .option('header', 'true')
                  .schema(schema_planes)
                  .load('planes.csv'))

df_flights = (spark.getOrCreate().read
                  .format('csv')
                  .option('header', 'true')
                  .schema(schema_flights)
                  .load('flights.csv'))

In [7]:
# Criacao das visões temporarias
df_airports.createOrReplaceTempView('airports')
df_planes.createOrReplaceTempView('planes')
df_flights.createOrReplaceTempView('flights')

In [8]:
df_airports.show()

+---+--------------------+---------+-----------+----+---+---+
|faa|                name|      lat|        lon| alt| tz|dst|
+---+--------------------+---------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000| -5|  U|
|0S9|Jefferson County ...| 48.05381|-122.810646| 108| -8|  A|
|0W3|Harford County Ai...|39.566837|   -76.2024| 409| -5|  A|
|10C|  Galt Field Airport| 42.40289| -88.375114| 875| -6|  U|
|17G|Port Bucyrus-Craw...|40.781555|  -82.97481|1003| -5|  A|
|19A|Jac

# Pergunta 1

In [46]:
df = df_airports.withColumn('alt', F.when(F.col('alt') < 0 , 0).otherwise(F.col('alt')))

df.createOrReplaceTempView('airports')

spark.getOrCreate().sql("select distinct(alt) from airports where alt <= 0").show()

df.printSchema()

+---+
|alt|
+---+
|  0|
+---+

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



# Pergunta 2

In [10]:
df = df_airports.withColumn('dst', F.when(F.col('tz').between(-7,-5), 'A').otherwise(F.col('dst')))

df.createOrReplaceTempView('airports')

spark.getOrCreate().sql("select distinct(dst) from airports where tz between -7 and -5").show()

+---+
|dst|
+---+
|  A|
+---+



# Pergunta 3

In [11]:
df = df_airports.withColumn('dst', F.when(F.col('dst') == 'U', 'A').otherwise(F.col('dst')))

df.createOrReplaceTempView('airports')

spark.getOrCreate().sql("select distinct(dst) from airports where dst = 'U'").show()

+---+
|dst|
+---+
+---+



# Pergunta 4

In [48]:
df = df_airports.withColumn('region', (
                    F.when(F.col('lon') < -124, 'ALASKA')
                     .when(F.col('lon') > -50, 'OFFSHORE')
                     .when(F.col('lon').between(-124, -95), 'MAINLAND-WEST')
                     .when(F.col('lon').between(-95, -50), 'MAINLAND-EAST')
                     .otherwise(None)))       


In [50]:
df.createOrReplaceTempView('airports')

spark.getOrCreate().sql("select * from airports where region == 'ALASKA'").show()

+---+--------------------+---------+----------+----+---+---+------+
|faa|                name|      lat|       lon| alt| tz|dst|region|
+---+--------------------+---------+----------+----+---+---+------+
|1C9|Frazier Lake Airpark|54.013332|-124.76833| 152| -8|  A|ALASKA|
|369|  Atmautluak Airport| 60.86667|-162.27306|  18|-10|  A|ALASKA|
|6K8|Tok Junction Airport|63.329445|-142.95361|1639| -9|  A|ALASKA|
|6S2|            Florence| 43.98282|-124.11137|  51| -7|  A|ALASKA|
|ABL|      Ambler Airport| 67.10639| -157.8575| 334| -9|  A|ALASKA|
|ACV|              Arcata| 40.97811|-124.10861| 221| -8|  A|ALASKA|
|ADK|        Adak Airport|   51.878|  -176.646|  18|-10|  A|ALASKA|
|ADQ|              Kodiak|57.749966|-152.49385|  78| -9|  A|ALASKA|
|AET|   Allakaket Airport|  66.5519| -152.6222| 441| -8|  A|ALASKA|
|AFE|        Kake Airport|56.961388|-133.91028| 172| -9|  A|ALASKA|
|AGN|Angoon Seaplane Base|57.503613|  -134.585|   0| -9|  A|ALASKA|
|AIN|  Wainwright Airport|70.638054|-159.99472| 

# Pergunta 5

In [14]:
df = df_airports.withColumn('type', (
                    F.when(F.col('name').rlike('Airport|Tradeport|Heliport|Airport|Arpt'), 'AP')
                     .when(F.col('name').rlike('Aerodrome'), 'AD')
                     .when(F.col('name').rlike('airpark|Aero Park'), 'AK')
                     .when(F.col('name').rlike('Field|Fld'), 'FL')
                     .otherwise(None)))       


In [15]:
df.createOrReplaceTempView('airports')

spark.getOrCreate().sql("select * from airports where type = 'FL'").show()

+---+--------------------+---------+----------+----+---+---+----+
|faa|                name|      lat|       lon| alt| tz|dst|type|
+---+--------------------+---------+----------+----+---+---+----+
|4A7|Clayton County Ta...|  33.3891|  -84.3324| 874| -5|  A|  FL|
|4B8|     Robertson Field|41.689335| -72.86469| 202| -5|  A|  FL|
|7A4|        Foster Field|42.466446| -90.16939| 990| -6|  A|  FL|
|AGS|Augusta Rgnl At B...|33.369946|  -81.9645| 144| -5|  A|  FL|
|ASE|Aspen Pitkin Coun...|  39.2232|  -106.869|7820| -7|  A|  FL|
|BBX|         Wings Field|  40.1375|   -75.265| 302| -5|  A|  FL|
|BED|Laurence G Hansco...| 42.46995| -71.28903| 133| -5|  A|  FL|
|BFI|Boeing Fld King C...|    47.53|-122.30195|  21| -8|  A|  FL|
|BFL|         Meadows Fld|35.433598|-119.05677| 507| -8|  A|  FL|
|BGM|Greater Binghamto...| 42.20869|-75.979836|1636| -5|  A|  FL|
|BTR|Baton Rouge Metro...|30.533167|-91.149635|  70| -6|  A|  FL|
|BZN|      Gallatin Field| 45.77764|-111.16015|4500| -7|  A|  FL|
|C16|     

# Pergunta 6

In [16]:
MILITARY = ["Base", "Aaf", "AFs", "Ahp", "Afb", "LRRS", "Lrrs", "Arb", "Naf", "NAS", "Nas", "Jrb", "Ns", "As", "Cgas", "Angb"]

REGEX_MILITARY = r'|'.join(map(lambda x : f".*({x}).*", MILITARY))

df = df_airports.withColumn('military', (
                    F.when(F.col('name').rlike(REGEX_MILITARY), True)
                     .otherwise(False)))  

In [17]:
df.createOrReplaceTempView('airports')

spark.getOrCreate().sql("select military, count(*) from airports group by military").show()


+--------+--------+
|military|count(1)|
+--------+--------+
|    true|     166|
|   false|    1231|
+--------+--------+



# Pergunta 7

In [18]:
df = df_airports.withColumn('administration', (
                    F.when(F.col('name').rlike('International|Intl|Intercontinental'), 'I')
                     .when(F.col('name').rlike('National|Natl'), 'N')
                     .when(F.col('name').rlike('Regional|Reigonal|Rgnl|County|Metro|Metropolitan'), 'R')
                     .when(F.col('name').rlike('Municipal|Muni|City'), 'M')
                     .otherwise(None)))  

In [19]:
df.createOrReplaceTempView('airports')

spark.getOrCreate().sql("select name, administration from airports where name like '%International%'").show()

+--------------------+--------------+
|                name|administration|
+--------------------+--------------+
|Clow Internationa...|             I|
|Albuquerque Inter...|             I|
|Billings Logan In...|             I|
|Chippewa County I...|             I|
|William R Fairchi...|             I|
|Hector Internatio...|             I|
|Bishop International|             I|
|St Lucie County I...|             I|
|Wokal Field Glasg...|             I|
|Greenville-Sparta...|             I|
|Gary Chicago Inte...|             I|
|Huntsville Intern...|             I|
|Sawyer Internatio...|             I|
|International Air...|             I|
|Pease Internation...|             I|
|Chicago Rockford ...|             I|
|San Bernardino In...|             I|
|Louisville Intern...|             I|
+--------------------+--------------+



# PLANES DATASET

# Pergunta 1

In [20]:
df = df_planes.withColumn('tailchar', F.regexp_replace(F.col('tailnum'), '[0-9|N]+',""))

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select distinct(tailchar) from planes").show()

+--------+
|tailchar|
+--------+
|       K|
|      DZ|
|      PC|
|      UA|
|      PS|
|      AA|
|      AY|
|      MX|
|       F|
|       Q|
|      RR|
|      JC|
|      AT|
|      SW|
|      EV|
|      DQ|
|      UW|
|      VA|
|       E|
|      VJ|
+--------+
only showing top 20 rows



# Pergunta 2

In [52]:
df = df_planes.withColumn('year', F.when(F.col('year') == 0, 1996)
                                   .otherwise(F.col('year')) 
                         )

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select distinct(year) from planes").show(10)

+----+
|year|
+----+
|1959|
|1990|
|1975|
|2003|
|2007|
|2006|
|2013|
|null|
|1988|
|1997|
+----+
only showing top 10 rows



# Pergunta 3

In [22]:
#criando df auxiliar com o ano mais recente para cada manufacturer e model

df_aux1 = df_planes.select('manufacturer', 'model', 'year').groupBy('manufacturer', 'model').agg(F.min('year').alias('aux_year_1')).orderBy(F.col('manufacturer'), F.col('model'))

df_aux2 = df_planes.select('manufacturer', 'year').groupBy('manufacturer').agg(F.min('year').alias('aux_year_2')).orderBy(F.col('manufacturer'))

df_aux1 = df_aux1.withColumnRenamed('manufacturer', 'manufacturer_aux1').withColumnRenamed('model', 'model_aux1')

df_aux2 = df_aux2.withColumnRenamed('manufacturer', 'manufacturer_aux2')

condition = [df_planes.model == df_aux1.model_aux1, df_planes.manufacturer == df_aux1.manufacturer_aux1]

df_aux3 = df_planes.join(df_aux1, condition, 'left').withColumnRenamed('aux_year_1', 'aux_year_first_condition')

df_aux3 = df_aux3.drop('manufacturer_aux1', 'model_aux1')
  
condition1 = [df_aux3.manufacturer == df_aux2.manufacturer_aux2]

df_aux4 = df_aux3.join(df_aux2, condition1, 'left').withColumnRenamed('aux_year_2', 'aux_year_second_condition')

df_aux4 = df_aux4.drop('manufacturer_aux2') 



In [23]:
df = df_aux4.withColumn('year', F.when(F.col('year').isNull(), F.col('aux_year_first_condition'))
                                 .otherwise(F.col('aux_year_first_condition'))
                       )

df = df_aux4.withColumn('year', F.when(F.col('aux_year_first_condition').isNull(), F.col('aux_year_second_condition'))
                                 .otherwise(F.col('aux_year_first_condition'))
                       )
                               
df = df.select('year', 'manufacturer', 'model', 'aux_year_first_condition', 'aux_year_second_condition')

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select * from planes where year is Null").show()


+----+---------------+-------------+------------------------+-------------------------+
|year|   manufacturer|        model|aux_year_first_condition|aux_year_second_condition|
+----+---------------+-------------+------------------------+-------------------------+
|null|LAMBERT RICHARD|    FALCON XP|                    null|                     null|
|null|  BARKER JACK L|ZODIAC 601HDS|                    null|                     null|
+----+---------------+-------------+------------------------+-------------------------+



# Pergunta 4

In [53]:
df = df_planes.withColumn('age', (F.year(F.current_date()))-F.col('year'))

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select distinct(age) from planes").show(5)

df.printSchema()

+---+
|age|
+---+
| 31|
| 34|
| 28|
| 27|
| 26|
+---+
only showing top 5 rows

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: integer (nullable = true)
 |-- engine: string (nullable = true)
 |-- age: integer (nullable = true)



# Pergunta 5

In [25]:
df = df_planes.withColumn('type', F.when(F.col('type') == 'Fixed wing multi engine', 'MULTI_ENG')
                                   .when(F.col('type') == 'Fixed wing single engine', 'SINGLE_ENG')
                                   .when(F.col('type') == 'Rotorcraft', 'ROTORCRAFT')
                         )

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select distinct(type) from planes").show()

+----------+
|      type|
+----------+
|SINGLE_ENG|
| MULTI_ENG|
|ROTORCRAFT|
+----------+



# Pergunta 6

In [26]:
df = df_planes.withColumn('manufacturer', F.when(F.col('manufacturer') == 'AIRBUS INDUSTRIE', 'AIRBUS')
                                           .when(F.col('manufacturer') == 'BOMBARDIER INC', 'BOMBARDIER') 
                                           .when(F.col('manufacturer').rlike('MCDONNELL DOUGLAS...'), 'MCDONNELL DOUGLAS')
                                           .when(F.col('manufacturer') == 'CIRRUS DESIGN CORP', 'CIRRUS')
                                           .when(F.col('manufacturer') == 'BARKER JACK L', 'BARKER JACK')
                                           .when(F.col('manufacturer').rlike('ROBINSON HELICOPT...'), 'ROBINSON HELICOPTER')
                                           .when(F.col('manufacturer') == 'GULFSTREAM AEROSPACE', 'GULFSTREAM')
                                           .otherwise(F.col('manufacturer'))
                         )

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select distinct(manufacturer) from planes order by manufacturer").show()

+-------------------+
|       manufacturer|
+-------------------+
|             AIRBUS|
|        BARKER JACK|
|               BELL|
|             BOEING|
|         BOMBARDIER|
|           CANADAIR|
|             CESSNA|
|             CIRRUS|
|            EMBRAER|
|         GULFSTREAM|
|       KILDALL GARY|
|    LAMBERT RICHARD|
|         MARZ BARRY|
|  MCDONNELL DOUGLAS|
|              PIPER|
|ROBINSON HELICOPTER|
|           SIKORSKY|
+-------------------+



# Pergunta 7

In [27]:
df = df_planes.withColumn('model1', F.trim(F.regexp_replace(F.col('model'), '\(([^)]+)\)', "")))

df_planes.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select distinct(model) from planes where model like '%(%'").show()
                                                                
df.createOrReplaceTempView('planes')
    
spark.getOrCreate().sql("select model, model1 from planes where model like '%210-5%'").show()


+--------------+
|         model|
+--------------+
|  GV-SP (G550)|
|DC-9-82(MD-82)|
|    210-5(205)|
|DC-9-83(MD-83)|
+--------------+

+----------+------+
|     model|model1|
+----------+------+
|210-5(205)| 210-5|
+----------+------+



# Pergunta 8

In [28]:
df = df_planes.withColumn('speed', F.when(F.col('speed').isNull(), F.col('seats')/0.36)
                                    .otherwise(F.col('speed')) 
                         )

df = df.withColumn('speed', F.ceil(F.col('speed')))

df_planes.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select * from planes where speed is Null").show(5)

df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select * from planes where tailnum = 'N102UW'").show()



+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 5 rows

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   

# Pergunta 9

In [29]:
df = df_planes.withColumn('engine_type', F.when(F.col('engine').rlike('Turbo-'), F.upper(F.regexp_replace(F.col('engine'), 'Turbo-', "")))
                                          .when(F.col('engine') == '4 Cycle', 'CYCLE') 
                                          .otherwise(F.col('engine'))
                         )
              
df.createOrReplaceTempView('planes')

spark.getOrCreate().sql("select engine, engine_type from planes group by 1, 2").show()



+-------------+-------------+
|       engine|  engine_type|
+-------------+-------------+
|  Turbo-shaft|        SHAFT|
|Reciprocating|Reciprocating|
|    Turbo-fan|          FAN|
|      4 Cycle|        CYCLE|
|   Turbo-prop|         PROP|
|    Turbo-jet|          JET|
+-------------+-------------+



# FLIGHTS DATASET

# Pergunta 1

In [30]:
df_flights = df_flights.withColumn('hour', F.when(F.col('hour').isNull(), 0)
                                    .otherwise(F.col('hour'))
                          ) \
               .withColumn('minute', F.when(F.col('minute').isNull(), 0)
                                      .otherwise(F.col('minute')))
                                 

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select hour, minute from flights where hour is Null or minute is Null").show(5)



+----+------+
|hour|minute|
+----+------+
+----+------+



# Pergunta 2

In [31]:
df_flights = df_flights.withColumn('hour', F.when(F.col('hour') == 24 , 0)
                                    .otherwise(F.col('hour'))  
                          )


df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select hour from flights where hour = 24").show()

+----+
|hour|
+----+
+----+



# Pergunta 3

In [32]:
df_flights = df_flights.withColumn('dep_datetime', 
                    F.to_timestamp( F.concat_ws(' ', F.concat_ws('-', df_flights.year, df_flights.month, df_flights.day),
                                                     F.concat_ws(':', df_flights.hour, df_flights.minute, F.lit(00))       
                                               )
                                  )
)

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select dep_datetime, year, month, day, hour, minute from flights").show(5)

df_flights.printSchema()

+-------------------+----+-----+---+----+------+
|       dep_datetime|year|month|day|hour|minute|
+-------------------+----+-----+---+----+------+
|2014-12-08 06:58:00|2014|   12|  8|   6|    58|
|2014-01-22 10:40:00|2014|    1| 22|  10|    40|
|2014-03-09 14:43:00|2014|    3|  9|  14|    43|
|2014-04-09 17:05:00|2014|    4|  9|  17|     5|
|2014-03-09 07:54:00|2014|    3|  9|   7|    54|
+-------------------+----+-----+---+----+------+
only showing top 5 rows

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer 

# Pergunta 4

In [33]:
df_flights = df_flights.withColumn('dep_time', F.when(F.col('dep_time') == 'NA', F.concat(F.col('hour'), F.col('minute')))
                                                .otherwise(F.col('dep_time'))   
                                  )

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select dep_time, hour, minute from flights").show(5)

+--------+----+------+
|dep_time|hour|minute|
+--------+----+------+
|     658|   6|    58|
|    1040|  10|    40|
|    1443|  14|    43|
|    1705|  17|     5|
|     754|   7|    54|
+--------+----+------+
only showing top 5 rows



# Pergunta 5

In [54]:
df_flights = df_flights.withColumn('dep_delay', F.when(F.col('dep_delay').isNull(), 0)
                                         .otherwise(F.col('dep_delay')))

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select * from flights where dep_delay = 'NA' or dep_delay is Null").show()


+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+------------+------------------+-----------------+-------------+----------+------------------+
|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|dep_datetime|air_time_projected|air_time_expected|haul_duration|dep_season|dep_delay_category|
+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+------------+------------------+-----------------+-------------+----------+------------------+
+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+------------+------------------+-----------------+-------------+----------+------------------+



# Pergunta 6

In [35]:
df_flights = df_flights.withColumn('arr_delay', F.when(F.col('arr_delay').isNull(), 0)
                                         .otherwise(F.col('arr_delay')))

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select * from flights where arr_delay is Null").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|dep_datetime|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+



# Pergunta 7

In [36]:
df_flights = df_flights.drop('year', 'month', 'hour', 'minute')

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select * from flights where arr_delay is Null").show()

+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+------------+
|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|dep_datetime|
+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+------------+
+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+------------+



# Pergunta 8

In [37]:
df_flights = df_flights.withColumn('air_time_projected', F.col('distance')*0.1+20)

df_flights = df_flights.withColumn('air_time_projected', F.col('air_time_projected').cast(IntegerType()))

df_flights.printSchema()

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select distance, air_time_projected from flights").show(5)


root
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- dep_datetime: timestamp (nullable = true)
 |-- air_time_projected: integer (nullable = true)

+--------+------------------+
|distance|air_time_projected|
+--------+------------------+
|     954|               115|
|    2677|               287|
|     679|                87|
|     569|                76|
|     937|               113|
+--------+------------------+
only showing top 5 rows



# Pergunta 9

In [38]:
dfaux = df_flights.groupBy('origin', 'dest').avg('air_time').withColumnRenamed('avg(air_time)','air_time_expected').withColumnRenamed('origin', 'origin1').withColumnRenamed('dest', 'dest1')

dfaux.show(3)

+-------+-----+------------------+
|origin1|dest1| air_time_expected|
+-------+-----+------------------+
|    SEA|  RNO|            74.375|
|    SEA|  DTW|219.81632653061226|
|    SEA|  CLE|             233.5|
+-------+-----+------------------+
only showing top 3 rows



In [39]:
condition = [df_flights.origin == dfaux.origin1, df_flights.dest == dfaux.dest1]

df_flights = df_flights.join(dfaux, condition, 'left')

df_flights = df_flights.drop('origin1', 'dest1')

df_flights = df_flights.withColumn('air_time_expected', F.col('air_time_expected').cast(IntegerType()))

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select origin, dest, air_time_expected from flights").show(5)

df_flights.printSchema()



+------+----+-----------------+
|origin|dest|air_time_expected|
+------+----+-----------------+
|   SEA| LAX|              126|
|   SEA| HNL|              343|
|   SEA| SFO|              101|
|   PDX| SJC|               85|
|   SEA| BUR|              122|
+------+----+-----------------+
only showing top 5 rows

root
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- dep_datetime: timestamp (nullable = true)
 |-- air_time_projected: integer (nullable = true)
 |-- air_time_expected: integer (nullable = true)



# Pergunta 10

In [40]:
df_flights = df_flights.withColumn('air_time', F.when(F.col('air_time_projected')>=F.col('air_time_expected'), F.col('air_time_projected'))
                                                .when(F.col('air_time_projected')<F.col('air_time_expected'), F.col('air_time_expected'))
                                                .otherwise(F.col('air_time'))
                                  
                                  )

#Both the functions greatest() and least() helps in identifying the greater and smaller value among few of the columns.

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql('select air_time, air_time_projected, air_time_expected from flights').show(5)


+--------+------------------+-----------------+
|air_time|air_time_projected|air_time_expected|
+--------+------------------+-----------------+
|     126|               115|              126|
|     343|               287|              343|
|     101|                87|              101|
|      85|                76|               85|
|     122|               113|              122|
+--------+------------------+-----------------+
only showing top 5 rows



# Pergunta 11

In [56]:
df_flights = df_flights.withColumn('arr_time', F.when(F.col('arr_time') == 'NA', F.col('dep_time')+F.col('air_time'))
                                                .otherwise(F.col('arr_time'))  
                                  )


df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select arr_time, dep_time, air_time from flights where air_time + dep_time = arr_time").show()

+--------+--------+--------+
|arr_time|dep_time|air_time|
+--------+--------+--------+
|   127.0|      00|     127|
|   268.0|      00|     268|
|   214.0|      00|     214|
|   114.0|      00|     114|
|   279.0|      00|     279|
|   339.0|      00|     339|
|    34.0|      00|      34|
|  2291.0|    2238|      53|
|   277.0|      00|     277|
|   114.0|      00|     114|
|    85.0|      00|      85|
|    81.0|      00|      81|
|   213.0|      00|     213|
|   127.0|      00|     127|
|   141.0|      00|     141|
|    1930|    1832|      98|
|    30.0|      00|      30|
|    32.0|      00|      32|
|    30.0|      00|      30|
|   202.0|      00|     202|
+--------+--------+--------+
only showing top 20 rows



# Pergunta 12 

In [42]:
df_flights = df_flights.withColumn('haul_duration', F.when(F.col('air_time').between(20, 180), 'SHORT-HAUL')
                                                     .when(F.col('air_time').between(180, 300), 'MEDIUM-HAUL')
                                                     .when(F.col('air_time')>300 , 'LONG-HAUL') 
                                  )

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select haul_duration, air_time from flights").show()

+-------------+--------+
|haul_duration|air_time|
+-------------+--------+
|   SHORT-HAUL|     126|
|    LONG-HAUL|     343|
|   SHORT-HAUL|     101|
|   SHORT-HAUL|      85|
|   SHORT-HAUL|     122|
|   SHORT-HAUL|     123|
|   SHORT-HAUL|      81|
|   SHORT-HAUL|     101|
|   SHORT-HAUL|     137|
|  MEDIUM-HAUL|     202|
|   SHORT-HAUL|     126|
|   SHORT-HAUL|     141|
|   SHORT-HAUL|     118|
|  MEDIUM-HAUL|     190|
|   SHORT-HAUL|     101|
|   SHORT-HAUL|      85|
|   SHORT-HAUL|      82|
|  MEDIUM-HAUL|     214|
|  MEDIUM-HAUL|     286|
|   SHORT-HAUL|     108|
+-------------+--------+
only showing top 20 rows



# Pergunta 13

In [43]:
df_flights = df_flights.withColumn('dep_season',
                                        F.when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-03-20 15:33:00')),F.concat(F.year('dep_datetime'),F.lit('-06-21 10:14:00'))), 'SPRING')
                                         .when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-06-21 10:14:00')),F.concat(F.year('dep_datetime'),F.lit('-09-23 02:04:00'))), 'SUMMER')
                                         .when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-09-23 02:04:00')),F.concat(F.year('dep_datetime'),F.lit('-12-21 21:48:00'))), 'FALL')
                                         .otherwise('WINTER')
                                  )


df_flights.select('dep_season').groupBy('dep_season').count().show()
df_flights.select('dep_datetime','dep_season').filter(df_flights.dep_season == 'SPRING').show()

df_flights.createOrReplaceTempView('flights')

spark.getOrCreate().sql("select dep_datetime, dep_season from flights where dep_season = 'WINTER'").show()

+----------+-----+
|dep_season|count|
+----------+-----+
|    WINTER| 2149|
|    SPRING| 2560|
|      FALL| 2373|
|    SUMMER| 2918|
+----------+-----+

+-------------------+----------+
|       dep_datetime|dep_season|
+-------------------+----------+
|2014-04-09 17:05:00|    SPRING|
|2014-05-12 16:55:00|    SPRING|
|2014-04-19 12:36:00|    SPRING|
|2014-06-05 17:33:00|    SPRING|
|2014-06-05 11:33:00|    SPRING|
|2014-06-04 11:15:00|    SPRING|
|2014-06-11 19:57:00|    SPRING|
|2014-06-07 18:23:00|    SPRING|
|2014-04-30 08:01:00|    SPRING|
|2014-06-02 22:22:00|    SPRING|
|2014-05-21 05:15:00|    SPRING|
|2014-06-11 07:50:00|    SPRING|
|2014-06-13 22:33:00|    SPRING|
|2014-05-02 12:53:00|    SPRING|
|2014-05-22 10:18:00|    SPRING|
|2014-05-16 07:46:00|    SPRING|
|2014-05-12 14:24:00|    SPRING|
|2014-04-06 18:44:00|    SPRING|
|2014-04-01 10:10:00|    SPRING|
|2014-04-25 10:49:00|    SPRING|
+-------------------+----------+
only showing top 20 rows

+-------------------+--------

# Pergunta 14

In [44]:
df_flights = df_flights.withColumn('dep_delay_category', F.when(F.col('dep_delay')<0, 'ANTECIPATED')
                                                          .when(F.col('dep_delay')==0, 'INTIME')
                                                          .when(F.col('dep_delay').between(0,60), 'MINOR')
                                                          .when(F.col('dep_delay')>60, 'MAJOR')
                                  )


df_flights.select('dep_delay', 'dep_delay_category').show()

+---------+------------------+
|dep_delay|dep_delay_category|
+---------+------------------+
|       -7|       ANTECIPATED|
|        5|             MINOR|
|       -2|       ANTECIPATED|
|       45|             MINOR|
|       -1|       ANTECIPATED|
|        7|             MINOR|
|       42|             MINOR|
|       -5|       ANTECIPATED|
|       -4|       ANTECIPATED|
|       -3|       ANTECIPATED|
|       -2|       ANTECIPATED|
|        0|            INTIME|
|       21|             MINOR|
|       -4|       ANTECIPATED|
|       89|             MAJOR|
|        3|             MINOR|
|       50|             MINOR|
|       -3|       ANTECIPATED|
|       -9|       ANTECIPATED|
|      -12|       ANTECIPATED|
+---------+------------------+
only showing top 20 rows

