In [1]:
!pip install pyspark
!pip install findspark



In [2]:
import findspark
findspark.init()

In [3]:
import re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [4]:
# Expressoes regulares comuns
REGEX_ALPHA    = r'[a-zA-Z]+'
REGEX_INTEGER  = r'[0-9]+'
REGEX_FLOAT    = r'[0-9]+\.[0-9]+'
REGEX_ALPHANUM = r'[0-9a-zA-Z]+'
REGEX_EMPTY_STR= r'[\t ]+$'
REGEX_SPECIAL  = r'[!@#$%&*\(\)_]+'

In [5]:
# Criar o contexto do spark
sc = SparkContext()

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master("local[*]")
                     .appName("Aceleração PySpark - Capgemini"))

In [6]:
# Cria o schema adequado
schema_airports = StructType([
    StructField("faa",  StringType(),  True),
    StructField("name", StringType(),  True),
    StructField("lat",  FloatType(),   True),
    StructField("lon",  FloatType(),   True),
    StructField("alt",  IntegerType(), True),
    StructField("tz",   FloatType(), True),
    StructField("dst",  StringType(),  True)
])

In [7]:
# Cria o schema adequado
schema_planes = StructType([
    StructField("tailnum",  StringType(),  True),
    StructField("year",  IntegerType(),  True),
    StructField("type", StringType(),  True),
    StructField("manufacturer",  StringType(),   True),
    StructField("model",  StringType(),   True),
    StructField("engines",  IntegerType(), True),
    StructField("seats",   IntegerType(), True),
    StructField("speed",  IntegerType(),  True),
     StructField("engine",  StringType(),  True)
])

In [8]:
# Cria o schema adequado

schema_flights = StructType([
StructField("year", IntegerType(), True),
StructField("month", IntegerType(), True),
StructField("day", IntegerType(), True),
StructField("dep_time", StringType(), True),
StructField("dep_delay", IntegerType(), True),
StructField("arr_time", StringType(), True),
StructField("arr_delay", IntegerType(), True),
StructField("carrier", StringType(), True),
StructField("tailnum", StringType(), True),
StructField("flight", StringType(), True),
StructField("origin", StringType(), True),
StructField("dest", StringType(), True),
StructField("air_time", IntegerType(), True),
StructField("distance", IntegerType(), True),
StructField("hour", IntegerType(), True),
StructField("minute", IntegerType(), True)
    
])

In [9]:
# Carrega os dados com as configurações necessárias

path_airports = ("C:/projetos/spark/transformation/airports.csv")

df_airports = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_airports)
                  .load(path_airports))

path_planes = ("C:/projetos/spark/transformation/planes.csv")

df_planes = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_planes)
                  .load(path_planes))

path_flights = ("C:/projetos/spark/transformation/flights.csv")

df_flights = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_flights)
                  .load(path_flights))

In [10]:
df_flights = df_flights[['year', 'month', 'day','hour', 'minute','dep_time','arr_time', 'dep_delay', 'arr_delay', 'carrier', 'tailnum', 'flight',
'origin','dest', 'air_time', 'distance']]

df_flights.printSchema()
df_flights.show()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)

+----+-----+---+----+------+--------+--------+---------+---------+-------+-------+------+------+----+--------+--------+
|year|month|day|hour|minute|dep_time|arr_time|dep_delay|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|
+----+-----+---+----+------+--------+--------+---------+---------+-------+-------+------+------+----+--------+--------+
|2014|   12

In [11]:
df_airports.show()
df_airports.printSchema()

+---+--------------------+---------+-----------+----+----+---+
|faa|                name|      lat|        lon| alt|  tz|dst|
+---+--------------------+---------+-----------+----+----+---+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044|-5.0|  A|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264|-5.0|  A|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801|-6.0|  A|
|06N|     Randall Airport| 41.43191|  -74.39156| 523|-5.0|  A|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11|-4.0|  A|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593|-4.0|  A|
|0G6|Williams County A...|41.467304| -84.506775| 730|-5.0|  A|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492|-5.0|  A|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000|-5.0|  U|
|0S9|Jefferson County ...| 48.05381|-122.810646| 108|-8.0|  A|
|0W3|Harford County Ai...|39.566837|   -76.2024| 409|-5.0|  A|
|10C|  Galt Field Airport| 42.40289| -88.375114| 875|-6.0|  U|
|17G|Port Bucyrus-Craw...|40.781555|  -82.97481|1003|-5

In [12]:
#Visão Temporaria
df_airports.createOrReplaceTempView('airports_final')

## Airports Dataset - Perguntas

## Pergunta 1

In [13]:
df_airports = df_airports.withColumn("alt",
                                     F.when(F.col("alt") < 0, 0)
                                      .otherwise(df_airports.alt))

#Recriei a visão temporária a partir da que já tinha para contabilizar as alterações.
df_airports.createOrReplaceTempView('airports_final')

spark.getOrCreate().sql("select alt, Count(*) from airports_final Group By alt Order By alt").show()

+---+--------+
|alt|count(1)|
+---+--------+
|  0|      51|
|  1|       2|
|  2|       1|
|  3|       2|
|  4|       3|
|  5|       3|
|  6|       3|
|  7|       7|
|  8|       8|
|  9|       7|
| 10|      11|
| 11|       6|
| 12|       8|
| 13|      12|
| 14|      11|
| 15|      11|
| 16|       3|
| 17|       7|
| 18|      10|
| 19|       6|
+---+--------+
only showing top 20 rows



## Pergunta 2

In [14]:
df_airports = df_airports.withColumn('dst',
                                       F.when(F.col('tz').between(-7,-5),"A")
                                        .otherwise(df_airports.dst))

df_airports.groupBy("dst").count().distinct().orderBy("dst", ascending=True).show()

+---+-----+
|dst|count|
+---+-----+
|  A| 1380|
|  N|    9|
|  U|    8|
+---+-----+



## Pergunta 3

In [15]:
df_airports = df_airports.withColumn("dst", (
                                                F.when((F.col("dst") == "U"), "A")
                                                .otherwise(df_airports.dst)))

df_airports.groupBy("dst").count().distinct().orderBy("dst", ascending=True).show()

+---+-----+
|dst|count|
+---+-----+
|  A| 1388|
|  N|    9|
+---+-----+



## Pergunta 4

