In [1]:
import findspark
findspark.init()

In [2]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, BinaryType, DateType


In [3]:
# Criar o contexto do spark
sc = SparkContext()

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master("local[7]")
                     .appName("Aceleração PySpark - Capgemini"))

In [4]:
schema_airports = StructType([
    StructField("faa",  StringType(),  True),
    StructField("name", StringType(),  True),
    StructField("lat",  FloatType(),   True),
    StructField("lon",  FloatType(),   True),
    StructField("alt",  IntegerType(), True),
    StructField("tz",   FloatType(), True),
    StructField("dst",  StringType(),  True),
    StructField("qa_faa",  StringType(),  True),
    StructField("qa_name",  StringType(),  True),
    StructField("qa_lat",  StringType(),  True),
    StructField("qa_lon",  StringType(),  True),
    StructField("qa_alt",  StringType(),  True),
    StructField("qa_tz",  StringType(),  True),
    StructField("qa_dst",  StringType(),  True)
])

schema_planes = StructType([
    StructField("tailnum",      StringType(),  True),
    StructField("year",         IntegerType(), True),
    StructField("type",         StringType(),  True),
    StructField("manufacturer", StringType(),  True),
    StructField("model",        StringType(),  True),
    StructField("engines",      IntegerType(), True),
    StructField("seats",        IntegerType(), True),
    StructField("speed",        IntegerType(), True),
    StructField("engine",       StringType(),  True),
    StructField("qa_tailnum",  StringType(),  True),
    StructField("qa_year",  StringType(),  True),
    StructField("qa_type",  StringType(),  True),
    StructField("qa_manufacturer",  StringType(),  True),
    StructField("qa_model",  StringType(),  True),
    StructField("qa_engines",  StringType(),  True),
    StructField("qa_seats",  StringType(),  True),
    StructField("qa_speed",  StringType(),  True),
    StructField("qa_enginge",  StringType(),  True)  
])

schema_flights = StructType([
    StructField("year",      IntegerType(), True),
    StructField("month",     IntegerType(), True),
    StructField("day",       IntegerType(), True),
    StructField("dep_time",  StringType(),  True),
    StructField("dep_delay", IntegerType(), True),
    StructField("arr_time",  StringType(),  True),
    StructField("arr_delay", IntegerType(), True),
    StructField("carrier",   StringType(),  True),
    StructField("tailnum",   StringType(),  True),
    StructField("flight",    StringType(),  True),
    StructField("origin",    StringType(),  True),
    StructField("dest",      StringType(),  True),
    StructField("air_time",  IntegerType(), True),
    StructField("distance",  IntegerType(), True),
    StructField("hour",      IntegerType(), True),
    StructField("minute",    IntegerType(), True),
    StructField("qa_year_month_day",    StringType(), True),
    StructField("qa_hour_minute",    StringType(), True),
    StructField("qa_dep_arr_time",    StringType(), True),
    StructField("qa_dep_arr_delay",    StringType(), True),
    StructField("qa_carrier",    StringType(), True),
    StructField("qa_tailnum",    StringType(), True),
    StructField("qa_flight",    StringType(), True),
    StructField("qa_origin_dest",    StringType(), True),
    StructField("qa_air_time",    StringType(), True),
    StructField("qa_distance",    StringType(), True),
    StructField("qa_distance_airtime",    StringType(), True)
])

In [4]:
airports_path_data = '../data/qa_airports.parquet'
planes_path_data   = '../data/qa_planes.parquet'
flights_path_data  = '../data/qa_flights.parquet'


airports = (spark.getOrCreate().read
                  .format("parquet")
                  .option("header", "true")
#                  .schema(schema_airports)
                  .load(airports_path_data))

planes   = (spark.getOrCreate().read
                  .format("parquet")
                  .option("header", "true")
                  .load(planes_path_data))

flights  = (spark.getOrCreate().read
                  .format("parquet")
                  .option("header", "true")
#                  .schema(schema_flights)
                  .load(flights_path_data))

In [5]:
airports.createOrReplaceTempView('airports')
planes.createOrReplaceTempView('planes')
flights.createOrReplaceTempView('flights')

