In [1]:
!pip install pyspark
!pip install findspark



In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from datetime import datetime

In [4]:
# Criar o contexto do spark
sc = SparkContext()

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master('local[2]')
                     .appName('Qualidade beta')
        )

In [5]:
schema_airports = StructType([
    StructField('faa',  StringType(),  True),
    StructField('name', StringType(),  True),
    StructField('lat',  FloatType(),   True),
    StructField('lon',  FloatType(),   True),
    StructField('alt',  IntegerType(), True),
    StructField('tz',   IntegerType(), True),
    StructField('dst',  StringType(),  True)
])

schema_planes = StructType([
    StructField('tailnum',      StringType(),  True),
    StructField('year',         IntegerType(), True),
    StructField('type',         StringType(),  True),
    StructField('manufacturer', StringType(),  True),
    StructField('model',        StringType(),  True),
    StructField('engines',      IntegerType(), True),
    StructField('seats',        IntegerType(), True),
    StructField('speed',        IntegerType(), True),
    StructField('engine',       StringType(),  True)
])

schema_flights = StructType([
    StructField('year',      IntegerType(), True),
    StructField('month',     IntegerType(), True),
    StructField('day',       IntegerType(), True),
    StructField('dep_time',  StringType(),  True),
    StructField('dep_delay', IntegerType(), True),
    StructField('arr_time',  StringType(),  True),
    StructField('arr_delay', IntegerType(), True),
    StructField('carrier',   StringType(),  True),
    StructField('tailnum',   StringType(),  True),
    StructField('flight',    StringType(),  True),
    StructField('origin',    StringType(),  True),
    StructField('dest',      StringType(),  True),
    StructField('air_time',  IntegerType(), True),
    StructField('distance',  IntegerType(), True),
    StructField('hour',      IntegerType(), True),
    StructField('minute',    IntegerType(), True),
])

In [6]:
df_airports = (spark.getOrCreate().read
                  .format('csv')
                  .option('header', 'true')
                  .schema(schema_airports)
                  .load('airports.csv'))

df_planes = (spark.getOrCreate().read
                  .format('csv')
                  .option('header', 'true')
                  .schema(schema_planes)
                  .load('planes.csv'))

df_flights = (spark.getOrCreate().read
                  .format('csv')
                  .option('header', 'true')
                  .schema(schema_flights)
                  .load('flights.csv'))

# AIRPORTS DATASET

In [7]:
df_airports.show()

+---+--------------------+---------+-----------+----+---+---+
|faa|                name|      lat|        lon| alt| tz|dst|
+---+--------------------+---------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000| -5|  U|
|0S9|Jefferson County ...| 48.05381|-122.810646| 108| -8|  A|
|0W3|Harford County Ai...|39.566837|   -76.2024| 409| -5|  A|
|10C|  Galt Field Airport| 42.40289| -88.375114| 875| -6|  U|
|17G|Port Bucyrus-Craw...|40.781555|  -82.97481|1003| -5|  A|
|19A|Jac

# Pergunta 1

In [8]:
airports_qa = df_airports.withColumn('qa_faa', F.when(F.col('faa').isNull(), 'M')
                                                .when(F.length(F.col('faa')).between(3,5), 'F')                                               
                                    )

In [9]:
airports_qa.createOrReplaceTempView('airports_qa')

spark.getOrCreate().sql("select count(qa_faa), qa_faa from airports_qa group by 2").show()

+-------------+------+
|count(qa_faa)|qa_faa|
+-------------+------+
|         1397|     F|
+-------------+------+



# Pergunta 2

In [10]:
airports_qa = airports_qa.withColumn('qa_name', F.when(F.col('name').isNull(), 'M'))

In [11]:
airports_qa.createOrReplaceTempView('airports_qa')

spark.getOrCreate().sql("select count(qa_name), qa_name from airports_qa group by 2").show()

+--------------+-------+
|count(qa_name)|qa_name|
+--------------+-------+
|             0|   null|
+--------------+-------+



# Pergunta 3

In [12]:
airports_qa = airports_qa.withColumn('qa_lat', F.when(F.col('lat').isNull(), 'M')
                                                .when(F.col('lat').between(-180, 180), 'I')
                                                .when(F.col('lat').rlike('r[0-9a-zA-Z]+'), 'A')
                                    )