In [16]:
df_airports = df_airports.withColumn('region',
                                       F.when(F.col('lon') < -124, "ALASKA")
                                       .when((F.col('lon') > -50) | (F.col('lat') < 24), "OFFSHORE")
                                       .when((F.col('lon') <= -95) & (F.col('lon').between(-124,-50)), "MAINLAND-WEST")
                                       .when((F.col('lon') > -95) & (F.col('lon').between(-124,-50)),  "MAINLAND-EAST")
                                       .otherwise(float("NaN")))

df_airports.groupBy("region").count().distinct().orderBy("region", ascending=True).show()

+-------------+-----+
|       region|count|
+-------------+-----+
|       ALASKA|  261|
|MAINLAND-EAST|  696|
|MAINLAND-WEST|  436|
|     OFFSHORE|    4|
+-------------+-----+



## Pergunta 5

In [17]:
AP = ["Airport", "Tradeport", "Heliport", "Airpor","Arpt"]
AD = ["Aerodrome"]
AK = ["Airpark", "Aero Park"]
AS = ["Station" ,"Air Station"]
FL = ["Field", "Fld"]

df_airports = df_airports.withColumn("type", (
                        F.when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", AP)))), "AP")
                         .when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", AD)))), "AD")
                         .when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", AK)))), "AK")
                         .when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", AS)))), "AS")
                         .when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", FL)))), "FL")
                         .otherwise("NaN")
))

df_airports.groupBy("type").count().distinct().orderBy("type", ascending=True).show()

+----+-----+
|type|count|
+----+-----+
|  AD|    1|
|  AK|   12|
|  AP|  624|
|  AS|   19|
|  FL|   78|
| NaN|  663|
+----+-----+



## Pergunta 6

In [18]:
LIST_MILITARY = ["Base", "Aaf", "Afs", "Ahp", "Afb", "LRRS", "Lrrs", "Arb", "Naf", "NAS", "Nas", "Jrb", "Ns",
"As", "Cgas", "Angb"]

REGEX_SUBSTRINGS = r'|'.join(map(lambda x : f'^{x} | {x} | {x}$', LIST_MILITARY))

df_airports = df_airports.withColumn('military',
                                    F.when(F.col('name').rlike(REGEX_SUBSTRINGS), True)
                                    .otherwise(False))

df_airports.groupBy("military").count().distinct().orderBy("military", ascending=True).show()

+--------+-----+
|military|count|
+--------+-----+
|   false| 1236|
|    true|  161|
+--------+-----+



## Pergunta 7

In [19]:
I = ["International", "Intl", "Intercontinental"]
N = ["National", "Natl"]
R = ["Regional", "Reigonal", "Rgnl", "County", "Metro", "Metropolitan"]
M = ["Municipal", "Muni", "City"]



df_airports = df_airports.withColumn("administration", (
                        F.when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", I)))), "I")
                         .when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", N)))), "N")
                         .when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", R)))), "R")
                         .when((F.col("name").rlike(r'|'.join(map(lambda x : f".*({x}).*", M)))), "M")
                         .otherwise("NaN")
))

df_airports.groupBy("administration").count().distinct().orderBy("administration", ascending=True).show()

+--------------+-----+
|administration|count|
+--------------+-----+
|             I|  164|
|             M|  180|
|             N|    5|
|           NaN|  761|
|             R|  287|
+--------------+-----+



## Salvando arquivo

In [20]:
(df_airports.repartition(1) # coalesce
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
            .save("C:/projetos/spark/transformation/airports_proc.parquet"))

## Planes Dataset - Perguntas

In [23]:
#Visão Temporaria
df_planes.createOrReplaceTempView('planes')

In [24]:
df_planes.show()
df_planes.printSchema()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null

## Pergunta 1

In [25]:
df_planes = df_planes.withColumn("tailchar", 
                    F.regexp_replace(F.col("tailnum"), '[0-9]|^N', ""))

df_planes.groupBy("tailchar").count().distinct().orderBy("tailchar", ascending=True).show()

+--------+-----+
|tailchar|count|
+--------+-----+
|        |  298|
|       A|   46|
|      AA|   67|
|      AG|   10|
|      AS|  138|
|      AT|    4|
|      AW|   87|
|      AY|    4|
|       B|   28|
|      BR|    6|
|       C|   20|
|      CA|   21|
|      CB|    2|
|      CT|    1|
|       D|   16|
|      DA|   46|
|      DE|    2|
|      DH|    2|
|      DL|   61|
|      DN|  118|
+--------+-----+
only showing top 20 rows



## Pergunta 2

In [26]:
df_planes = df_planes.withColumn("year",
                                 F.when(F.col('year') == 0, 1996)
                                  .otherwise(F.col('year'))
                                )

df_planes.groupBy("year").count().distinct().orderBy("year", ascending=True).show()

+----+-----+
|year|count|
+----+-----+
|null|   60|
|1959|    1|
|1963|    1|
|1968|    1|
|1975|    2|
|1976|    1|
|1980|    1|
|1984|    5|
|1985|   15|
|1986|   13|
|1987|   23|
|1988|   29|
|1989|   21|
|1990|   42|
|1991|   48|
|1992|   78|
|1993|   41|
|1994|   39|
|1995|   54|
|1996|   73|
+----+-----+
only showing top 20 rows



## Pergunta 3

In [27]:
# Criando tabela ordenada por Manufacturer e Model
df_ordered = df_planes.groupBy("manufacturer",'model').min('year').orderBy('manufacturer','model')

# Renomeando colunas
df_ordered = (df_ordered.withColumnRenamed('manufacturer', 'manufacturer1')
                        .withColumnRenamed('model', 'model1')
                        .withColumnRenamed('min(year)', 'year1')
             )

# Criando tabela ordenada somente por manufacturer
df_ordered2 = (df_planes.groupBy('manufacturer')
                        .min('year')
                        .orderBy('manufacturer')
              )

# Renomeando colunas
df_ordered2 = (df_ordered2.withColumnRenamed('manufacturer', 'manufacturer2')
                         .withColumnRenamed('min(year)', 'year2')
              )

# Fazendo left join da tabela df_planes com a tabela df_ordered
df_final = (df_planes.join(df_ordered, 
                          (df_planes.manufacturer == df_ordered.manufacturer1) &
                          (df_planes.model == df_ordered.model1),'left')
           )

# Fazendo left join da tabela df_final com a df_ordered2
df_final = df_final.join(df_ordered2, df_final.manufacturer == df_ordered2.manufacturer2 , 'left')

# Modificando a coluna year para inputar os anos
df_final = (df_final.withColumn('year',
                  F.when((F.col('year').isNull()) & (F.col('year1').isNull()), F.col('year2'))
                   .when(F.col('year').isNull(), F.col('year1'))
                   .otherwise(F.col('year')))
           )