In [6]:
planes = planes.withColumn('year',(
                    planes.year.cast(IntegerType())
)).withColumn('engines',(
                    planes.engines.cast(IntegerType())
)).withColumn('seats',(
                    planes.seats.cast(IntegerType())
)).withColumn('speed',(
                    planes.speed.cast(IntegerType())
))

In [7]:
planes.show(1)

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|  enginge|qa_tailnum|qa_year|qa_type|qa_manufacturer|qa_model|qa_engines|qa_seats|qa_speed|qa_enginge|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      null|   null|   null|              C|    null|      null|    null|       M|      null|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+
only showing top 1 row



In [8]:
def higher_than(col, condition):
    return (F.col(col) > condition )

def between(col, lower_condition, higher_condition):
    return ( F.col(col).between(lower_condition, higher_condition) )

def lower_than(col, condition):
    return (F.col(col) < condition )

def equal(col, condition):
    return (F.col(col) == condition )

## Airports Dataset 

In [9]:
airports_path_data = '../data/airports.csv'


#Schema
airports_schema = (StructType([ 
    StructField("faa",StringType(),True), 
    StructField("name",StringType(),True), 
    StructField("lat",FloatType(),True), 
    StructField("lon", FloatType(), True), 
    StructField("alt", IntegerType(), True), 
    StructField("tz", FloatType(), True),
    StructField("dst", StringType(), True )
                    ])
         )

#DataFrame
airports = spark.getOrCreate().read.csv(airports_path_data, header = True, schema = airports_schema)


In [62]:
airports.show(5)

+---+--------------------+---------+---------+----+----+---+
|faa|                name|      lat|      lon| alt|  tz|dst|
+---+--------------------+---------+---------+----+----+---+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044|-5.0|  A|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264|-5.0|  A|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801|-6.0|  A|
|06N|     Randall Airport| 41.43191|-74.39156| 523|-5.0|  A|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11|-4.0|  A|
+---+--------------------+---------+---------+----+----+---+
only showing top 5 rows



In [63]:
airports.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: float (nullable = true)
 |-- dst: string (nullable = true)



### Pergunta 1

In [10]:
airports = airports.withColumn('alt', (
                 F.when(lower_than('alt', 0), 0 )
                 .otherwise(F.col('alt'))
))

In [11]:
airports.where(airports.alt < 0).show()

+---+----+---+---+---+---+---+
|faa|name|lat|lon|alt| tz|dst|
+---+----+---+---+---+---+---+
+---+----+---+---+---+---+---+



### Pergunta 2

In [12]:
airports = airports.withColumn('dst', (
                    F.when( (airports.tz >= -7) &
                            (airports.tz <= -5)
                            ,'A')
                    .otherwise(F.col('dst'))
))

In [13]:
#airports.where((airports.tz == -5)).show(5)

### Pergunta 3

In [14]:
airports = airports.withColumn('dst',(
                    F.when(equal('dst', 'U'), 'A')
                    .otherwise(F.col('dst'))
))

airports.where(airports.dst == 'U').show()

+---+----+---+---+---+---+---+
|faa|name|lat|lon|alt| tz|dst|
+---+----+---+---+---+---+---+
+---+----+---+---+---+---+---+



### Pergunta 4

In [15]:
airports = airports.withColumn('region', (
                    F.when(lower_than('lon', -124), 'ALASKA')
                     .when(higher_than('lon', -50) | lower_than('lat',  24), 'OFFSHORE')
                     .when(lower_than('lon', -95) | equal('lon', -95), 'MAINLAND-WEST')
                     .when(higher_than('lon', -95), 'MAINLAND-EAST')
                     .otherwise(None)
))

airports.where((airports.lon.between(-124, -50)) &
               (airports.region != 'MAINLAND-WEST') &
               (airports.region != 'MAINLAND-EAST')).show()

+---+----+---+---+---+---+---+------+
|faa|name|lat|lon|alt| tz|dst|region|
+---+----+---+---+---+---+---+------+
+---+----+---+---+---+---+---+------+



### Pergunta 5

In [16]:
airports = airports.withColumn('type',(
                    F.when(F.col('name').rlike('Airport|Tradeport|Heliport|Airpor|Arpt' ), 'AP')
                     .when(F.col('name').rlike('Aerodrome'), 'AD')
                     .when(F.col('name').rlike('Airpark|Aero Park' ), 'AK')
                     .when(F.col('name').rlike('Station|Air Station'), 'AS')
                     .when(F.col('name').rlike('Field|Fld'), 'FL')
                     .otherwise(None)
))

