In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

In [3]:
# Criar o contexto do spark
sc = SparkContext()

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master("local[7]")
                     .appName("Aceleração PySpark - Capgemini"))

## -- Import dos arquivos de Qualidade e Transformação

####  Import dos arquivos de Qualidade

In [4]:
airports_qa = (spark.getOrCreate().read
                    .format("parquet")
                    .option("header", "true")
                    .load("./datasets_qa/airports_qa.parquet"))

planes_qa = (spark.getOrCreate().read
                  .format("parquet")
                  .option("header", "true")
                  .load("./datasets_qa/planes_qa.parquet"))

flights_qa = (spark.getOrCreate().read
                   .format("parquet")
                   .option("header", "true")
                   .load("./datasets_qa/flights_qa.parquet"))

#### Import dos arquivos de Transformação

In [5]:

airports_trf = (spark.getOrCreate().read
                     .format("parquet")
                     .option("header", "true")
                     .load("./datasets_trf/airports_proc.parquet"))

planes_trf = (spark.getOrCreate().read
                   .format("parquet")
                   .option("header", "true")
                   .load("./datasets_trf/planes_proc.parquet"))

flights_trf = (spark.getOrCreate().read
                    .format("parquet")
                    .option("header", "true")
                    .load("./datasets_trf/flights_proc.parquet"))

## -- Perguntas para qualidade

#### Pergunta 1

In [6]:
# Definição dos campos de cada DF;
flights_qa = (flights_qa.withColumnRenamed("tailnum", "tailnum_flights")
                        .withColumnRenamed("qa_tailnum", "qa_tailnum_flights"))

planes_qa = (planes_qa.withColumnRenamed("tailnum", "tailnum_planes")
                        .withColumnRenamed("qa_tailnum", "qa_tailnum_planes"))

# Join dos DF's;
qa_df_joins = (flights_qa.join(planes_qa, 
                                planes_qa.tailnum_planes == flights_qa.tailnum_flights, 'left')
                          .join(airports_qa, 
                                (airports_qa.faa == flights_qa.origin) & 
                                (airports_qa.faa == flights_qa.dest), 'left'))

In [7]:
# Checagem de linha dos joins;
print(qa_df_joins.count())

10000


#### Pergunta 2

In [8]:
for column in qa_df_joins.columns:
    if 'qa_' in column:
        print(column.replace('_', ' ').upper())
        final_df = qa_df_joins.withColumn(column, (F.when((F.col(column).startswith("M")) ,"M")
                                                     .when((F.col(column).startswith("F")) ,"F")
                                                     .when((F.col(column).startswith("I")) ,"I")
                                                     .when((F.col(column).startswith("S")) ,"S")
                                                     .when((F.col(column).startswith("T")) ,"T")
                                                     .otherwise(F.col(column))))
        
        final_df.groupBy(column).agg(F.count(column)).show()

QA YEAR MONTH DAY
+-----------------+------------------------+
|qa_year_month_day|count(qa_year_month_day)|
+-----------------+------------------------+
|             null|                       0|
+-----------------+------------------------+

QA HOUR MINUTE
+--------------+---------------------+
|qa_hour_minute|count(qa_hour_minute)|
+--------------+---------------------+
|          null|                    0|
|             M|                   48|
|             I|                    1|
+--------------+---------------------+

QA DEP ARR TIME
+---------------+----------------------+
|qa_dep_arr_time|count(qa_dep_arr_time)|
+---------------+----------------------+
|              F|                   241|
|           null|                     0|
|              M|                    55|
+---------------+----------------------+

QA DEP ARR DELAY
+----------------+-----------------------+
|qa_dep_arr_delay|count(qa_dep_arr_delay)|
+----------------+-----------------------+
|            null

#### Pergunta 3

In [9]:
# QA SPEED
# M 9443

#### Pergunta 4

In [10]:
# QA FLIGHT
# F 6158

#### Pergunta 5

In [11]:
# QA_YEAR
# I 8

## -- Perguntas para negócio

#### Pergunta 1

In [12]:
planes_trf = planes_trf.withColumnRenamed("tailnum","tailnum_plane")

df = flights_trf.join(planes_trf, flights_trf.tailnum == planes_trf.tailnum_plane, "left")

df = df.join(airports_trf, flights_trf.origin == airports_trf.faa, "left")

airports_trf_dest = (airports_trf.withColumnRenamed("faa", "faa_dest")
                                 .withColumnRenamed("region", "region_dest")
                                 .withColumnRenamed("alt", "alt_dest")
                                 .withColumnRenamed("name", "name_dest")
                                 .withColumnRenamed("type", "type_dest"))