# Dropando as colunas criadas pelo join
df_final = df_final.drop('manufacturer1','manufacturer2','model1','year1','year2')


df_final.show()

df_planes = df_final

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+--------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|tailchar|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+--------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      UW|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      US|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      UW|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      UW|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      US|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      UW|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|

In [28]:
df_planes.select("year","manufacturer","model").filter(F.col('year').isNull()).show()

+----+---------------+-------------+
|year|   manufacturer|        model|
+----+---------------+-------------+
|null|LAMBERT RICHARD|    FALCON XP|
|null|  BARKER JACK L|ZODIAC 601HDS|
+----+---------------+-------------+



## Pergunta 4

In [29]:
df_planes = df_planes.withColumn("age",
                    F.expr("2022 - year"))

df_planes.groupBy("age").count().distinct().orderBy("age", ascending=False).show()

+---+-----+
|age|count|
+---+-----+
| 63|    1|
| 59|    1|
| 54|    1|
| 47|    2|
| 46|    1|
| 42|    1|
| 38|    6|
| 37|   15|
| 36|   13|
| 35|   23|
| 34|   29|
| 33|   21|
| 32|   42|
| 31|   48|
| 30|   78|
| 29|   42|
| 28|   45|
| 27|   55|
| 26|   73|
| 25|   80|
+---+-----+
only showing top 20 rows



## Pergunta 5

In [30]:
df_planes = (df_planes.withColumn('type',
                                F.when(F.col('type') == "Fixed wing multi engine", "MULTI_ENG")
                                 .when(F.col('type') == "Fixed wing single engine", "SINGLE_ENG")
                                 .when(F.col('type') == "Rotorcraft", "ROTORCRAFT")
                                 .otherwise(F.col('type')))                              
                                
                                )

df_planes.groupBy("type").count().distinct().orderBy("type", ascending=True).show()

+----------+-----+
|      type|count|
+----------+-----+
| MULTI_ENG| 2615|
|ROTORCRAFT|    3|
|SINGLE_ENG|   10|
+----------+-----+



## Pergunta 6

In [31]:
df_planes.groupBy("manufacturer").count().distinct().orderBy("manufacturer", ascending=True).show()

+--------------------+-----+
|        manufacturer|count|
+--------------------+-----+
|              AIRBUS|  397|
|    AIRBUS INDUSTRIE|  401|
|       BARKER JACK L|    1|
|                BELL|    1|
|              BOEING| 1460|
|      BOMBARDIER INC|  214|
|            CANADAIR|    8|
|              CESSNA|    4|
|  CIRRUS DESIGN CORP|    1|
|             EMBRAER|   37|
|GULFSTREAM AEROSPACE|    1|
|        KILDALL GARY|    1|
|     LAMBERT RICHARD|    1|
|          MARZ BARRY|    1|
|   MCDONNELL DOUGLAS|   94|
|MCDONNELL DOUGLAS...|    2|
|               PIPER|    2|
|ROBINSON HELICOPT...|    1|
|            SIKORSKY|    1|
+--------------------+-----+



In [32]:
df_planes = (df_planes.withColumn('manufacturer',
                                F.when(F.col('manufacturer').startswith('AIRBUS'), "AIRBUS")
                                 .when(F.col('manufacturer').startswith('BARKER'), "BARKER JACK")
                                 .when(F.col('manufacturer').startswith('BELL'), "BELL") 
                                 .when(F.col('manufacturer').startswith('BOEING'), "BOEING")
                                 .when(F.col('manufacturer').startswith('BOMBARDIER'), "BOMBARDIER")
                                 .when(F.col('manufacturer').startswith('CANADAIR'), "CANADAIR")
                                 .when(F.col('manufacturer').startswith('CESSNA'), "CESSNA")
                                 .when(F.col('manufacturer').startswith('CIRRUS'), "CIRRUS")
                                 .when(F.col('manufacturer').startswith('EMBRAER'), "EMBRAER")
                                 .when(F.col('manufacturer').startswith('GULFSTREAM'), "GULFSTREAM")
                                 .when(F.col('manufacturer').startswith('KILDALL'), "KILDALL GARY")
                                 .when(F.col('manufacturer').startswith('LAMBERT'), "LAMBERT RICHARD")
                                 .when(F.col('manufacturer').startswith('MARZ'), "MARZ BARRY")
                                 .when(F.col('manufacturer').startswith('MCDONNELL'), "MCDONNELL DOUGLAS")
                                 .when(F.col('manufacturer').startswith('PIPER'), "PIPER")
                                 .when(F.col('manufacturer').startswith('ROBINSON'), "ROBINSON HELICOPTER")
                                 .when(F.col('manufacturer').startswith('SIKORSKY'), "SIKORSKY") 
            ))

df_planes.groupBy("manufacturer").count().distinct().orderBy("manufacturer", ascending=True).show()

+-------------------+-----+
|       manufacturer|count|
+-------------------+-----+
|             AIRBUS|  798|
|        BARKER JACK|    1|
|               BELL|    1|
|             BOEING| 1460|
|         BOMBARDIER|  214|
|           CANADAIR|    8|
|             CESSNA|    4|
|             CIRRUS|    1|
|            EMBRAER|   37|
|         GULFSTREAM|    1|
|       KILDALL GARY|    1|
|    LAMBERT RICHARD|    1|
|         MARZ BARRY|    1|
|  MCDONNELL DOUGLAS|   96|
|              PIPER|    2|
|ROBINSON HELICOPTER|    1|
|           SIKORSKY|    1|
+-------------------+-----+



## Pergunta 7

In [33]:
df_planes.groupBy("model").count().distinct().orderBy("model", ascending=True).show(1000)

+--------------+-----+
|         model|count|
+--------------+-----+
|           150|    1|
|          172M|    1|
|          206B|    1|
|    210-5(205)|    1|
|          421C|    1|
|       737-301|    2|
|       737-3A4|    1|
|       737-3G7|    2|
|       737-3H4|  104|
|       737-3K2|    2|
|       737-3L9|    2|
|       737-3Q8|    4|
|       737-3T5|    1|
|       737-3TO|    2|
|       737-3Y0|    1|
|       737-490|   16|
|       737-4Q8|    9|
|       737-4S3|    1|
|       737-5H4|   10|
|       737-705|    2|
|       737-724|   31|
|       737-732|    2|
|       737-73V|    1|
|       737-76N|   15|
|       737-76Q|    3|
|       737-790|   17|
|       737-7AD|    1|
|       737-7BD|   32|
|       737-7BX|    3|
|       737-7H4|  360|
|       737-7K9|    2|
|       737-7Q8|    1|
|       737-824|  122|
|       737-832|   73|
|       737-890|   60|
|       737-8FH|    1|
|       737-8H4|   81|
|       737-924|   12|
|     737-924ER|  102|
|     737-932ER|   30|
|       737