airports.where((F.col('type') != 'AP') &
              (F.col('type') != 'AD') &
              (F.col('type') != 'AK') &
              (F.col('type') != 'AS') &
              (F.col('type') != 'FL') 
             ).show()

+---+----+---+---+---+---+---+------+----+
|faa|name|lat|lon|alt| tz|dst|region|type|
+---+----+---+---+---+---+---+------+----+
+---+----+---+---+---+---+---+------+----+



### Pergunta 6

In [17]:
#string_military = "Base", "Aaf", "AFs", "Ahp", "Afb", "LRRS", "Lrrs", "Arb", "Naf", "NAS", "Nas", "Jrb", "Ns", "As", "Cgas", "Angb"
string_military2 = ["Base", "Aaf", "AFs", "Ahp", "Afb", "LRRS", "Lrrs", "Arb", "Naf", "NAS", "Nas", "Jrb", "Ns",
"As", "Cgas", "Angb"]

#regex_string_military = ''
#regex_string_military = r'|'.join(map(lambda x : f".*(^| ){x}($| ).*", string_military))

regex_string_military2 = ''
regex_string_military2 = r'|'.join(map(lambda x : f".*(^| ){x}($| ).*", string_military2))


airports = airports.withColumn('military',(
                    F.when(F.col('name').rlike(regex_string_military2), True)
                    .otherwise(False)
))

#airports.filter(airports.military == False).show(5, truncate =False)

print(airports.where(F.col('military') == True).count())
print(airports.where(F.col('military') == False).count())

160
1237


### Pergunta 7

In [18]:
airports = airports.withColumn('administration',(
                    F.when(F.col('name').rlike('International|Intl|Intercontinental'), 'I')
                    .when(F.col('name').rlike('National|Natl'), 'N')
                    .when(F.col('name').rlike('Regional|Reigonal|Rgnl|County|Metro|Metropolitan'), 'R')
                    .when(F.col('name').rlike('Municipal|Muni|City'), 'I')
))

airports.where(airports.administration.isNull()).show(10)

+---+--------------------+---------+----------+----+----+---+-------------+----+--------+--------------+
|faa|                name|      lat|       lon| alt|  tz|dst|       region|type|military|administration|
+---+--------------------+---------+----------+----+----+---+-------------+----+--------+--------------+
|04G|   Lansdowne Airport|41.130474| -80.61958|1044|-5.0|  A|MAINLAND-EAST|  AP|   false|          null|
|06N|     Randall Airport| 41.43191| -74.39156| 523|-5.0|  A|MAINLAND-EAST|  AP|   false|          null|
|09J|Jekyll Island Air...|31.074472| -81.42778|  11|-4.0|  A|MAINLAND-EAST|  AP|   false|          null|
|0P2|Shoestring Aviati...|39.794823|-76.647194|1000|-5.0|  A|MAINLAND-EAST|null|   false|          null|
|10C|  Galt Field Airport| 42.40289|-88.375114| 875|-6.0|  A|MAINLAND-EAST|  AP|   false|          null|
|1A3|Martin Campbell F...|35.015804| -84.34683|1789|-4.0|  A|MAINLAND-EAST|  AP|   false|          null|
|1C9|Frazier Lake Airpark|54.013332|-124.76833| 152|-8.

In [19]:
airports.write.mode('overwrite').parquet('../data/datasets_transformados/airports.parquet')

## Planes Dataset

### Pergunta 1

In [10]:
planes = planes.withColumn('tailchar',(
                 F.regexp_replace(F.substring(F.col('tailnum'), 2 , 6), '[0-9]', '')
))

### Pergunta 2

In [11]:
planes = planes.withColumn('year',(
                 F.when(F.col('year') == 0, 1996 )
                 .otherwise(F.col('year'))
))

### Pergunta 3

In [12]:
planes.sort('manufacturer', 'model', 'year')
print((planes.count(), len(planes.columns)))


(2628, 19)


In [13]:
#planes = planes.withColumn('manufacturer_model_year',(
#                 F.concat_ws('-', planes.manufacturer, planes.model, planes.year)
#))