trf_df_joins = df.join(airports_trf_dest, flights_trf.dest == airports_trf_dest.faa_dest, "left")

In [13]:
trf_df_joins.createOrReplaceTempView('df_all_joins')

#### Pergunta 2

In [14]:
# Em cada region origin
print('REGIÕES DE ORIGEM')
spark.getOrCreate().sql("""
    select 
        region as region,
        count(distinct name) as airports
    from 
        df_all_joins 
    group by 
        region""").show(4)

print('DESTINO')
# Em cada region dest
spark.getOrCreate().sql("""
    select 
        region_dest as region,
        count(distinct name) as airports
    from 
        df_all_joins 
    group by 
        region_dest""").show(4)

REGIÕES DE ORIGEM
+-------------+--------+
|       region|airports|
+-------------+--------+
|MAINLAND-WEST|       2|
+-------------+--------+

DESTINO
+-------------+--------+
|       region|airports|
+-------------+--------+
|       ALASKA|       2|
|MAINLAND-EAST|       2|
|MAINLAND-WEST|       2|
+-------------+--------+



#### Pergunta 3

In [15]:
spark.getOrCreate().sql("""
    select
        (max(alt_dest) - max(alt)) as max_alt_diff
    from 
        df_all_joins
    """).show(4)

+------------+
|max_alt_diff|
+------------+
|        6169|
+------------+



#### Pergunta 4

In [16]:
spark.getOrCreate().sql("""
    select
         ceil(avg(arr_delay)) as delay_origin,
         ceil(avg(dep_delay)) as delay_dest
    from 
        df_all_joins
    where 
        arr_delay > 0 and 
        dep_delay > 0
    """).show(4)

+------------+----------+
|delay_origin|delay_dest|
+------------+----------+
|          34|        35|
+------------+----------+



#### Pergunta 5

In [17]:
print('REGIÕES DE ORIGEM')
spark.getOrCreate().sql("""
    select
        region as region,
        round(avg(arr_delay), 2) as delay_avg
    from 
        df_all_joins
    where 
        arr_delay > 0
    group by 
        region""").show(4)

print('DESTINO')
spark.getOrCreate().sql("""
    select
        region_dest as region,
        round(avg(arr_delay), 2) as delay_avg
    from 
        df_all_joins
    where 
        arr_delay > 0
    group by 
        region_dest""").show(4)


REGIÕES DE ORIGEM
+-------------+---------+
|       region|delay_avg|
+-------------+---------+
|MAINLAND-WEST|    24.65|
+-------------+---------+

DESTINO
+-------------+---------+
|       region|delay_avg|
+-------------+---------+
|       ALASKA|    22.21|
|MAINLAND-EAST|    28.53|
|MAINLAND-WEST|    23.79|
+-------------+---------+



#### Pergunta 6

In [18]:
spark.getOrCreate().sql("""
    select
        year(dep_datetime) as year,
        sum(arr_delay) as arr_delay,
        sum(dep_delay) as dep_delay
    from 
        df_all_joins
    group by 
        year(dep_datetime)""").show(4)

+----+---------+---------+
|year|arr_delay|dep_delay|
+----+---------+---------+
|2014|    22362|    60395|
+----+---------+---------+



#### Pergunta 7

In [19]:
print('SOMENTE ORIGEM')
spark.getOrCreate().sql("""
    select
        region as region,
        year(dep_datetime) as year,
        sum(arr_delay) as accumulated_delay
    from 
        df_all_joins
    group by
        region,
        year(dep_datetime)""").show(4)

print('SOMENTE DESTINO')
spark.getOrCreate().sql("""
    select
        region_dest as region,
        year(dep_datetime) as year,
        sum(dep_delay) as accumulated_delay
    from 
        df_all_joins
    group by
        region_dest,
        year(dep_datetime)""").show(4)

print('ORIGEM - DESTINO')
spark.getOrCreate().sql("""
    select
        region_dest as region,
        year(dep_datetime) as year,
        (sum(arr_delay) + sum(dep_delay)) as accumulated_delay
    from 
        df_all_joins
    group by
        region_dest,
        year(dep_datetime)""").show(4)

SOMENTE ORIGEM
+-------------+----+-----------------+
|       region|year|accumulated_delay|
+-------------+----+-----------------+
|MAINLAND-WEST|2014|            22362|
+-------------+----+-----------------+