In [34]:
df_planes = (df_planes.withColumn("model", 
                    F.regexp_replace(F.col("model"), '\s*\([^()]*\)\s*', "")
            ))

df_planes.groupBy("model").count().distinct().orderBy("model", ascending=True).show(1000)

+-------------+-----+
|        model|count|
+-------------+-----+
|          150|    1|
|         172M|    1|
|         206B|    1|
|        210-5|    1|
|         421C|    1|
|      737-301|    2|
|      737-3A4|    1|
|      737-3G7|    2|
|      737-3H4|  104|
|      737-3K2|    2|
|      737-3L9|    2|
|      737-3Q8|    4|
|      737-3T5|    1|
|      737-3TO|    2|
|      737-3Y0|    1|
|      737-490|   16|
|      737-4Q8|    9|
|      737-4S3|    1|
|      737-5H4|   10|
|      737-705|    2|
|      737-724|   31|
|      737-732|    2|
|      737-73V|    1|
|      737-76N|   15|
|      737-76Q|    3|
|      737-790|   17|
|      737-7AD|    1|
|      737-7BD|   32|
|      737-7BX|    3|
|      737-7H4|  360|
|      737-7K9|    2|
|      737-7Q8|    1|
|      737-824|  122|
|      737-832|   73|
|      737-890|   60|
|      737-8FH|    1|
|      737-8H4|   81|
|      737-924|   12|
|    737-924ER|  102|
|    737-932ER|   30|
|      737-990|   12|
|    737-990ER|   23|
|      747

## Pergunta 8

In [35]:
df_planes = (df_planes.withColumn('speed',
            F.when((F.col('speed').isNull()) &
                   (F.col('seats').isNotNull()), F.ceil(F.expr('seats/0.36')))
             .otherwise(F.col('speed'))
    ))

df_planes = df_planes.withColumn('speed',
                                 F.col('speed').cast(IntegerType()))

df_planes.filter(F.col('tailnum') == 'N102UW').show()

+-------+----+---------+------------+--------+-------+-----+-----+---------+--------+---+
|tailnum|year|     type|manufacturer|   model|engines|seats|speed|   engine|tailchar|age|
+-------+----+---------+------------+--------+-------+-----+-----+---------+--------+---+
| N102UW|1998|MULTI_ENG|      AIRBUS|A320-214|      2|  182|  506|Turbo-fan|      UW| 24|
+-------+----+---------+------------+--------+-------+-----+-----+---------+--------+---+



In [36]:
df_planes = df_planes.withColumn("speed",
                    (F.when(~F.col('speed').between(50, 150), 0))
                                           .otherwise(F.col('speed')))

In [37]:
df_planes.groupBy("speed").count().distinct().orderBy("speed", ascending=True).show()

+-----+-----+
|speed|count|
+-----+-----+
|    0| 2584|
|   56|    1|
|   89|   37|
|   90|    2|
|  107|    1|
|  108|    1|
|  112|    1|
|  126|    1|
+-----+-----+



## Pergunta 9

In [38]:
df_planes = (df_planes.withColumn('engine_type',
                                F.when(F.col('engine') == "Turbo-fan", "FAN")
                                 .when(F.col('engine') == "Turbo-jet", "JET")
                                 .when(F.col('engine') == "Turbo-prop", "PROP")
                                 .when(F.col('engine') == "Turbo-shaft", "SHAFT")
                                 .when(F.col('engine') == "4 Cycle", "CYCLE") 
                                 .otherwise((None)))                              
                                
                                )

df_planes.groupBy("engine_type").count().distinct().orderBy("engine_type", ascending=True).show()

+-----------+-----+
|engine_type|count|
+-----------+-----+
|       null|   10|
|      CYCLE|    1|
|        FAN| 2127|
|        JET|  450|
|       PROP|   37|
|      SHAFT|    3|
+-----------+-----+



## Salvando arquivo

In [39]:
(df_planes
            .repartition(1) # coalesce
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
            .save("C:/projetos/spark/transformation/planes_proc.parquet"))

## Flights Dataset - Perguntas

In [40]:
#Visão Temporaria
df_flights.createOrReplaceTempView('flights')

## Pergunta 1

In [41]:
df_flights = (df_flights.withColumn('hour',
                                  F.when(F.col('hour').isNull(), 0)
                                   .otherwise(F.col('hour'))                     
                               
            ))

df_flights.groupBy("hour").count().distinct().orderBy("hour", ascending=True).show(25)

+----+-----+
|hour|count|
+----+-----+
|   0|  137|
|   1|   17|
|   2|    4|
|   5|  431|
|   6|  899|
|   7|  709|
|   8|  659|
|   9|  456|
|  10|  803|
|  11|  723|
|  12|  539|
|  13|  653|
|  14|  540|
|  15|  468|
|  16|  388|
|  17|  394|
|  18|  570|
|  19|  409|
|  20|  354|
|  21|  281|
|  22|  314|
|  23|  251|
|  24|    1|
+----+-----+



In [42]:
df_flights = (df_flights.withColumn('minute',
                                  F.when(F.col('minute').isNull(), 0)
                                   .otherwise(F.col('minute'))                     
                               
            ))

df_flights.groupBy("minute").count().distinct().orderBy("minute", ascending=True).show(60)

+------+-----+
|minute|count|
+------+-----+
|     0|  226|
|     1|  184|
|     2|  173|
|     3|  161|
|     4|  157|
|     5|  167|
|     6|  157|
|     7|  153|
|     8|  153|
|     9|  183|
|    10|  160|
|    11|  161|
|    12|  161|
|    13|  150|
|    14|  136|
|    15|  162|
|    16|  152|
|    17|  155|
|    18|  130|
|    19|  155|
|    20|  163|
|    21|  139|
|    22|  156|
|    23|  142|
|    24|  177|
|    25|  188|
|    26|  154|
|    27|  172|
|    28|  159|
|    29|  180|
|    30|  155|
|    31|  153|
|    32|  149|
|    33|  173|
|    34|  125|
|    35|  154|
|    36|  133|
|    37|  135|
|    38|  150|
|    39|  155|
|    40|  172|
|    41|  166|
|    42|  160|
|    43|  153|
|    44|  134|
|    45|  145|
|    46|  142|
|    47|  173|
|    48|  176|
|    49|  155|
|    50|  196|
|    51|  161|
|    52|  191|
|    53|  200|
|    54|  198|
|    55|  230|
|    56|  201|
|    57|  242|
|    58|  235|
|    59|  222|
+------+-----+