In [13]:
airports_qa.createOrReplaceTempView('airports_qa')

spark.getOrCreate().sql("select count(qa_lat), qa_lat from airports_qa group by 2").show()

+-------------+------+
|count(qa_lat)|qa_lat|
+-------------+------+
|         1397|     I|
+-------------+------+



# Pergunta 4

In [14]:
airports_qa = airports_qa.withColumn('qa_lon', F.when(F.col('lon').isNull(), 'M')
                                                .when(F.col('lon').between(-180, 180), 'I')
                                                .when(F.col('lon').rlike('r[0-9a-zA-Z]+'), 'A')
                                    )

In [15]:
airports_qa.createOrReplaceTempView('airports_qa')

spark.getOrCreate().sql("select count(qa_lon), qa_lon from airports_qa group by 2").show()

airports_qa.show(5)

+-------------+------+
|count(qa_lon)|qa_lon|
+-------------+------+
|         1397|     I|
+-------------+------+

+---+--------------------+---------+---------+----+---+---+------+-------+------+------+
|faa|                name|      lat|      lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|
+---+--------------------+---------+---------+----+---+---+------+-------+------+------+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|     F|   null|     I|     I|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -5|  A|     F|   null|     I|     I|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A|     F|   null|     I|     I|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|     F|   null|     I|     I|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -4|  A|     F|   null|     I|     I|
+---+--------------------+---------+---------+----+---+---+------+-------+------+------+
only showing top 5 rows



# Pergunta 5

In [16]:
airports_qa = airports_qa.withColumn('qa_alt', F.when(F.col('alt').isNull(), 'M')
                                                .when(F.col('alt')<0, 'I')
                                                .when(F.col('alt').rlike('r[0-9a-zA-Z]+'), 'A')
                                    )

In [17]:
airports_qa.createOrReplaceTempView('airports_qa')

spark.getOrCreate().sql("select count(qa_alt), qa_alt from airports_qa group by 2").show()

airports_qa.show(5)

+-------------+------+
|count(qa_alt)|qa_alt|
+-------------+------+
|            0|  null|
|            2|     I|
+-------------+------+

+---+--------------------+---------+---------+----+---+---+------+-------+------+------+------+
|faa|                name|      lat|      lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_alt|
+---+--------------------+---------+---------+----+---+---+------+-------+------+------+------+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|     F|   null|     I|     I|  null|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -5|  A|     F|   null|     I|     I|  null|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A|     F|   null|     I|     I|  null|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|     F|   null|     I|     I|  null|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -4|  A|     F|   null|     I|     I|  null|
+---+--------------------+---------+---------+----+---+---+------+-------+------+------+-----

#  Pergunta 6

In [18]:
airports_qa = airports_qa.withColumn('qa_tz', F.when(F.col('tz').isNull(), 'M')
                                               .when(~F.col('tz').between(-11,14), 'I')
                                               .when(F.col('tz').rlike('r[0-9a-zA-Z]+'), 'A')
                                    )

In [19]:
airports_qa.createOrReplaceTempView('airports_qa')

spark.getOrCreate().sql("select count(qa_tz), qa_tz from airports_qa group by 2").show()

airports_qa.show(5)

+------------+-----+
|count(qa_tz)|qa_tz|
+------------+-----+
|           0| null|
+------------+-----+

+---+--------------------+---------+---------+----+---+---+------+-------+------+------+------+-----+
|faa|                name|      lat|      lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_alt|qa_tz|
+---+--------------------+---------+---------+----+---+---+------+-------+------+------+------+-----+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|     F|   null|     I|     I|  null| null|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -5|  A|     F|   null|     I|     I|  null| null|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A|     F|   null|     I|     I|  null| null|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|     F|   null|     I|     I|  null| null|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -4|  A|     F|   null|     I|     I|  null| null|
+---+--------------------+---------+---------+----+---+---+------+-------+----

# Pergunta 7

In [20]:
airports_qa = airports_qa.withColumn('qa_dst', F.when(F.col('dst').isNull(), 'M')
                                                .when(~F.col('dst').isin('E', 'A', 'S', 'O', 'Z', 'N', 'U'), 'C')
                                                .when(F.col('dst').rlike('r[0-9]+'), 'N')   
                                    )

In [21]:
airports_qa.createOrReplaceTempView('airports_qa')