SOMENTE DESTINO
+-------------+----+-----------------+
|       region|year|accumulated_delay|
+-------------+----+-----------------+
|MAINLAND-WEST|2014|            38825|
|       ALASKA|2014|             4891|
|MAINLAND-EAST|2014|            16679|
+-------------+----+-----------------+

ORIGEM - DESTINO
+-------------+----+-----------------+
|       region|year|accumulated_delay|
+-------------+----+-----------------+
|MAINLAND-WEST|2014|            56704|
|       ALASKA|2014|             4435|
|MAINLAND-EAST|2014|            21618|
+-------------+----+-----------------+



#### Pergunta 8

In [20]:
spark.getOrCreate().sql("""
    select
        ceil(avg(air_time)) as air_time_avg
    from 
        df_all_joins""").show(4)

+------------+
|air_time_avg|
+------------+
|         153|
+------------+



#### Pergunta 9

In [21]:
print('REGIÕES DE ORIGEM')
spark.getOrCreate().sql("""
    select
        region as region,
        ceil(avg(air_time)) as air_time_avg
    from 
        df_all_joins
    group by 
        region""").show(4)

print('DESTINO')
spark.getOrCreate().sql("""
    select
        region_dest as region,
        ceil(avg(air_time)) as air_time_avg
    from 
        df_all_joins
    group by 
        region_dest""").show(4)

REGIÕES DE ORIGEM
+-------------+------------+
|       region|air_time_avg|
+-------------+------------+
|MAINLAND-WEST|         153|
+-------------+------------+

DESTINO
+-------------+------------+
|       region|air_time_avg|
+-------------+------------+
|       ALASKA|         228|
|MAINLAND-EAST|         238|
|MAINLAND-WEST|         116|
+-------------+------------+



#### Pergunta 10

In [22]:
spark.getOrCreate().sql("""
    select
        origin,
        dest,
        ceil(avg(air_time)) as air_time_avg
    from 
        df_all_joins
    group by 
        origin,
        dest""").show(4)

+------+----+------------+
|origin|dest|air_time_avg|
+------+----+------------+
|   SEA| RNO|          75|
|   SEA| DTW|         220|
|   SEA| CLE|         234|
|   SEA| LAX|         127|
+------+----+------------+
only showing top 4 rows



#### Pergunta 11

In [23]:
spark.getOrCreate().sql("""
    select
        year(dep_datetime) as year,
        ceil(sum(air_time)) as air_time_accumulated
    from 
        df_all_joins
    group by 
        year(dep_datetime)""").show(4)

+----+--------------------+
|year|air_time_accumulated|
+----+--------------------+
|2014|             1528625|
+----+--------------------+



#### Pergunta 12

In [24]:
print('ORIGEM')
spark.getOrCreate().sql("""
    select
        region as region,
        ceil(sum(air_time)) as air_time_accumulated
    from 
        df_all_joins
    group by 
        region""").show(4)

print('DESTINO')
spark.getOrCreate().sql("""
    select
        region_dest as region,
        ceil(sum(air_time)) as air_time_accumulated
    from 
        df_all_joins
    group by 
        region_dest""").show(4)

ORIGEM
+-------------+--------------------+
|       region|air_time_accumulated|
+-------------+--------------------+
|MAINLAND-WEST|             1528625|
+-------------+--------------------+

DESTINO
+-------------+--------------------+
|       region|air_time_accumulated|
+-------------+--------------------+
|       ALASKA|              230602|
|MAINLAND-EAST|              508344|
|MAINLAND-WEST|              789679|
+-------------+--------------------+



#### Pergunta 13

In [25]:
spark.getOrCreate().sql("""
    select
        ceil(avg(distance)) as distance_avg
    from 
        df_all_joins
    """).show(4)

+------------+
|distance_avg|
+------------+
|        1209|
+------------+



#### Pergunta 14

In [26]:
print('REGIÕES DE ORIGEM')
spark.getOrCreate().sql("""
    select
        region as region,
        ceil(avg(distance)) as distance_avg 
    from 
        df_all_joins
    group by 
        region
    """).show(4)

print('REGIÕES DE DESTINO')
spark.getOrCreate().sql("""
    select
        region_dest as region,
        ceil(avg(distance)) as distance_avg 
    from 
        df_all_joins
    group by 
        region_dest
    """).show(4)

REGIÕES DE ORIGEM
+-------------+------------+
|       region|distance_avg|
+-------------+------------+
|MAINLAND-WEST|        1209|
+-------------+------------+