## Pergunta 2

In [43]:
df_flights = (df_flights.withColumn('hour',
                                  F.when(F.col('hour') == 24, 0)
                                   .otherwise(F.col('hour'))                     
                               
            ))

df_flights.groupBy("hour").count().distinct().orderBy("hour", ascending=True).show(24)

+----+-----+
|hour|count|
+----+-----+
|   0|  138|
|   1|   17|
|   2|    4|
|   5|  431|
|   6|  899|
|   7|  709|
|   8|  659|
|   9|  456|
|  10|  803|
|  11|  723|
|  12|  539|
|  13|  653|
|  14|  540|
|  15|  468|
|  16|  388|
|  17|  394|
|  18|  570|
|  19|  409|
|  20|  354|
|  21|  281|
|  22|  314|
|  23|  251|
+----+-----+



## Pergunta 3

In [44]:
df_flights = df_flights.withColumn('dep_datetime',
                                    F.expr('make_timestamp(year, month, day, hour, minute, 00)'))

df_flights.toPandas()

Unnamed: 0,year,month,day,hour,minute,dep_time,arr_time,dep_delay,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,dep_datetime
0,2014,12,8,6,58,658,935,-7.0,-5.0,VX,N846VA,1780,SEA,LAX,132.0,954,2014-12-08 06:58:00
1,2014,1,22,10,40,1040,1505,5.0,5.0,AS,N559AS,851,SEA,HNL,360.0,2677,2014-01-22 10:40:00
2,2014,3,9,14,43,1443,1652,-2.0,2.0,VX,N847VA,755,SEA,SFO,111.0,679,2014-03-09 14:43:00
3,2014,4,9,17,5,1705,1839,45.0,34.0,WN,N360SW,344,PDX,SJC,83.0,569,2014-04-09 17:05:00
4,2014,3,9,7,54,754,1015,-1.0,1.0,AS,N612AS,522,SEA,BUR,127.0,937,2014-03-09 07:54:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,2014,6,23,18,6,1806,2104,-4.0,-6.0,OO,N225AG,3458,SEA,SLC,89.0,689,2014-06-23 18:06:00
9996,2014,8,31,23,36,2336,452,11.0,-13.0,AA,N3LEAA,1230,SEA,DFW,178.0,1660,2014-08-31 23:36:00
9997,2014,8,8,9,4,904,1042,-1.0,-5.0,AS,N523AS,360,SEA,SMF,81.0,605,2014-08-08 09:04:00
9998,2014,8,29,14,41,1441,1820,26.0,10.0,WN,N8647A,2857,SEA,ABQ,133.0,1180,2014-08-29 14:41:00


In [45]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- dep_datetime: timestamp (nullable = true)



## Pergunta 4

In [46]:
df_flights.groupBy('dep_time').count().orderBy('dep_time', ascending= True).show()

+--------+-----+
|dep_time|count|
+--------+-----+
|       1|    6|
|    1000|    8|
|    1001|   16|
|    1002|    5|
|    1003|   12|
|    1004|    9|
|    1005|    6|
|    1006|   17|
|    1007|   10|
|    1008|    9|
|    1009|   18|
|     101|    1|
|    1010|   12|
|    1011|   10|
|    1012|   18|
|    1013|   12|
|    1014|   13|
|    1015|   17|
|    1016|   15|
|    1017|   17|
+--------+-----+
only showing top 20 rows



In [47]:
df_flights = (df_flights.withColumn('dep_time',
                                  F.when(F.col('dep_time') == 'NA', 
                                  F.concat(F.col('hour'), 
                                  F.lpad(F.col('minute'), 2, '0')))
                                   .otherwise(F.col('dep_time'))
                ))

In [48]:
df_flights.groupBy('dep_time').count().orderBy('dep_time', ascending= True).show()

+--------+-----+
|dep_time|count|
+--------+-----+
|     000|   48|
|       1|    6|
|    1000|    8|
|    1001|   16|
|    1002|    5|
|    1003|   12|
|    1004|    9|
|    1005|    6|
|    1006|   17|
|    1007|   10|
|    1008|    9|
|    1009|   18|
|     101|    1|
|    1010|   12|
|    1011|   10|
|    1012|   18|
|    1013|   12|
|    1014|   13|
|    1015|   17|
|    1016|   15|
+--------+-----+
only showing top 20 rows



## Pergunta 5

In [49]:
df_flights = (df_flights.withColumn('dep_delay',
                                  F.when(F.col('dep_delay').isNull(), 0)
                                   .otherwise(F.col('dep_delay'))                     
                               
            ))

df_flights.groupBy("dep_delay").count().distinct().orderBy("dep_delay", ascending=True).show(1000)

+---------+-----+
|dep_delay|count|
+---------+-----+
|      -19|    1|
|      -18|    3|
|      -17|    4|
|      -16|    5|
|      -15|   10|
|      -14|   14|
|      -13|   37|
|      -12|   63|
|      -11|   80|
|      -10|  196|
|       -9|  208|
|       -8|  319|
|       -7|  471|
|       -6|  625|
|       -5|  742|
|       -4|  838|
|       -3|  852|
|       -2|  760|
|       -1|  666|
|        0|  646|
|        1|  311|
|        2|  227|
|        3|  209|
|        4|  168|
|        5|  148|
|        6|  137|
|        7|  117|
|        8|  125|
|        9|   94|
|       10|   95|
|       11|   96|
|       12|   78|
|       13|   60|
|       14|   70|
|       15|   66|
|       16|   59|
|       17|   55|
|       18|   55|
|       19|   54|
|       20|   40|
|       21|   49|
|       22|   54|
|       23|   34|
|       24|   31|
|       25|   36|
|       26|   32|
|       27|   36|
|       28|   38|
|       29|   22|
|       30|   24|
|       31|   25|
|       32|   14|
|       33

## Pergunta 6

In [50]:
df_flights = (df_flights.withColumn('arr_delay',
                                  F.when(F.col('arr_delay').isNull(), 0)
                                   .otherwise(F.col('arr_delay'))                     
                               
            ))

df_flights.groupBy("arr_delay").count().distinct().orderBy("arr_delay", ascending=True).show(1000)