In [14]:
aux_planes = planes.select('manufacturer', 'model', 'year').where(planes.year.isNotNull())
aux_planes = (aux_planes.withColumnRenamed('manufacturer', 'aux_manufacturer')
                        .withColumnRenamed('model', 'aux_model')
                        .withColumnRenamed('year', 'aux_year'))

aux_planes2 = planes.select('manufacturer', 'year').where(planes.year.isNotNull())
aux_planes2 = (aux_planes2.withColumnRenamed('manufacturer', 'aux_manufacturer')
                        .withColumnRenamed('year', 'aux_year'))

In [15]:
aux_planes = (aux_planes.groupBy('aux_manufacturer', 'aux_model')
                        .agg({'aux_year':'min',})
                        .withColumnRenamed('min(aux_year)', 'aux_year'))

aux_planes2 = (aux_planes2.groupBy('aux_manufacturer')
                          .agg({'aux_year':'min',})
                          .withColumnRenamed('min(aux_year)', 'aux_year'))

In [16]:
print((joined_planes.count(), len(joined_planes.columns)))

NameError: name 'joined_planes' is not defined

In [17]:
joined_planes = planes.join(aux_planes, 
            (planes.manufacturer == aux_planes.aux_manufacturer) &
            (planes.model == aux_planes.aux_model),
            'left'
)

joined_planes = joined_planes.withColumn('year',(
                        F.when((joined_planes.year.isNull()) &
                               (joined_planes.aux_year.isNotNull()), joined_planes.aux_year
                              )
                         .otherwise(joined_planes.year)
))

joined_planes = joined_planes.drop('aux_manufacturer', 'aux_model', 'aux_year')



In [18]:
joined_planes = joined_planes.join(aux_planes2, 
            (joined_planes.manufacturer == aux_planes2.aux_manufacturer),
            'left'
)

joined_planes = joined_planes.withColumn('year',(
                        F.when((joined_planes.year.isNull()) &
                               (joined_planes.aux_year.isNotNull()), joined_planes.aux_year
                              )
                         .otherwise(joined_planes.year)
))

planes = joined_planes

### Pergunta 4

In [19]:
planes = planes.withColumn('age',(
                 F.when(planes.year.isNotNull(), 2022 - planes.year )
                 .otherwise(None)
                 ))

### Pergunta 5

In [20]:
planes = planes.withColumn('type',(
                 F.when(planes.type == 'Fixed wing multi engine', 'MULTI_ENG')
                 .when(planes.type == 'Fixed Wing single engine', 'SINGLE_ENG')
                 .when(planes.type == 'Rotorcraft', 'ROTORCRAFT')
))

### Pergunta 6

In [21]:
planes.select('manufacturer').distinct().sort('manufacturer').show(truncate = False)

+-----------------------------+
|manufacturer                 |
+-----------------------------+
|AIRBUS                       |
|AIRBUS INDUSTRIE             |
|BARKER JACK L                |
|BELL                         |
|BOEING                       |
|BOMBARDIER INC               |
|CANADAIR                     |
|CESSNA                       |
|CIRRUS DESIGN CORP           |
|EMBRAER                      |
|GULFSTREAM AEROSPACE         |
|KILDALL GARY                 |
|LAMBERT RICHARD              |
|MARZ BARRY                   |
|MCDONNELL DOUGLAS            |
|MCDONNELL DOUGLAS AIRCRAFT CO|
|PIPER                        |
|ROBINSON HELICOPTER CO       |
|SIKORSKY                     |
+-----------------------------+



In [22]:
planes = planes.withColumn('manufacturer',(
                 F.when(planes.manufacturer.startswith('AIRBUS'), 'AIRBUS')
                  .when(planes.manufacturer.startswith('BOMBARDIER'), 'BOMBARDIER')
                  .when(planes.manufacturer.startswith('MCDONNELL'), 'MCDONNELL DOUGLAS')
                  .when(planes.manufacturer.startswith('CIRRUS'), 'CIRRUS')
                  .when(planes.manufacturer.startswith('BARKER'), 'BARKER JACK')
                  .when(planes.manufacturer.startswith('ROBINSON'), 'ROBINSON HELICOPTER')
                  .when(planes.manufacturer.startswith('GULFSTREAM'), 'GULFSTREAM')
                  .otherwise(planes.manufacturer)
))