REGIÕES DE DESTINO
+-------------+------------+
|       region|distance_avg|
+-------------+------------+
|       ALASKA|        1742|
|MAINLAND-EAST|        2043|
|MAINLAND-WEST|         868|
+-------------+------------+



#### Pergunta 15

In [27]:
spark.getOrCreate().sql("""
    select
        origin,
        dest,
        ceil(avg(distance)) as distance_avg 
    from 
        df_all_joins
    group by 
        origin,
        dest""").show(4)

+------+----+------------+
|origin|dest|distance_avg|
+------+----+------------+
|   SEA| RNO|         564|
|   SEA| DTW|        1927|
|   SEA| CLE|        2021|
|   SEA| LAX|         954|
+------+----+------------+
only showing top 4 rows



#### Pergunta 16

In [28]:
spark.getOrCreate().sql("""
    select
        year(dep_datetime) as year,
        ceil(sum(distance)) as distance_accumulated
    from 
        df_all_joins
    group by 
        year(dep_datetime)""").show(4)

+----+--------------------+
|year|distance_accumulated|
+----+--------------------+
|2014|            12081516|
+----+--------------------+



#### Pergunta 17

In [29]:
print('REGIÕES DE ORIGEM')
spark.getOrCreate().sql("""
    select
        region as region,
        ceil(sum(distance)) as distance_accumulated 
    from 
        df_all_joins
    group by 
        region
    """).show(4)

print('REGIÕES DE DESTINO')
spark.getOrCreate().sql("""
    select
        region_dest as region,
        ceil(sum(distance)) as distance_accumulated 
    from 
        df_all_joins
    group by 
        region_dest
    """).show(4)

REGIÕES DE ORIGEM
+-------------+--------------------+
|       region|distance_accumulated|
+-------------+--------------------+
|MAINLAND-WEST|            12081516|
+-------------+--------------------+

REGIÕES DE DESTINO
+-------------+--------------------+
|       region|distance_accumulated|
+-------------+--------------------+
|       ALASKA|             1762553|
|MAINLAND-EAST|             4378902|
|MAINLAND-WEST|             5940061|
+-------------+--------------------+



#### Pergunta 18

In [30]:
spark.getOrCreate().sql("""
    select
        origin,
        dest,
        ceil(avg(seats)) as passengers_avg
    from 
        df_all_joins
    group by
        origin,
        dest""").show(4)

+------+----+--------------+
|origin|dest|passengers_avg|
+------+----+--------------+
|   SEA| RNO|           142|
|   SEA| DTW|           213|
|   SEA| CLE|           182|
|   SEA| LAX|           155|
+------+----+--------------+
only showing top 4 rows



#### Pergunta 19

In [31]:
spark.getOrCreate().sql("""
    select
        year(dep_datetime) as year,
        ceil(sum(seats)) as seats_accumulated
    from
        df_all_joins
    group by 
        year(dep_datetime)""").show(4)

+----+-----------------+
|year|seats_accumulated|
+----+-----------------+
|2014|          1509544|
+----+-----------------+



#### Pergunta 20

In [32]:
spark.getOrCreate().sql("""
    select
        dest,
        count(dest) as most_popular_dest
    from 
        df_all_joins
    group by 
        dest
    order by 
        count(dest) desc""").show(4)

+----+-----------------+
|dest|most_popular_dest|
+----+-----------------+
| SFO|              787|
| LAX|              615|
| DEN|              586|
| PHX|              530|
+----+-----------------+
only showing top 4 rows



#### Pergunta 21

In [33]:
spark.getOrCreate().sql("""
    select
        dest,
        sum(seats) as dest_most_passengers
    from 
        df_all_joins
    group by 
        dest
    order by 
        sum(seats) desc""").show(4)

+----+--------------------+
|dest|dest_most_passengers|
+----+--------------------+
| SFO|              119635|
| PHX|               96317|
| LAX|               91406|
| DEN|               88218|
+----+--------------------+
only showing top 4 rows



#### Pergunta 22

In [34]:
spark.getOrCreate().sql("""
    select
        distinct origin,
        dest,
        distance
    from 
        df_all_joins
    where 
        origin == 'PDX' or 
        dest == 'PDX'
    order by 
        distance desc""").show(4)

+------+----+--------+
|origin|dest|distance|
+------+----+--------+
|   PDX| LIH|    2631|
|   PDX| KOA|    2607|
|   PDX| HNL|    2603|
|   PDX| OGG|    2562|
+------+----+--------+
only showing top 4 rows