+---------+-----+
|arr_delay|count|
+---------+-----+
|      -58|    1|
|      -53|    1|
|      -50|    1|
|      -48|    1|
|      -47|    2|
|      -46|    1|
|      -45|    2|
|      -44|    2|
|      -43|    2|
|      -42|    6|
|      -41|    4|
|      -40|    7|
|      -39|    3|
|      -38|   14|
|      -37|    7|
|      -36|   15|
|      -35|   14|
|      -34|   17|
|      -33|   14|
|      -32|   17|
|      -31|   34|
|      -30|   27|
|      -29|   35|
|      -28|   43|
|      -27|   60|
|      -26|   66|
|      -25|   49|
|      -24|   82|
|      -23|   83|
|      -22|  101|
|      -21|  109|
|      -20|  127|
|      -19|  141|
|      -18|  163|
|      -17|  188|
|      -16|  179|
|      -15|  202|
|      -14|  257|
|      -13|  270|
|      -12|  285|
|      -11|  288|
|      -10|  254|
|       -9|  300|
|       -8|  316|
|       -7|  330|
|       -6|  323|
|       -5|  331|
|       -4|  314|
|       -3|  295|
|       -2|  279|
|       -1|  286|
|        0|  327|
|        1

In [51]:
df_flights.groupBy(F.col('arr_delay') == 0).count().show(50)

+---------------+-----+
|(arr_delay = 0)|count|
+---------------+-----+
|           true|  327|
|          false| 9673|
+---------------+-----+



## Pergunta 7

In [52]:
df_flights = df_flights.drop("year", "month", "day","hour", "minute")

## Pergunta 8

In [53]:
df_flights = df_flights.withColumn('air_time_projected',
                                   ((F.col('distance') * 0.1) + 20).cast(IntegerType()))

In [54]:
df_flights.printSchema()

root
 |-- dep_time: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- dep_datetime: timestamp (nullable = true)
 |-- air_time_projected: integer (nullable = true)



In [55]:
df_flights.groupBy("air_time_projected").count().distinct().orderBy("air_time_projected", ascending=True).show(270)

+------------------+-----+
|air_time_projected|count|
+------------------+-----+
|                29|    5|
|                30|   41|
|                31|   54|
|                32|  301|
|                42|  105|
|                44|   10|
|                54|    7|
|                64|   13|
|                67|   93|
|                74|  121|
|                75|  305|
|                76|  164|
|                80|  190|
|                83|  171|
|                87|  695|
|                88|  302|
|                89|  213|
|                94|   47|
|                96|  156|
|                98|   23|
|               101|   71|
|               103|  222|
|               104|   56|
|               105|   51|
|               106|  370|
|               107|   14|
|               109|    4|
|               110|  100|
|               113|  157|
|               115|  521|
|               116|  119|
|               117|  147|
|               118|   34|
|               119|  235|
|

## Pergunta 9

In [56]:
df_flights.toPandas()

Unnamed: 0,dep_time,arr_time,dep_delay,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,dep_datetime,air_time_projected
0,658,935,-7,-5,VX,N846VA,1780,SEA,LAX,132.0,954,2014-12-08 06:58:00,115
1,1040,1505,5,5,AS,N559AS,851,SEA,HNL,360.0,2677,2014-01-22 10:40:00,287
2,1443,1652,-2,2,VX,N847VA,755,SEA,SFO,111.0,679,2014-03-09 14:43:00,87
3,1705,1839,45,34,WN,N360SW,344,PDX,SJC,83.0,569,2014-04-09 17:05:00,76
4,754,1015,-1,1,AS,N612AS,522,SEA,BUR,127.0,937,2014-03-09 07:54:00,113
...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,1806,2104,-4,-6,OO,N225AG,3458,SEA,SLC,89.0,689,2014-06-23 18:06:00,88
9996,2336,452,11,-13,AA,N3LEAA,1230,SEA,DFW,178.0,1660,2014-08-31 23:36:00,186
9997,904,1042,-1,-5,AS,N523AS,360,SEA,SMF,81.0,605,2014-08-08 09:04:00,80
9998,1441,1820,26,10,WN,N8647A,2857,SEA,ABQ,133.0,1180,2014-08-29 14:41:00,138


In [57]:
df_flights_1 = df_flights.groupBy("origin", "dest").agg(F.avg('air_time').cast(IntegerType()).alias('air_time_expected')).orderBy('origin')

In [58]:
print('Actual columns:', df_flights_1.columns)


df_flights_1 = (df_flights_1.withColumnRenamed('origin', 'O1')
                          .withColumnRenamed('dest', 'D1')
                          .withColumnRenamed('air_time_expected', 'air_time_expected')
                                       
              )
print('modified columns:', df_flights_1.columns)

df_flights_1.show()

Actual columns: ['origin', 'dest', 'air_time_expected']
modified columns: ['O1', 'D1', 'air_time_expected']
+---+---+-----------------+
| O1| D1|air_time_expected|
+---+---+-----------------+
|PDX|DEN|              123|
|PDX|ABQ|              136|
|PDX|JFK|              286|
|PDX|KOA|              357|
|PDX|SBA|              105|
|PDX|SEA|               34|
|PDX|SAN|              124|
|PDX|PHX|              130|
|PDX|EWR|              281|
|PDX|SFO|               85|
|PDX|BOS|              291|
|PDX|ONT|              111|
|PDX|DCA|              269|
|PDX|MDW|              214|
|PDX|LGB|              116|
|PDX|PHL|              283|
|PDX|ANC|              202|
|PDX|OAK|               81|
|PDX|STL|              204|
|PDX|IAD|              267|
+---+---+-----------------+
only showing top 20 rows



In [59]:
df_flights_Final = (df_flights.join(df_flights_1,
                  (df_flights.origin == df_flights_1.O1)
                     & (df_flights.dest == df_flights_1.D1)
                        ,"left")
)

In [60]:
df_flights_Final.toPandas()