### Pergunta 7

In [23]:
planes.select('model').distinct().sort('model').show(truncate = False)

+----------+
|model     |
+----------+
|150       |
|172M      |
|206B      |
|210-5(205)|
|421C      |
|737-301   |
|737-3A4   |
|737-3G7   |
|737-3H4   |
|737-3K2   |
|737-3L9   |
|737-3Q8   |
|737-3T5   |
|737-3TO   |
|737-3Y0   |
|737-490   |
|737-4Q8   |
|737-4S3   |
|737-5H4   |
|737-705   |
+----------+
only showing top 20 rows



In [24]:
planes = planes.withColumn('model',(
                    F.when( planes.model.contains("("), F.regexp_replace(planes.model, '\([^()]*\)', '' ))
                    .otherwise(planes.model)
))
planes = planes.withColumn('model',(
            F.trim(planes.model)
))

In [25]:
planes.select('model').distinct().sort('model').show(5,truncate = False)

+-----+
|model|
+-----+
|150  |
|172M |
|206B |
|210-5|
|421C |
+-----+
only showing top 5 rows



### Pergunta 8

In [26]:
planes.show(1)

+-------+----+---------+------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+--------+----------------+--------+----+
|tailnum|year|     type|manufacturer|   model|engines|seats|speed|  enginge|qa_tailnum|qa_year|qa_type|qa_manufacturer|qa_model|qa_engines|qa_seats|qa_speed|qa_enginge|tailchar|aux_manufacturer|aux_year| age|
+-------+----+---------+------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+--------+----------------+--------+----+
| N102UW|1998|MULTI_ENG|      AIRBUS|A320-214|      2|  182| null|Turbo-fan|      null|   null|   null|           null|    null|      null|    null|       M|      null|      UW|AIRBUS INDUSTRIE|    1989|24.0|
+-------+----+---------+------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+-------

In [27]:
planes = planes.withColumn('speed', (
                            F.when(planes.speed.isNull(), F.ceil((planes.seats)/(0.36)))
                            .otherwise(planes.speed)
))
planes.where(planes.speed.isNull()).show()

##verificar se está no intervalo possível do dataframe

+-------+----+----+------------+-----+-------+-----+-----+-------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+--------+----------------+--------+---+
|tailnum|year|type|manufacturer|model|engines|seats|speed|enginge|qa_tailnum|qa_year|qa_type|qa_manufacturer|qa_model|qa_engines|qa_seats|qa_speed|qa_enginge|tailchar|aux_manufacturer|aux_year|age|
+-------+----+----+------------+-----+-------+-----+-----+-------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+--------+----------------+--------+---+
+-------+----+----+------------+-----+-------+-----+-----+-------+----------+-------+-------+---------------+--------+----------+--------+--------+----------+--------+----------------+--------+---+



### Pergunta 9

In [28]:
planes.select('enginge').distinct().show()
planes = planes.withColumn('engine_split',(
                 F.split(planes.enginge, '-')[1]
))

planes.select('engine_split').distinct().show()

+-------------+
|      enginge|
+-------------+
|    Turbo-jet|
|      4 Cycle|
|    Turbo-fan|
|   Turbo-prop|
|Reciprocating|
|  Turbo-shaft|
+-------------+

+------------+
|engine_split|
+------------+
|        null|
|         jet|
|       shaft|
|         fan|
|        prop|
+------------+



In [29]:
planes = planes.withColumn('engine_type',(
                    F.when(planes.engine_split == 'jet', 'JET')
                    .when(planes.engine_split  == 'fan', 'FAN')
                    .when(planes.engine_split  == 'prop', 'PROP')
                    .when(planes.engine_split  == 'shaft', 'SHAFT')
                    .when(planes.enginge.endswith('Cycle'), 'CYCLE')
                    .otherwise(planes.enginge)
))

planes = planes.drop('engine_split')
planes.select('enginge', 'engine_type').distinct().show()

+-------------+-------------+
|      enginge|  engine_type|
+-------------+-------------+
|  Turbo-shaft|        SHAFT|
|Reciprocating|Reciprocating|
|    Turbo-fan|          FAN|
|      4 Cycle|        CYCLE|
|   Turbo-prop|         PROP|
|    Turbo-jet|          JET|
+-------------+-------------+