#### Pergunta 23

In [54]:
# Essa parece estar errada a respota;
# Parece que a resposta seria trazer em cada mês o destino que teve mais voos;
spark.getOrCreate().sql("""
    select
        month(dep_datetime) as month,
        dest,
        count(flight) as flights_qtty
    from 
        df_all_joins
    group by
        month(dep_datetime),
        dest
    order by 
        count(flight) desc""").show(4)

+-----+----+------------+
|month|dest|flights_qtty|
+-----+----+------------+
|    5| LAX|          77|
|   12| SFO|          76|
|    8| SFO|          75|
|    5| SFO|          73|
+-----+----+------------+
only showing top 4 rows



#### Pergunta 24

In [36]:
spark.getOrCreate().sql("""
    select
        model,
        count(model) as models_qtty
    from 
        df_all_joins
    group by 
        model
    order by 
        count(model) desc""").show(4)

+---------+-----------+
|    model|models_qtty|
+---------+-----------+
|  737-890|       1463|
|  737-7H4|        851|
|737-990ER|        664|
| A320-232|        612|
+---------+-----------+
only showing top 4 rows



#### Pergunta 25

In [37]:
spark.getOrCreate().sql("""
    select
        model,
        dest,
        count(model) models_qtty
    from 
        df_all_joins
    group by 
        dest, 
        model
    order by 
        count(model) desc""").show(4)

+-------+----+-----------+
|  model|dest|models_qtty|
+-------+----+-----------+
|737-7H4| OAK|        141|
|737-890| ANC|        138|
|737-790| SNA|        122|
|737-7H4| SMF|        114|
+-------+----+-----------+
only showing top 4 rows



#### Pergunta 26

In [38]:
spark.getOrCreate().sql("""
    select
        haul_duration,
        ceil(avg(engines)) as engines_avg
    from 
        df_all_joins
    group by 
        haul_duration""").show(4)

+-------------+-----------+
|haul_duration|engines_avg|
+-------------+-----------+
|    LONG-HAUL|          2|
|  MEDIUM-HAUL|          2|
|   SHORT-HAUL|          2|
+-------------+-----------+



#### Pergunta 27

In [39]:
spark.getOrCreate().sql("""
    select
        dep_season,
        count(dep_season) as dep_season
    from 
        df_all_joins
    group by
        dep_season
    order by
        count(dep_season) desc""").show(4)

+----------+----------+
|dep_season|dep_season|
+----------+----------+
|    SUMMER|      2918|
|    SPRING|      2560|
|      FALL|      2373|
|    WINTER|      2149|
+----------+----------+



#### Pergunta 28

In [40]:
spark.getOrCreate().sql("""
    select
        dep_season,
        dest,
        count(dep_season) as dep_season
    from 
        df_all_joins
    group by
        dep_season,
        dest
    order by
        count(dep_season) desc""").show(4)

+----------+----+----------+
|dep_season|dest|dep_season|
+----------+----+----------+
|    SUMMER| SFO|       217|
|    SPRING| SFO|       199|
|      FALL| SFO|       198|
|    SPRING| LAX|       176|
+----------+----+----------+
only showing top 4 rows



#### Pergunta 29

In [41]:
spark.getOrCreate().sql("""
    select
        dep_delay_category,
        count(dep_delay_category) category_qtty
    from 
        df_all_joins
    group by
        dep_delay_category
    order by
        count(dep_delay_category) desc""").show(4)

+------------------+-------------+
|dep_delay_category|category_qtty|
+------------------+-------------+
|       ANTECIPATED|         5894|
|             MINOR|         3065|
|            INTIME|          646|
|             MAJOR|          395|
+------------------+-------------+



#### Pergunta 30

In [42]:
spark.getOrCreate().sql("""
    select
        origin,
        dest,
        dep_delay_category,
        count(dep_delay_category) category_qtty
    from 
        df_all_joins
    group by
        origin,
        dest,
        dep_delay_category
    order by
        count(dep_delay_category) desc""").show(4)

+------+----+------------------+-------------+
|origin|dest|dep_delay_category|category_qtty|
+------+----+------------------+-------------+
|   SEA| LAX|       ANTECIPATED|          293|
|   SEA| SFO|       ANTECIPATED|          245|
|   SEA| LAS|       ANTECIPATED|          228|
|   SEA| PHX|       ANTECIPATED|          195|
+------+----+------------------+-------------+
only showing top 4 rows