spark.getOrCreate().sql("select count(qa_dst), qa_dst from airports_qa group by 2").show()

airports_qa.show(5)

+-------------+------+
|count(qa_dst)|qa_dst|
+-------------+------+
|            0|  null|
+-------------+------+

+---+--------------------+---------+---------+----+---+---+------+-------+------+------+------+-----+------+
|faa|                name|      lat|      lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_alt|qa_tz|qa_dst|
+---+--------------------+---------+---------+----+---+---+------+-------+------+------+------+-----+------+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|     F|   null|     I|     I|  null| null|  null|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -5|  A|     F|   null|     I|     I|  null| null|  null|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A|     F|   null|     I|     I|  null| null|  null|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|     F|   null|     I|     I|  null| null|  null|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -4|  A|     F|   null|     I|     I|  null| null|  null|
+---+-------

# PLANES DATASET

# Pergunta 1

In [22]:
planes_qa = df_planes.withColumn('qa_tailnum', F.when(F.col('tailnum').isNull(), 'M')
                                                .when(F.length(F.col('tailnum'))!=5 , 'S')  
                                                .when(F.col('tailnum').rlike('^N([1-9])\d{3}[^IO]'), 'F')
                                                .when(F.col('tailnum').rlike('^N([1-9])\d{2}[^IO][^IO]'), 'F')
                                                .when(~F.col('tailnum').rlike('^N.*'), 'FN')
                                                .when(F.col('tailnum').rlike('^[IO0]') == True, 'FE')
                                 )

In [23]:
planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_tailnum), qa_tailnum from planes_qa group by 2").show()

planes_qa.show(5)

+-----------------+----------+
|count(qa_tailnum)|qa_tailnum|
+-----------------+----------+
|                0|      null|
|             2609|         S|
+-----------------+----------+

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan| 

# Pergunta 2

In [24]:
planes_qa = planes_qa.withColumn('qa_year', F.when(F.col('year') == 'NA' , 'M')
                                             .when(F.col('year') <= 1950 , 'I')          
                                )

In [25]:
planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_year), qa_year from planes_qa group by 2").show()

planes_qa.show(5)

+--------------+-------+
|count(qa_year)|qa_year|
+--------------+-------+
|             0|   null|
|             1|      I|
+--------------+-------+

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|qa_year|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  1

# Pergunta 3

In [26]:
categorias = ['Fixed wing multi engine','Fixed wing single engine','Rotorcraft']

planes_qa = planes_qa.withColumn('qa_type', F.when(F.col('type').isNull(), 'M')
                                             .when(~F.col('type').isin(categorias), 'C')                                
                                )

In [27]:
planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_type), qa_type from planes_qa group by 2").show()

planes_qa.show(5)

+--------------+-------+
|count(qa_type)|qa_type|
+--------------+-------+
|             0|   null|
+--------------+-------+

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|qa_year|qa_type|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|   null|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|   null|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|   null|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   null|   null|
| N107US|1999|Fixed wing multi ...|AIRBUS

# Pergunta 4

In [28]:
Manufacturer = ['AIRBUS','BOEING','BOMBARDIER','CESSNA','EMBRAER','SIKORSKY', 'CANADAIR', 'PIPER', 'MCDONNELL DOUGLAS', 'CIRRUS', 'BELL', 'KILDALL GARY','LAMBERT RICHARD', 'BARKER JACK', 'ROBINSON HELICOPTER', 'GULFSTREAM', 'MARZ BARRY']

planes_qa = planes_qa.withColumn('qa_manufacturer', F.when(F.col('manufacturer').isNull(), 'M')
                                                     .when(~F.col('manufacturer').isin(Manufacturer), 'C')
                                )


In [29]:
planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_manufacturer), qa_manufacturer from planes_qa group by 2").show()


+----------------------+---------------+
|count(qa_manufacturer)|qa_manufacturer|
+----------------------+---------------+
|                     0|           null|
|                   621|              C|
+----------------------+---------------+



# Pergunta 5

In [30]:
planes_qa = planes_qa.withColumn('qa_model', F.when(F.col('model').isNull(), 'M')
                                              .when((F.col('manufacturer') == 'BOEING') & (F.col('model').rlike('^7')==False), 'F') 
                                              .when((F.col('manufacturer') == 'AIRBUS') & (F.col('model').rlike('^A')==False), 'F')
                                              .when((F.col('manufacturer').isin('BOMBARDIER', 'CANADAIR')) & (F.col('model').rlike('^CL')==False), 'F')
                                              .when((F.col('manufacturer') == 'MCDONNELL DOUGLAS') & (F.col('model').rlike('^(MD|DC)')==False), 'F')
                                )