In [30]:
planes.write.mode('overwrite').parquet('../data/datasets_transformados/planes.parquet')

## Flights Dataset

In [31]:
flights_schema = (StructType([ 
    StructField("year",IntegerType(),True), 
    StructField("month",IntegerType(),True), 
    StructField("day",IntegerType(),True), 
    StructField("dep_time", StringType(), True), 
    StructField("dep_delay", IntegerType(), True), 
    StructField("arr_time", StringType(), True),
    StructField("arr_delay", IntegerType(), True ),
    StructField("carrier", StringType(), True),
    StructField("tailnum", StringType(), True),
    StructField("flight", StringType(), True),
    StructField("origin", StringType(), True),
    StructField("dest", StringType(), True),
    StructField("air_time", IntegerType(), True),
    StructField("distance", IntegerType(), True),
    StructField("hour", IntegerType(), True),
    StructField("minute", IntegerType(), True)
                    ])
                 )

flights = spark.getOrCreate().read.csv('../data/flights.csv', schema = flights_schema, header = True)

In [32]:
flights.show(5)
print(flights.count())

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

### Pergunta 1

In [33]:
flights = flights.withColumn('hour',(
                    F.when(flights.hour.isNull(), 0)
                    .otherwise(flights.hour)
))

flights = flights.withColumn('minute',(
                    F.when(flights.minute.isNull(), 0)
                    .otherwise(flights.minute)
))

### Pergunta 2

In [34]:
flights = flights.withColumn('hour',(
                    F.when(flights.hour == 24, 0)
                    .otherwise(flights.hour)
))

### Pergunta 3

In [35]:
flights = flights.withColumn('dep_datetime', 
                    F.to_timestamp( F.concat_ws(' ', F.concat_ws('-', flights.year, flights.month, flights.day),
                                                     F.concat_ws(':', flights.hour, flights.minute, F.lit(00))       
                                               )
                                  )
)


In [36]:
flights.select('dep_datetime').show(5)

+-------------------+
|       dep_datetime|
+-------------------+
|2014-12-08 06:58:00|
|2014-01-22 10:40:00|
|2014-03-09 14:43:00|
|2014-04-09 17:05:00|
|2014-03-09 07:54:00|
+-------------------+
only showing top 5 rows



### Pergunta 4

In [37]:
flights.show(2)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|       dep_datetime|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|2014-12-08 06:58:00|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|2014-01-22 10:40:00|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-------------------+
only showing top 2 rows



In [38]:
#flights.where(flights.dep_time == 'NA').show(5)

flights = flights.withColumn('dep_time',(
                    F.when(flights.dep_time == 'NA', F.concat_ws('',F.substring(flights.dep_datetime, 12,2), 
                                                                    F.substring(flights.dep_datetime, 14,2)))
                      .otherwise(flights.dep_time)
))

flights.select('dep_time').show(5)

+--------+
|dep_time|
+--------+
|     658|
|    1040|
|    1443|
|    1705|
|     754|
+--------+
only showing top 5 rows



### Pergunta 5 e 6

In [39]:
flights = flights.withColumn('dep_delay',(
                    F.when(flights.dep_delay.isNull(), 0)
                    .otherwise(flights.dep_delay)
))

flights = flights.withColumn('arr_delay',(
                    F.when(flights.arr_delay.isNull(), 0)
                    .otherwise(flights.arr_delay)
))

### Pergunta 7

In [40]:
flights = flights.drop('year', 'month', 'day', 'hour', 'minute')

### Pergunta 8

In [41]:
flights = flights.withColumn('air_time_projected',(
                    F.round((flights.distance * 0.1) + 20, 2)
))

flights = flights.withColumn('air_time_projected',(
                    flights.air_time_projected.cast(IntegerType())
))


### Pergunta 9
#### Montar parecido com a ideia do exercicio 3 planes 

In [42]:
flights = flights.withColumn('aux_origin_dest',(
                    F.concat_ws('-', flights.origin, flights.dest)
))

In [43]:
groupby_aux_origin_dest = (flights.groupBy('aux_origin_dest')
                                  .agg({'air_time': 'avg'})
                                  .withColumnRenamed('avg(air_time)', 'air_time_expected')
                                  .withColumnRenamed('aux_origin_dest', 'aux_origin_dest1')
                          )