Unnamed: 0,dep_time,arr_time,dep_delay,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,dep_datetime,air_time_projected,O1,D1,air_time_expected
0,658,935,-7,-5,VX,N846VA,1780,SEA,LAX,132.0,954,2014-12-08 06:58:00,115,SEA,LAX,126
1,1040,1505,5,5,AS,N559AS,851,SEA,HNL,360.0,2677,2014-01-22 10:40:00,287,SEA,HNL,343
2,1443,1652,-2,2,VX,N847VA,755,SEA,SFO,111.0,679,2014-03-09 14:43:00,87,SEA,SFO,101
3,1705,1839,45,34,WN,N360SW,344,PDX,SJC,83.0,569,2014-04-09 17:05:00,76,PDX,SJC,85
4,754,1015,-1,1,AS,N612AS,522,SEA,BUR,127.0,937,2014-03-09 07:54:00,113,SEA,BUR,122
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,1806,2104,-4,-6,OO,N225AG,3458,SEA,SLC,89.0,689,2014-06-23 18:06:00,88,SEA,SLC,88
9996,2336,452,11,-13,AA,N3LEAA,1230,SEA,DFW,178.0,1660,2014-08-31 23:36:00,186,SEA,DFW,195
9997,904,1042,-1,-5,AS,N523AS,360,SEA,SMF,81.0,605,2014-08-08 09:04:00,80,SEA,SMF,82
9998,1441,1820,26,10,WN,N8647A,2857,SEA,ABQ,133.0,1180,2014-08-29 14:41:00,138,SEA,ABQ,142


In [61]:
df_flights = df_flights_Final.drop("O1", "D1")

In [62]:
df_flights.toPandas()

Unnamed: 0,dep_time,arr_time,dep_delay,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,dep_datetime,air_time_projected,air_time_expected
0,658,935,-7,-5,VX,N846VA,1780,SEA,LAX,132.0,954,2014-12-08 06:58:00,115,126
1,1040,1505,5,5,AS,N559AS,851,SEA,HNL,360.0,2677,2014-01-22 10:40:00,287,343
2,1443,1652,-2,2,VX,N847VA,755,SEA,SFO,111.0,679,2014-03-09 14:43:00,87,101
3,1705,1839,45,34,WN,N360SW,344,PDX,SJC,83.0,569,2014-04-09 17:05:00,76,85
4,754,1015,-1,1,AS,N612AS,522,SEA,BUR,127.0,937,2014-03-09 07:54:00,113,122
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,1806,2104,-4,-6,OO,N225AG,3458,SEA,SLC,89.0,689,2014-06-23 18:06:00,88,88
9996,2336,452,11,-13,AA,N3LEAA,1230,SEA,DFW,178.0,1660,2014-08-31 23:36:00,186,195
9997,904,1042,-1,-5,AS,N523AS,360,SEA,SMF,81.0,605,2014-08-08 09:04:00,80,82
9998,1441,1820,26,10,WN,N8647A,2857,SEA,ABQ,133.0,1180,2014-08-29 14:41:00,138,142


In [63]:
# deleta os dataframes provisórios
#del df_flights

## Pergunta 10

In [64]:
df_flights.groupBy("air_time").count().distinct().orderBy("air_time", ascending=True).show()

+--------+-----+
|air_time|count|
+--------+-----+
|    null|   75|
|      20|    3|
|      24|    4|
|      25|   10|
|      26|   20|
|      27|   23|
|      28|   24|
|      29|   35|
|      30|   25|
|      31|   34|
|      32|   32|
|      33|   40|
|      34|   34|
|      35|   32|
|      36|   40|
|      37|   32|
|      38|   30|
|      39|   22|
|      40|   17|
|      41|   11|
+--------+-----+
only showing top 20 rows



In [65]:
# Função greatest (Retorna o maior valor de todos os parâmetros)
df_flights = (df_flights.withColumn('air_time',
                                  F.when(F.col('air_time').isNull(), 
                                         F.greatest(F.col('air_time_projected'), F.col('air_time_expected')))
                                   .otherwise(F.col('air_time'))                     
                               
            ))

In [66]:
df_flights.groupBy("air_time").count().distinct().orderBy("air_time", ascending=True).show()

+--------+-----+
|air_time|count|
+--------+-----+
|      20|    3|
|      24|    4|
|      25|   10|
|      26|   20|
|      27|   23|
|      28|   24|
|      29|   35|
|      30|   27|
|      31|   34|
|      32|   35|
|      33|   40|
|      34|   40|
|      35|   32|
|      36|   40|
|      37|   32|
|      38|   30|
|      39|   22|
|      40|   17|
|      41|   11|
|      42|   12|
+--------+-----+
only showing top 20 rows



## Pergunta 11

In [67]:
df_flights.filter(F.col('arr_time') == 'NA').toPandas()

Unnamed: 0,dep_time,arr_time,dep_delay,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,dep_datetime,air_time_projected,air_time_expected
0,0,,0,0,UA,,156,SEA,DEN,127,1024,2014-03-04 00:00:00,122,127
1,0,,0,0,AS,N527AS,2,SEA,DCA,268,2329,2014-02-12 00:00:00,252,268
2,0,,0,0,WN,N8323C,2485,SEA,MDW,214,1733,2014-07-01 00:00:00,193,214
3,0,,0,0,AS,N526AS,566,PDX,LAX,114,834,2014-04-30 00:00:00,103,114
4,0,,0,0,US,,553,SEA,PHL,279,2378,2014-01-03 00:00:00,257,279
5,0,,0,0,AS,N579AS,867,SEA,OGG,339,2640,2014-08-07 00:00:00,284,339
6,0,,0,0,OO,N689CA,4528,PDX,SEA,34,129,2014-08-11 00:00:00,32,34
7,2238,,8,0,OO,N297SW,5437,PDX,LMT,53,241,2014-02-06 22:38:00,44,53
8,0,,0,0,UA,,212,SEA,EWR,277,2402,2014-01-02 00:00:00,260,277
9,0,,0,0,OO,N917SW,6250,PDX,LAX,114,834,2014-05-15 00:00:00,103,114


In [68]:
# Transforma a coluna dep_datetime em segundos com a função unix_timestamp e soma com a coluna air_time 
# transformada em segundos, transformando novamente em timestamp depois
df_flights = df_flights.withColumn('sum', 
                        (F.unix_timestamp("dep_datetime") + 
                        (F.col('air_time') * 60).cast('int')).cast('timestamp'))

# Verifica aonde a coluna arr_time está nulo e troca o valor
df_flights = df_flights.withColumn('arr_time',
             F.when(F.col('arr_time') == 'NA', F.format_string('%d%d', F.hour(F.col('sum')), 
                                                                       F.minute(F.col('sum'))))
             .otherwise(F.col('arr_time')))

# Remove a coluna sum
df_flights = df_flights.drop('sum')

df_flights.toPandas()