In [31]:
planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_model), qa_model from planes_qa group by 2").show()

+---------------+--------+
|count(qa_model)|qa_model|
+---------------+--------+
|             15|       F|
|              0|    null|
+---------------+--------+



# Pergunta 6

In [32]:
planes_qa = planes_qa.withColumn('qa_engines', F.when(F.col('engines').isNull(), 'M')
                                                .when(~F.col('engines').between(1,4), 'I') 
                                                .when(F.col('engines').rlike('r[0-9a-zA-Z]+'), 'A')
                                )

planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_engines), qa_engines from planes_qa group by 2").show()


+-----------------+----------+
|count(qa_engines)|qa_engines|
+-----------------+----------+
|                0|      null|
+-----------------+----------+



# Pergunta 7

In [33]:
planes_qa = planes_qa.withColumn('qa_seats', F.when(F.col('seats').isNull(), 'M')
                                              .when(~F.col('seats').between(2, 500), 'I') 
                                              .when(F.col('seats').rlike('r[0-9a-zA-Z]+'), 'A')
                                )

planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_seats), qa_seats from planes_qa group by 2").show()

+---------------+--------+
|count(qa_seats)|qa_seats|
+---------------+--------+
|              0|    null|
+---------------+--------+



# Pergunta 8

In [34]:
planes_qa = planes_qa.withColumn('qa_speed', F.when(F.col('speed')=='NA', 'M')
                                              .when(~F.col('speed').between(50,150), 'I')
                                              .when(F.col('speed').rlike('r[0-9a-zA-Z]+'), 'A')
                                )


planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_speed), qa_speed from planes_qa group by 2").show()

+---------------+--------+
|count(qa_speed)|qa_speed|
+---------------+--------+
|              0|    null|
+---------------+--------+



# Pergunta 9

In [35]:
planes_qa = planes_qa.withColumn('qa_engine', F.when(F.col('engine').isNull(), 'M')
                                               .when(~F.col('engine').isin('Turbo-fan', 'Turbo-jet', 'Turbo-prop', 'Turbo-shaft', '4 Cycle'), 'C') 
                                )

planes_qa.createOrReplaceTempView('planes_qa')

spark.getOrCreate().sql("select count(qa_engine), qa_engine from planes_qa group by 2").show()

+----------------+---------+
|count(qa_engine)|qa_engine|
+----------------+---------+
|               0|     null|
|              10|        C|
+----------------+---------+



# FLIGHTS DATASET

# Pergunta 1

In [36]:
flights_qa = df_flights.withColumn('qa_year_month_day', F.when(F.col('year').isNull(), 'MY')
                                                         .when(F.col('month').isNull(), 'MM')
                                                         .when(F.col('day').isNull(), 'MD')
                                                         .when(F.col('year') < 1950, 'IY')
                                                         .when(~F.col('month').between(1,12), 'IM')
                                                         .when(((F.col('month') != 2) & (F.col('day') > 31)) | ((F.col('month') == 2) & (F.col('day') > 29)), 'ID')
                                  )


flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select count(qa_year_month_day), qa_year_month_day from flights_qa group by 2").show()

+------------------------+-----------------+
|count(qa_year_month_day)|qa_year_month_day|
+------------------------+-----------------+
|                       0|             null|
+------------------------+-----------------+



# Pergunta 2

In [37]:
flights_qa = flights_qa.withColumn('qa_hour_minute', F.when(F.col('hour')=='NA', 'MH')
                                                      .when(F.col('minute')=='NA', 'MM')
                                                      .when(~F.col('hour').between(0,24), 'IH')
                                                      .when(~F.col('minute').between(0,59), 'IM')                                  
                                  )


flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select count(qa_hour_minute), qa_hour_minute from flights_qa group by 2").show()

+---------------------+--------------+
|count(qa_hour_minute)|qa_hour_minute|
+---------------------+--------------+
|                    0|          null|
+---------------------+--------------+



# Pergunta 3