In [44]:
flights = flights.join(groupby_aux_origin_dest,
             flights.aux_origin_dest == groupby_aux_origin_dest.aux_origin_dest1,
             'left'
            )

flights = flights.drop('aux_origin_dest1')
flights = flights.withColumn('air_time_expected',(
                            F.ceil(flights.air_time_expected)
))

In [45]:
flights.sort(flights.aux_origin_dest).show(3)

+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---------------+-----------------+
|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|       dep_datetime|air_time_projected|aux_origin_dest|air_time_expected|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---------------+-----------------+
|    1033|       -2|    1411|       -4|     WN| N628SW|  1069|   PDX| ABQ|     143|    1111|2014-08-11 10:33:00|               131|        PDX-ABQ|              137|
|    1521|        6|    1852|       -3|     WN| N207WN|   747|   PDX| ABQ|     137|    1111|2014-06-13 15:21:00|               131|        PDX-ABQ|              137|
|     946|        1|    1317|       -8|     WN| N904WN|  2043|   PDX| ABQ|     137|    1111|2014-09-14 09:46:00|               131|        PDX-ABQ|              137|
+---

### Pergunta 10

In [46]:
flights = flights.withColumn('air_time',(
                        F.when(flights.air_time.isNull(), F.greatest('air_time_projected', 'air_time_expected'))
                         .otherwise(flights.air_time)
))


flights = flights.withColumn('air_time',( 
                  flights.air_time.cast(IntegerType())          
))


### Pergunta 11

In [47]:
flights = flights.withColumn('arr_time',(
                    F.when(flights.arr_time.isNull(), flights.dep_time + flights.air_time)
                    .otherwise(flights.arr_time)
))

flights.where(flights.arr_time> 60).show(5)

+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---------------+-----------------+
|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|       dep_datetime|air_time_projected|aux_origin_dest|air_time_expected|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---------------+-----------------+
|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|2014-12-08 06:58:00|               115|        SEA-LAX|              127|
|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|2014-01-22 10:40:00|               287|        SEA-HNL|              344|
|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|2014-03-09 14:43:00|                87|        SEA-SFO|              102|
|   

### Pergunta 12

In [48]:
flights = flights.withColumn('haul_duration',(
                    F.when((flights.air_time >= 20) &
                           (flights.air_time < 180),
                           'SHORT-HAUL'
                          )
                     .when((flights.air_time >= 180) &
                           (flights.air_time < 360),
                           'MEDIUM-HAUL'
                          )
                     .when((flights.air_time >= 360), 
                           'LONG-HAUL'
                          )
))

### Pergunta 13

In [49]:
flights.select('dep_datetime').show(2)

+-------------------+
|       dep_datetime|
+-------------------+
|2014-12-08 06:58:00|
|2014-01-22 10:40:00|
+-------------------+
only showing top 2 rows



In [50]:
flights = flights.withColumn('dep_season',
F.when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-03-20 15:33:00')),F.concat(F.year('dep_datetime'),F.lit('-06-21 10:14:00'))), 'SPRING')
 .when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-06-21 10:14:00')),F.concat(F.year('dep_datetime'),F.lit('-09-23 02:04:00'))), 'SUMMER')
 .when(F.col('dep_datetime').between(F.concat(F.year('dep_datetime'),F.lit('-09-23 02:04:00')),F.concat(F.year('dep_datetime'),F.lit('-12-21 21:48:00'))), 'FALL')
 .otherwise('WINTER')
                                  )


flights.select('dep_season').groupBy('dep_season').count().show()


+----------+-----+
|dep_season|count|
+----------+-----+
|    WINTER| 2149|
|    SPRING| 2560|
|      FALL| 2373|
|    SUMMER| 2918|
+----------+-----+



In [51]:
flights = flights.withColumn('dep_delay_category',(
                    F.when(flights.dep_delay < 0, 'ANTECIPATED')
                     .when(flights.dep_delay == 0, 'INTIME')
                     .when((flights.dep_delay > 0) & (flights.dep_delay < 60), 'MINOR')
                     .when(flights.dep_delay >= 60, 'MAJOR')
))

In [52]:
flights.write.mode('overwrite').parquet('../data/datasets_transformados/flights.parquet')