Unnamed: 0,dep_time,arr_time,dep_delay,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,dep_datetime,air_time_projected,air_time_expected
0,658,935,-7,-5,VX,N846VA,1780,SEA,LAX,132,954,2014-12-08 06:58:00,115,126
1,1040,1505,5,5,AS,N559AS,851,SEA,HNL,360,2677,2014-01-22 10:40:00,287,343
2,1443,1652,-2,2,VX,N847VA,755,SEA,SFO,111,679,2014-03-09 14:43:00,87,101
3,1705,1839,45,34,WN,N360SW,344,PDX,SJC,83,569,2014-04-09 17:05:00,76,85
4,754,1015,-1,1,AS,N612AS,522,SEA,BUR,127,937,2014-03-09 07:54:00,113,122
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,1806,2104,-4,-6,OO,N225AG,3458,SEA,SLC,89,689,2014-06-23 18:06:00,88,88
9996,2336,452,11,-13,AA,N3LEAA,1230,SEA,DFW,178,1660,2014-08-31 23:36:00,186,195
9997,904,1042,-1,-5,AS,N523AS,360,SEA,SMF,81,605,2014-08-08 09:04:00,80,82
9998,1441,1820,26,10,WN,N8647A,2857,SEA,ABQ,133,1180,2014-08-29 14:41:00,138,142


In [69]:
df_flights.select(F.col('origin'),
                  F.col('dest'),
                  F.col('arr_time'),
                  F.col('dep_time'),
                  F.col('air_time')).where((F.col('origin') == 'SEA') & (F.col('dest') == 'PHX')).show()

+------+----+--------+--------+--------+
|origin|dest|arr_time|dep_time|air_time|
+------+----+--------+--------+--------+
|   SEA| PHX|    1415|    1120|     154|
|   SEA| PHX|    1906|    1500|     151|
|   SEA| PHX|     757|     515|     143|
|   SEA| PHX|    1021|     746|     135|
|   SEA| PHX|    2027|    1750|     137|
|   SEA| PHX|    2104|    1844|     124|
|   SEA| PHX|    1020|     739|     139|
|   SEA| PHX|    1059|     716|     142|
|   SEA| PHX|    1355|    1113|     140|
|   SEA| PHX|    1205|     837|     129|
|   SEA| PHX|    1407|    1122|     144|
|   SEA| PHX|    1033|     739|     153|
|   SEA| PHX|    1723|    1439|     143|
|   SEA| PHX|     939|     654|     142|
|   SEA| PHX|    1402|    1126|     140|
|   SEA| PHX|    2153|    1921|     140|
|   SEA| PHX|    1922|    1648|     140|
|   SEA| PHX|    1050|     726|     126|
|   SEA| PHX|    1450|    1211|     144|
|   SEA| PHX|    2122|    1738|     144|
+------+----+--------+--------+--------+
only showing top

In [70]:
df_flights.filter(F.col('arr_time') == 'NA').show(truncate = 5)

(df_flights
.select(F.col('dep_time'), 
        F.col('dep_delay'), 
        F.col('arr_time'), 
        F.col('arr_delay'), 
        F.col('air_time'), 
        F.col('tailnum'))
.where(F.col('tailnum') == 'N612AS')
.show(50))

+--------+--------+---------+---------+-------+-------+------+------+----+--------+--------+------------+------------------+-----------------+
|dep_time|arr_time|dep_delay|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|dep_datetime|air_time_projected|air_time_expected|
+--------+--------+---------+---------+-------+-------+------+------+----+--------+--------+------------+------------------+-----------------+
+--------+--------+---------+---------+-------+-------+------+------+----+--------+--------+------------+------------------+-----------------+

+--------+---------+--------+---------+--------+-------+
|dep_time|dep_delay|arr_time|arr_delay|air_time|tailnum|
+--------+---------+--------+---------+--------+-------+
|     754|       -1|    1015|        1|     127| N612AS|
|     653|       -7|     920|       -6|     133| N612AS|
|     936|        1|    1210|       -4|     140| N612AS|
|    1829|       -6|    2101|      -21|     122| N612AS|
|    2124|       14|    2356

## Pergunta 12

In [71]:
df_flights = df_flights.withColumn('haul_duration', (
                      F.when((F.col('air_time').between(20,180)), "SHORT-HAUL")
                       .when((F.col('air_time').between(181,360)), "MEDIUM-HAUL")
                       .otherwise('LONG-HAUL')))

In [72]:
df_flights.groupBy("haul_duration").count().distinct().orderBy("haul_duration", ascending=True).show()

+-------------+-----+
|haul_duration|count|
+-------------+-----+
|    LONG-HAUL|   54|
|  MEDIUM-HAUL| 3254|
|   SHORT-HAUL| 6692|
+-------------+-----+



## Pergunta 13

In [73]:
df_flights = df_flights.withColumn('dep_season', (
                      F.when((F.col('dep_datetime').between('2013-12-21 21:48:00','2014-03-20 15:32:59')), "WINTER")
                       .when((F.col('dep_datetime').between('2014-03-20 15:33:00','2014-06-21 10:13:59')), "SPRING")
                       .when((F.col('dep_datetime').between('2014-06-21 10:14:00','2014-09-23 02:03:59')), "SUMMER")
                       .when((F.col('dep_datetime').between('2014-09-23 02:04:00','2014-12-21 21:47:59')), "FALL")
                       .when((F.col('dep_datetime').between('2014-12-21 21:48:00','2015-03-20 15:32:59')), "WINTER")))

In [74]:
df_flights.groupBy("dep_season").count().distinct().orderBy("dep_season", ascending=True).show()

+----------+-----+
|dep_season|count|
+----------+-----+
|      FALL| 2373|
|    SPRING| 2560|
|    SUMMER| 2918|
|    WINTER| 2149|
+----------+-----+



## Pergunta 14

In [75]:
df_flights = df_flights.withColumn('dep_delay_category', (
                      F.when((F.col('dep_delay') < 0), "ANTECIPATED")
                       .when((F.col('dep_delay') == 0), "INTIME")
                       .when((F.col('dep_delay') >= 60 ), "MAJOR")
                       .otherwise("MINOR")))

In [76]:
df_flights.groupBy("dep_delay_category").count().distinct().orderBy("dep_delay_category", ascending=True).show()

+------------------+-----+
|dep_delay_category|count|
+------------------+-----+
|       ANTECIPATED| 5894|
|            INTIME|  646|
|             MAJOR|  395|
|             MINOR| 3065|
+------------------+-----+



## Salvando arquivo

In [77]:
(df_flights
            .repartition(1) # coalesce
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
            .save("C:/projetos/spark/transformation/flights_proc.parquet"))