In [38]:
flights_qa = flights_qa.withColumn('qa_dep_arr', F.when(F.col('dep_time')=='NA', 'MD')
                                                  .when(F.col('arr_time')=='NA', 'MA')
                                                  .when(~F.col('dep_time').rlike('^([0-9]|1[0-9]|2[0-3])[0-5][0-9]$'), 'FD')
                                                  .when(~F.col('arr_time').rlike('^([0-9]|1[0-9]|2[0-3])[0-5][0-9]$'), 'FA')                                    
                                  )

flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select count(qa_dep_arr), qa_dep_arr from flights_qa group by 2").show()


+-----------------+----------+
|count(qa_dep_arr)|qa_dep_arr|
+-----------------+----------+
|                0|      null|
|               48|        MD|
|              151|        FA|
|                7|        MA|
|               90|        FD|
+-----------------+----------+



# Pergunta 4

In [39]:
flights_qa = flights_qa.withColumn('qa_dep_arr_delay', F.when(F.col('dep_delay').isNull(), 'MD')
                                                        .when(F.col('arr_delay').isNull(), 'MA')                                   
                                  )

flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select count(qa_dep_arr_delay), qa_dep_arr_delay from flights_qa group by 2").show(5)

+-----------------------+----------------+
|count(qa_dep_arr_delay)|qa_dep_arr_delay|
+-----------------------+----------------+
|                      0|            null|
|                     48|              MD|
|                     27|              MA|
+-----------------------+----------------+



# Pergunta 5

In [40]:
flights_qa = flights_qa.withColumn('qa_carrier', F.when(F.col('carrier').isNull(), 'M')
                                                  .when(F.col('carrier').rlike('^[0-9]*$'), 'F')
                                  )


flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select count(qa_carrier), qa_carrier, carrier from flights_qa group by 2, 3").show(5)

+-----------------+----------+-------+
|count(qa_carrier)|qa_carrier|carrier|
+-----------------+----------+-------+
|                0|      null|     UA|
|                0|      null|     AA|
|                0|      null|     B6|
|                0|      null|     DL|
|                0|      null|     OO|
+-----------------+----------+-------+
only showing top 5 rows



# Pergunta 6

In [41]:
flights_qa = flights_qa.withColumn('qa_tailnum', F.when(F.col('tailnum') == 'NA', 'M')
                                              .when(F.length(F.col('tailnum')) != 6, 'S')
                                              .when(~F.col('tailnum').rlike('^N([0-9]{1,4})([A-Z]{1,2}$)'), 'F')
                                              .when(~F.col('tailnum').rlike('^N.*'), 'FN')
                                              .when(F.col('tailnum').rlike('^[IO0]') == True, 'FE')
                                              )


flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select count(qa_tailnum), qa_tailnum, tailnum from flights_qa where qa_tailnum = 'F' group by 2, 3").show()

+-----------------+----------+-------+
|count(qa_tailnum)|qa_tailnum|tailnum|
+-----------------+----------+-------+
|                1|         F| N13248|
|                2|         F| N39475|
|                1|         F| N37255|
|                3|         F| N39418|
|                1|         F| N71411|
|                5|         F| N38417|
|                4|         F| N36447|
|                2|         F| N78438|
|                1|         F| N3GPAA|
|                4|         F| N16234|
|                2|         F| N3HUAA|
|                3|         F| N3DFAA|
|                1|         F| N54241|
|                1|         F| N77261|
|                1|         F| N3BLAA|
|                2|         F| N3FUAA|
|                1|         F| N3EKAA|
|                2|         F| N3GBAA|
|                3|         F| N26226|
|                1|         F| N3CTAA|
+-----------------+----------+-------+
only showing top 20 rows



# Pergunta 7

In [42]:
flights_qa = flights_qa.withColumn('qa_flight', F.when(F.col('flight').isNull(), 'M')
                                                 .when(F.col('flight').rlike('^[0-9]{4}$')==False, 'F') 
                                  )

flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select qa_flight, count(qa_flight) from flights_qa where qa_flight = 'F' group by 1").show()

+---------+----------------+
|qa_flight|count(qa_flight)|
+---------+----------------+
|        F|            6158|
+---------+----------------+



# Pergunta 8

In [43]:
flights_qa = flights_qa.withColumn('qa_origin_dest', F.when(F.col('origin').isNull(), 'MO')
                                                      .when(F.col('dest').isNull(), 'MD')
                                                      .when(F.col('origin').rlike('^[a-zA-Z0-9_]{3}$')==False, 'FO')
                                                      .when(F.col('dest').rlike('^[a-zA-Z0-9_]{3}$')==False, 'FD')
                                  )

flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select qa_origin_dest, count(qa_origin_dest) from flights_qa group by 1").show()

+--------------+---------------------+
|qa_origin_dest|count(qa_origin_dest)|
+--------------+---------------------+
|          null|                    0|
+--------------+---------------------+



# Pergunta 9

In [44]:
flights_qa = flights_qa.withColumn('qa_air_time', F.when(F.col('air_time').isNull(), 'M')
                                                  .when(~F.col('air_time').between(20,500), 'I') 
                                 )


flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select qa_air_time, count(qa_air_time) from flights_qa group by 1").show()

+-----------+------------------+
|qa_air_time|count(qa_air_time)|
+-----------+------------------+
|       null|                 0|
|          M|                75|
+-----------+------------------+



# Pergunta 10

In [45]:
flights_qa = flights_qa.withColumn('qa_distance', F.when(F.col('distance').isNull(), 'M')
                                                   .when(~F.col('distance').between(50, 3000), 'I')
                                  )

flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select qa_distance, count(qa_distance) from flights_qa group by 1").show()

+-----------+------------------+
|qa_distance|count(qa_distance)|
+-----------+------------------+
|       null|                 0|
+-----------+------------------+



# Pergunta 11

In [46]:
flights_qa = flights_qa.withColumn('qa_distance_airtime', F.when(F.col('air_time').isNull(), 'M')
                                                           .when(F.col('air_time') >= (F.col('distance')*0.1 + 30) , 'TL')
                                                           .when(F.col('air_time') <= (F.col('distance')*0.1 + 10) , 'TS')
                                                           .when(~(F.col('air_time') >= (F.col('distance') * 0.1 + 30)) & ~(F.col('air_time') <= (F.col('distance') * 0.1 + 10)), 'TR')
                                )


flights_qa.createOrReplaceTempView('flights_qa')

spark.getOrCreate().sql("select qa_distance_airtime, count(qa_distance_airtime), air_time, distance from flights_qa group by 1 , 3, 4").show()

+-------------------+--------------------------+--------+--------+
|qa_distance_airtime|count(qa_distance_airtime)|air_time|distance|
+-------------------+--------------------------+--------+--------+
|                 TL|                         7|     151|    1107|
|                 TL|                        12|     197|    1660|
|                 TL|                         1|     141|     867|
|                 TL|                         1|     262|    2182|
|                 TL|                        15|     180|    1448|
|                 TL|                         1|     354|    2677|
|                 TR|                         3|     121|     987|
|                 TL|                         2|     302|    2402|
|                 TL|                         4|     123|     909|
|                 TL|                         1|     288|    2306|
|                 TL|                         2|     129|     909|
|                 TR|                         1|     137|    1

# Gravando arquivos

airports_qa.write.parquet("C:/Users/tsaheki/Documents/Treinamento PySpark - Relatorios/airports_qa.parquet")


#SALVANDO ARQUIVO PARQUET
(df_airports.repartition(1) # coalesce
            .write.format("parquet")
            .mode('overwrite')
            .option("header", "true")
            .save("C:/Users/tsaheki/Documents/Treinamento PySpark - Relatorios/airports_qa.parquet"))
            

SALVANDO ARQUIVO CSV
(df_airports.select(df_airports.colRegex("`^qa_.*`"))
            .repartition(1) # coalesce
            .write.format("CSV")
            .mode('overwrite')
            .option("header", "true")
            .save('C:/Users/tsaheki/Documents/Treinamento PySpark - Relatorios/airports_qa.parquet'))
            
Salvando em parquet
(df_flights.repartition(1)
           .write.format('parquet')
           .mode('overwrite')
           .option('header', 'true')
           .save('C:/Users/tsaheki/Documents/Treinamento PySpark - Relatorios/airports_qa.parquet'))
           
airports.write.parquet(
    mode ='overwrite',
    path = 'C:/Practice_Pyspark/airports_semana_3/parquet')
    
airports_parquet = spark.read.parquet('C:/Practice_Pyspark/airports_semana_3/parquet')