In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark



In [1]:
import findspark
findspark.init()

In [2]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, udf, lit, length, when, concat

In [4]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Prática da semana 2 - Aceleração Pyspark") \
    .config("spark.some.config.option", "some-value") \
    .config("spark.memory.offHeap.enabled","true") \
    .config("spark.memory.offHeap.size","10g")\
    .getOrCreate()

In [5]:
spark

## Airport - Perguntas


#### Pergunta 1

In [31]:
schema_air = (StructType()
            .add('faa', StringType(), True)
            .add('name', StringType(), True)
            .add('lat', DoubleType(), True)
            .add('lon', DoubleType(), True)
            .add('alt', IntegerType(), True)
            .add('tz', DoubleType(), True)
            .add('dst', StringType(), True)
        )

airports = spark.read.format('csv')\
    .option('header', True)\
    .schema(schema_air)\
    .load('../data/airports.csv', header = True)

airports.show(5, False)

+---+-----------------------------+----------+-----------+----+----+---+
|faa|name                         |lat       |lon        |alt |tz  |dst|
+---+-----------------------------+----------+-----------+----+----+---+
|04G|Lansdowne Airport            |41.1304722|-80.6195833|1044|-5.0|A  |
|06A|Moton Field Municipal Airport|32.4605722|-85.6800278|264 |-5.0|A  |
|06C|Schaumburg Regional          |41.9893408|-88.1012428|801 |-6.0|A  |
|06N|Randall Airport              |41.431912 |-74.3915611|523 |-5.0|A  |
|09J|Jekyll Island Airport        |31.0744722|-81.4277778|11  |-4.0|A  |
+---+-----------------------------+----------+-----------+----+----+---+
only showing top 5 rows



In [32]:
airports.show()

+---+--------------------+----------------+-----------------+----+----+---+
|faa|                name|             lat|              lon| alt|  tz|dst|
+---+--------------------+----------------+-----------------+----+----+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044|-5.0|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264|-5.0|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801|-6.0|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523|-5.0|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11|-4.0|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593|-4.0|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730|-5.0|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492|-5.0|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000|-5.0|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108|-8.0|  A|
|0W3|Harford

In [33]:
airports.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: double (nullable = true)
 |-- dst: string (nullable = true)



In [34]:
a1 = airports.withColumn('qa_faa', 
                         when((((~airports.faa.rlike('^\w{3,5}$'))) |
                               (airports.faa.rlike('^\d{1,}$')) | 
                               (airports.faa.rlike('^\D{1,}$'))), 'F')
                        .when((airports.faa == '') | 
                              (airports.faa == ' ') | 
                              (airports.faa.isNull()), 'M'))

In [35]:
a1.select(a1.faa, a1.qa_faa)\
    .filter(a1.qa_faa.isNotNull())\
    .show()

+---+------+
|faa|qa_faa|
+---+------+
|369|     F|
|AAF|     F|
|ABE|     F|
|ABI|     F|
|ABL|     F|
|ABQ|     F|
|ABR|     F|
|ABY|     F|
|ACK|     F|
|ACT|     F|
|ACV|     F|
|ACY|     F|
|ADK|     F|
|ADM|     F|
|ADQ|     F|
|ADS|     F|
|ADW|     F|
|AET|     F|
|AEX|     F|
|AFE|     F|
+---+------+
only showing top 20 rows



#### Pergunta 2

In [36]:
a2 = a1.withColumn('qa_name', 
                   when((a1.name == '') | 
                        (a1.name == ' ') | 
                        (a1.name.isNull()), 'M'))

In [37]:
a2.select(a2.name, a2.qa_name)\
    .filter(a2.qa_name.isNotNull())\
    .show()

+----+-------+
|name|qa_name|
+----+-------+
+----+-------+



#### Pergunta 3

In [38]:
a3 = a2.withColumn('qa_lat', 
                   when((a2.lat == '') | 
                        (a2.lat == ' ') | 
                        (a2.lat.isNull()), 'M')
                  .when(~a2.lat.between(-180, 180), 'I')
                  .when(a2.lat.rlike('(?=.*[A-z])'), 'A'))

In [39]:
a3.select(a3.lat, a3.qa_lat)\
    .filter(a3.qa_lat.isNotNull())\
    .show()

+---+------+
|lat|qa_lat|
+---+------+
+---+------+



#### Pergunta 4

In [40]:
a4 = a3.withColumn('qa_lon', 
                   when((a3.lon == '') | 
                        (a3.lon == ' ') | 
                        (a3.lon.isNull()), 'M')
                  .when(~a3.lon.between(-180, 180), 'I')
                  .when(a3.lon.rlike('(?=.*[A-z])'), 'A'))

In [41]:
a4.select(a4.lon, a4.qa_lon)\
    .filter(a4.qa_lon.isNotNull())\
    .show()

+---+------+
|lon|qa_lon|
+---+------+
+---+------+



#### Pergunta 5

In [42]:
a5 = a4.withColumn('qa_alt', 
                   when(a4.alt < 0, 'I')
                  .when((a4.alt == '') | 
                        (a4.alt == ' ') | 
                        (a4.alt.isNull()), 'M')
                  .when(a4.lon.rlike('(?=.*[A-z])'), 'A'))

In [43]:
a5.select(a5.alt, a5.qa_alt)\
    .filter(a5.qa_alt.isNotNull())\
    .show()

+---+------+
|alt|qa_alt|
+---+------+
|-54|     I|
|-42|     I|
+---+------+



### Pergunta 6

In [44]:
a6 = a5.withColumn('qa_tz', 
                   when((a5.tz == '') | 
                        (a5.tz == ' ') | 
                        (a5.tz.isNull()), 'M')
                  .when(~a5.tz.between(-11, 14), 'I')
                  .when(a5.tz.cast('int').isNull(), 'A'))

In [45]:
a6.select(a6.tz, a6.qa_tz)\
    .filter(a6.qa_tz.isNotNull())\
    .show()

+---+-----+
| tz|qa_tz|
+---+-----+
+---+-----+



### Pergunta 7

In [46]:
a7 = a6.withColumn('qa_dst', 
                   when((a6.dst == '') | 
                        (a6.dst == ' ') | 
                        (a6.dst.isNull()), 'M')
                  .when(~airports.dst.rlike('[EASOZNU]{1}'), 'C')
                  .when(airports.dst.rlike('[0-9]'), 'N'))

In [47]:
a7.select(a7.dst, a7.qa_dst)\
    .filter(a7.qa_dst.isNotNull())\
    .show()

+---+------+
|dst|qa_dst|
+---+------+
+---+------+



### Salvando em parquet

In [96]:
a7.write.parquet(
path = '../data/airports',
mode = 'overwrite'
)

### Carregando parquet

In [97]:
airports_parquet = spark.read.parquet('../data/airports')

In [50]:
airports_parquet.show(10)

+---+--------------------+----------+------------+----+----+---+------+-------+------+------+------+-----+------+
|faa|                name|       lat|         lon| alt|  tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_alt|qa_tz|qa_dst|
+---+--------------------+----------+------------+----+----+---+------+-------+------+------+------+-----+------+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044|-5.0|  A|  null|   null|  null|  null|  null| null|  null|
|06A|Moton Field Munic...|32.4605722| -85.6800278| 264|-5.0|  A|  null|   null|  null|  null|  null| null|  null|
|06C| Schaumburg Regional|41.9893408| -88.1012428| 801|-6.0|  A|  null|   null|  null|  null|  null| null|  null|
|06N|     Randall Airport| 41.431912| -74.3915611| 523|-5.0|  A|  null|   null|  null|  null|  null| null|  null|
|09J|Jekyll Island Air...|31.0744722| -81.4277778|  11|-4.0|  A|  null|   null|  null|  null|  null| null|  null|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593|-4.0|  A|  null|   null|  null|  

# Planes

In [72]:
schema_plan = (StructType()
    .add('tailnum', StringType(), True)
    .add('year', IntegerType(), True)
    .add('type', StringType(), True)
    .add('manufacturer', StringType(), True)
    .add('model', StringType(), True)
    .add('engines', IntegerType(), True)
    .add('seats', IntegerType(), True)
    .add('speed', IntegerType(), True)
    .add('engine', StringType(), True)
              )

planes = spark.read.format('csv')\
    .option('header', True)\
    .schema(schema_plan)\
    .load('../data/planes.csv')

planes.show(5)

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 5 rows



In [73]:
planes.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: integer (nullable = true)
 |-- engine: string (nullable = true)



### Pergunta 1

In [74]:
p1 = planes.withColumn('qa_tailnum',( 
                         when((planes.tailnum == '') | 
                              (planes.tailnum == ' ') | 
                              (planes.tailnum.isNull()), 'M')
                        .when(length(planes.tailnum).between(5, 6), 'S')
                        .when(~planes.tailnum.rlike('^([A-Z])([0-9]{3,4})([A-Z]{1,2})$'), 'F')
                        .when(~planes.tailnum.startswith('N'), 'FN')
                        .when(planes.tailnum.rlike('^[OI]'), 'FE')
                                    )
                      )

In [75]:
p1.select(p1.tailnum, p1.qa_tailnum)\
    .filter(p1.tailnum.isNotNull())\
    .show()

+-------+----------+
|tailnum|qa_tailnum|
+-------+----------+
| N102UW|         S|
| N103US|         S|
| N104UW|         S|
| N105UW|         S|
| N107US|         S|
| N108UW|         S|
| N109UW|         S|
| N110UW|         S|
| N111US|         S|
| N11206|         S|
| N112US|         S|
| N113UW|         S|
| N114UW|         S|
| N117UW|         S|
| N118US|         S|
| N119US|         S|
| N1200K|         S|
| N1201P|         S|
| N12114|         S|
| N121DE|         S|
+-------+----------+
only showing top 20 rows



### Pergunta 2

In [76]:
p2 = p1.withColumn('qa_year', 
                   when((p1.year == '') | 
                        (p1.year == ' ') | 
                        (p1.year.isNull()), 'M')
                  .when(p1.year < 1950, 'I'))

In [77]:
(p2.select(p2.year, p2.qa_year)
   .filter(p2.qa_year.isNotNull())
   .show()
)

+----+-------+
|year|qa_year|
+----+-------+
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|   0|      I|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
|null|      M|
+----+-------+
only showing top 20 rows



### Pergunta 3

In [78]:
list_type = ['Fixed wing multi engine', 'Fixed wing single engine', 'Rotorcraft']

p3 = p2.withColumn('qa_type', 
                   when((p2.type == '')  | 
                        (p2.type == ' ') | 
                        (p2.type.isNull()), 'M')
                  .when(~p2.type.isin(list_type), 'C'))

In [79]:
p3.select(p3.type, p3.qa_type).filter(p3.qa_type.isNotNull()).show()

+----+-------+
|type|qa_type|
+----+-------+
+----+-------+



### Pergunta 4

In [80]:
list_manu = ['AIRBUS', 'BOEING', 'BOMBARDIER', 'CESSNA', 'EMBRAER', 'SIKORSKY', 'CANADAIR', 'PIPER', 'MCDONNELL DOUGLAS',
             'CIRRUS', 'BELL', 'KILDALL GARY', 'LAMBERT RICHARD', 'BARKER JACK', 'ROBINSON HELICOPTER', 'GULFSTREAM', 
             'MARZ BARRY']

p4 = p3.withColumn('qa_manufacturer', 
                   when((p3.manufacturer == '')  | 
                        (p3.manufacturer == ' ') | 
                        (p3.manufacturer.isNull()), 'M')
                  .when(~p3.manufacturer.isin(list_manu), 'C')
                  )

In [81]:
p4.select(p4.manufacturer, p4.qa_manufacturer).filter(p4.qa_manufacturer.isNotNull()).show()

+----------------+---------------+
|    manufacturer|qa_manufacturer|
+----------------+---------------+
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|AIRBUS INDUSTRIE|              C|
|  BOMBARDIER INC|              C|
+----------------+---------------+
only showing top 20 rows



### Pergunta 5

In [82]:
p5 = p4.withColumn('qa_model', 
                   when(((p4.manufacturer == 'AIRBUS') & 
                         ~(p4.model.startswith('A'))) | 
                        ((p4.manufacturer == 'BOEING') & 
                         ~(p4.model.startswith('7'))) | 
                        (((p4.manufacturer == 'BOMBARDIER') | 
                          (p4.manufacturer == 'CANADAIR')) & 
                         ~(p4.model.startswith('CL'))) | 
                        ((p4.manufacturer == 'MCDONNELL DOUGLAS') & 
                         ~((p4.model.startswith('MD')) | 
                           (p4.model.startswith('DC')))), 'F')
                  .when((p4.manufacturer == '')  |
                        (p4.manufacturer == ' ') |
                        (p4.manufacturer.isNull()), 'M') 
                  )

In [83]:
p5.select(p5.manufacturer, p5.model, p5.qa_model)\
    .filter(p5.qa_model.isNotNull())\
    .show()

+------------+--------+--------+
|manufacturer|   model|qa_model|
+------------+--------+--------+
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
|      BOEING|MD-90-30|       F|
+------------+--------+--------+



### Pergunta 6

In [84]:
p6 = p5.withColumn('qa_engines', 
                   when((p5.engines == '') | 
                        (p5.engines == ' ') | 
                        (p5.engines.isNull()), 'M')
                  .when(~col('engines').between(1, 4), 'I')
                  .when(col('engines').rlike('.*[A-Z]'), 'A'))

In [85]:
p6.select(p6.engines, p6.qa_engines)\
    .filter(col('qa_engines').isNotNull())\
    .show()

+-------+----------+
|engines|qa_engines|
+-------+----------+
+-------+----------+



### Pergunta 7

In [86]:
p7 = p6.withColumn('qa_seats', 
                   when((p6.seats == '') | 
                        (p6.seats == ' ') | 
                        (p6.seats.isNull()), 'M')
                  .when(~p6.seats.between(2,500), 'I')
                  .when(p6.seats.cast('int').isNull(), 'A'))

In [87]:
p7.select(p7.seats, p7.qa_seats)\
    .filter(p7.qa_seats.isNotNull())\
    .show()

+-----+--------+
|seats|qa_seats|
+-----+--------+
+-----+--------+



### Pergunta 8

In [88]:
p8 = p7.withColumn('qa_speed', 
                   when((p7.speed == '')  | 
                        (p7.speed == ' ') | 
                        (p7.speed.isNull()), 'M')
                  .when(~p7.speed.between(50, 150), 'I')
                  .when(p7.speed.cast('int').isNull(), 'A'))

In [89]:
p8.select(p8.speed, p8.qa_speed)\
    .filter(p8.qa_speed.isNotNull())\
    .show()

+-----+--------+
|speed|qa_speed|
+-----+--------+
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
| null|       M|
+-----+--------+
only showing top 20 rows



### Pergunta 9

In [90]:
list_engine = ['Turbo-fan', 'Turbo-jet', 'Turbo-prop', 'Turbo-shaft', '4 Cycle']
list_engine

['Turbo-fan', 'Turbo-jet', 'Turbo-prop', 'Turbo-shaft', '4 Cycle']

In [91]:
p9 = p8.withColumn('qa_engine', 
                   when((p8.engine == '') | 
                        (p8.engine == ' ') | 
                        (p8.engine.isNull()), 'M')
                  .when(~p8.engine.isin(list_engine), 'C'))

In [92]:
p9.select(p9.engine, p9.qa_engine)\
    .filter(p9.qa_engine.isNotNull())\
    .show()

+-------------+---------+
|       engine|qa_engine|
+-------------+---------+
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
|Reciprocating|        C|
+-------------+---------+



### Salvando em parquet

In [98]:
p9.write.format('parquet')\
    .mode('overwrite')\
    .save('../data/planes')

### Abrindo parquet

In [99]:
planes_parquet = spark.read.parquet('../data/planes')
planes_parquet.toPandas()

Unnamed: 0,tailnum,year,type,manufacturer,model,engines,seats,speed,engine,qa_tailnum,qa_year,qa_type,qa_manufacturer,qa_model,qa_engines,qa_seats,qa_speed,qa_engine
0,N102UW,1998.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan,S,,,C,,,,M,
1,N103US,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan,S,,,C,,,,M,
2,N104UW,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan,S,,,C,,,,M,
3,N105UW,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan,S,,,C,,,,M,
4,N107US,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan,S,,,C,,,,M,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2623,N983SW,2004.0,Fixed wing multi engine,BOMBARDIER INC,CL-600-2B19,2,55,,Turbo-fan,S,,,C,,,,M,
2624,N984CA,1997.0,Fixed wing multi engine,CANADAIR,CL-600-2B19,2,55,,Turbo-fan,S,,,,,,,M,
2625,N986CA,,Fixed wing multi engine,CANADAIR,CL-600-2B19,2,55,,Turbo-fan,S,M,,,,,,M,
2626,N986SW,2004.0,Fixed wing multi engine,BOMBARDIER INC,CL-600-2B19,2,55,,Turbo-fan,S,,,C,,,,M,


# Flights

In [100]:
schema_flights = (StructType()
    .add('year', IntegerType(), True)
    .add('month', IntegerType(), True)
    .add('day', IntegerType(), True)
    .add('dep_time', StringType(), True)
    .add('dep_delay', IntegerType(), True)
    .add('arr_time', StringType(), True)
    .add('arr_delay', IntegerType(), True)
    .add('carrier', StringType(), True)
    .add('tailnum', StringType(), True)
    .add('flight', StringType(), True)
    .add('origin', StringType(), True)
    .add('dest', StringType(), True)
    .add('air_time', IntegerType(), True)
    .add('distance', IntegerType(), True)
    .add('hour', IntegerType(), True)\
    .add('minute', IntegerType(), True)
                 )

In [101]:
flights = spark.read.format('csv')\
    .option('header', True)\
    .schema(schema_flights)\
    .load('../data/flights.csv')

In [102]:
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



In [103]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

### Pergunta 1

In [104]:
f1 = flights.withColumn('qa_year_month_day', 
                        when((flights.year == '') | 
                             (flights.year == ' ') | 
                             (flights.year.isNull()), 'MY')
                       .when((flights.month == '') | 
                             (flights.month == ' ') | 
                             (flights.month.isNull()), 'MM')
                       .when((flights.day == '') | 
                             (flights.day == ' ') | 
                             (flights.day.isNull()), 'MD')
                       .when(flights.year < 1950, 'IY')
                       .when(~flights.month.between(1, 12), 'IM')
                       .when(((flights.month == 2) & 
                              (~flights.day.between(1, 29))) | 
                             (((flights.month != 2)) & 
                              (~flights.day.between(1, 31))), 'ID'))

In [105]:
f1.select(f1.year, f1.month, f1.day, f1.qa_year_month_day)\
    .filter(f1.qa_year_month_day.isNotNull())\
    .show()

+----+-----+---+-----------------+
|year|month|day|qa_year_month_day|
+----+-----+---+-----------------+
+----+-----+---+-----------------+



### Pergunta 2

In [107]:
f2 = f1.withColumn('qa_hour_minute',
                  when((f1.hour == "") | 
                       (f1.hour == ' ') | 
                       (f1.hour.isNull()), 'MH')
                  .when((f1.minute == '') | 
                        (f1.minute == ' ') | 
                        (f1.minute.isNull()), 'MM')
                  .when(~f1.hour.between(0, 24), 'IH')
                  .when(~f1.minute.between(0,59), 'IM'))

In [108]:
f2.select(f2.hour, f2.minute, f2.qa_hour_minute)\
    .filter(f2.qa_hour_minute.isNotNull())\
    .show()

+----+------+--------------+
|hour|minute|qa_hour_minute|
+----+------+--------------+
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
|null|  null|            MH|
+----+------+--------------+
only showing top 20 rows



### Pergunta 3

In [111]:
f3 = f2.withColumn('qa_dep_arr_time',
                  when((f2.dep_time == '') | 
                      (f2.dep_time == ' ') |
                      (f2.dep_time.isNull()), 'MD')
                  .when((f2.arr_time == '') | 
                        (f2.arr_time == ' ') | 
                        (f2.arr_time.isNull()), 'MA')
                  .when(~(f2.dep_time.rlike('^([0-9]|1[0-9]|2[0-4])[0-5][0-9]$')), 'FD') 
                  .when(~(f2.arr_time.rlike('^([0-9]|1[0-9]|2[0-4])[0-5][0-9]$')), 'FA'))


In [112]:
f3.select(f3.dep_time, f3.arr_time, f3.qa_dep_arr_time)\
    .filter(f3.qa_dep_arr_time.isNotNull())\
    .show()

+--------+--------+---------------+
|dep_time|arr_time|qa_dep_arr_time|
+--------+--------+---------------+
|    2222|      55|             FA|
|    2224|      48|             FA|
|    2234|      11|             FA|
|    2222|      55|             FA|
|       1|     600|             FD|
|    2233|      45|             FA|
|      26|     518|             FD|
|      46|     552|             FD|
|    2339|      29|             FA|
|      NA|      NA|             FD|
|      NA|      NA|             FD|
|      NA|      NA|             FD|
|    2219|      49|             FA|
|      29|     538|             FD|
|      49|     835|             FD|
|    1603|       2|             FA|
|      NA|      NA|             FD|
|      24|     300|             FD|
|    2231|      42|             FA|
|      15|     800|             FD|
+--------+--------+---------------+
only showing top 20 rows



### Pergunta 4

In [113]:
f4 = f3.withColumn('qa_dep_arr_delay',
                  when((f3.dep_delay == '') | 
                       (f3.dep_delay == ' ') | 
                       (f3.dep_delay.isNull()), 'MD')
                  .when((f3.arr_delay == '') | 
                        (f3.arr_delay == ' ') | 
                        (f3.arr_delay.isNull()), 'MA'))

In [114]:
f4.select(f4.dep_delay, f4.arr_delay, f4.qa_dep_arr_delay)\
    .filter(f4.qa_dep_arr_delay.isNotNull())\
    .show()

+---------+---------+----------------+
|dep_delay|arr_delay|qa_dep_arr_delay|
+---------+---------+----------------+
|        4|     null|              MA|
|     null|     null|              MD|
|     null|     null|              MD|
|     null|     null|              MD|
|       40|     null|              MA|
|     null|     null|              MD|
|     null|     null|              MD|
|       -9|     null|              MA|
|     null|     null|              MD|
|     null|     null|              MD|
|        8|     null|              MA|
|     null|     null|              MD|
|     null|     null|              MD|
|     null|     null|              MD|
|       -8|     null|              MA|
|     null|     null|              MD|
|     null|     null|              MD|
|     null|     null|              MD|
|     null|     null|              MD|
|       55|     null|              MA|
+---------+---------+----------------+
only showing top 20 rows



### Pergunta 5

In [115]:
f5 = f4.withColumn('qa_carrier',
                  when((f4.carrier == '') | 
                       (f4.carrier == ' ') | 
                       (f4.carrier.isNull()), 'M')
                  .when((f4.carrier.rlike('[A-Z]{2}')) |
                        (f4.carrier.rlike('[0-9]{2}')) |
                        (~f4.carrier.rlike('[A-Z0-9]{2}')), 'F'))

In [116]:
f5.select(f5.carrier, f5.qa_carrier)\
    .filter(f5.qa_carrier.isNotNull())\
    .show()

+-------+----------+
|carrier|qa_carrier|
+-------+----------+
|     VX|         F|
|     AS|         F|
|     VX|         F|
|     WN|         F|
|     AS|         F|
|     WN|         F|
|     WN|         F|
|     VX|         F|
|     AS|         F|
|     AS|         F|
|     AS|         F|
|     AS|         F|
|     AS|         F|
|     AS|         F|
|     AS|         F|
|     UA|         F|
|     AS|         F|
|     WN|         F|
|     AS|         F|
|     OO|         F|
+-------+----------+
only showing top 20 rows



### Pergunta 6 

In [117]:
f6 = f5.withColumn('qa_tailnum',
                  when((f5.tailnum == '') | 
                       (f5.tailnum == ' ') | 
                       (f5.tailnum.isNull()), 'M')
                  .when(~length(f5.tailnum).between(5,6), 'S')
                  .when(~col('tailnum')
                        .rlike('^[A-Z]([0-9]{3})([A-Z]{1,2})$'), 'F')
                  .when(~col('tailnum').startswith('N'), 'FN')
                  .when(col('tailnum').startswith('[I, O, 0]'), 'FE'))

In [118]:
f6.select(f6.tailnum, f6.qa_tailnum)\
    .filter(f6.qa_tailnum.isNotNull())\
    .show()

+-------+----------+
|tailnum|qa_tailnum|
+-------+----------+
| N27205|         F|
| N8634A|         F|
| N68805|         F|
| N37468|         F|
| N37419|         F|
| N4YJAA|         F|
| N39450|         F|
| N34282|         F|
| N3760C|         F|
|  N6701|         F|
| N16234|         F|
| N3769L|         F|
| N3764D|         F|
| N36207|         F|
| N3EKAA|         F|
| N8648A|         F|
| N8629A|         F|
| N7744A|         F|
| N3DRAA|         F|
| N13716|         F|
+-------+----------+
only showing top 20 rows



### Pergunta 7

In [119]:
f7 = f6.withColumn('qa_flight',
                  when((f6.flight == '') | 
                       (f6.flight == ' ') | 
                       (f6.flight.isNull()), 'M')
                  .when(~f6.flight.rlike('[0-9]{4}'), 'F'))

In [120]:
f7.select(f7.flight, f7.qa_flight)\
    .filter(f7.qa_flight.isNotNull())\
    .show()

+------+---------+
|flight|qa_flight|
+------+---------+
|   851|        F|
|   755|        F|
|   344|        F|
|   522|        F|
|    48|        F|
|   755|        F|
|   490|        F|
|    26|        F|
|   448|        F|
|   656|        F|
|   608|        F|
|   121|        F|
|   306|        F|
|   368|        F|
|   827|        F|
|    24|        F|
|   300|        F|
|   616|        F|
|   306|        F|
|    29|        F|
+------+---------+
only showing top 20 rows



### Pergunta 8

In [121]:
f8 = f7.withColumn('qa_origin_dest', 
                  when((f7.origin == '') | 
                       (f7.origin == ' ') | 
                       (f7.origin.isNull()), 'MO')
                  .when((f7.dest == '') | 
                        (f7.dest == ' ') | 
                        (f7.dest.isNull()), 'MD')
                  .when((~f7.origin.rlike('[A-Z0-9]{3}')) |
                        (f7.origin.rlike('[A-Z]{3}')) | 
                        (f7.origin.rlike('[0-9]{3}')), 'FO')
                   .when((~f7.dest.rlike('[A-Z0-9]{3}')) |
                        (f7.dest.rlike('[A-Z]{3}')) | 
                        (f7.dest.rlike('[0-9]{3}')), 'FD'))

In [122]:
f8.select(f8.origin, f8.dest, f8.qa_origin_dest)\
    .filter(f8.qa_origin_dest.isNotNull())\
    .show()

+------+----+--------------+
|origin|dest|qa_origin_dest|
+------+----+--------------+
|   SEA| LAX|            FO|
|   SEA| HNL|            FO|
|   SEA| SFO|            FO|
|   PDX| SJC|            FO|
|   SEA| BUR|            FO|
|   PDX| DEN|            FO|
|   PDX| OAK|            FO|
|   SEA| SFO|            FO|
|   SEA| SAN|            FO|
|   SEA| ORD|            FO|
|   SEA| LAX|            FO|
|   SEA| PHX|            FO|
|   SEA| LAS|            FO|
|   SEA| ANC|            FO|
|   SEA| SFO|            FO|
|   PDX| SFO|            FO|
|   SEA| SMF|            FO|
|   SEA| MDW|            FO|
|   SEA| BOS|            FO|
|   PDX| BUR|            FO|
+------+----+--------------+
only showing top 20 rows



### Pergunta 9

In [123]:
f9 = f8.withColumn('qa_air_time', 
                  when((f8.air_time == '') | 
                       (f8.air_time == " ") | 
                       (f8.air_time.isNull()), 'M')
                  .when(~f8.air_time.between(20, 500), 'I'))

In [124]:
f9.select(f9.air_time, f9.qa_air_time)\
    .filter(f9.qa_air_time.isNotNull())\
    .show()

+--------+-----------+
|air_time|qa_air_time|
+--------+-----------+
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
|    null|          M|
+--------+-----------+
only showing top 20 rows



### Pergunta 10

In [125]:
f10 = f9.withColumn('qa_distance',
                   when((f9.distance == '') | 
                        (f9.distance == ' ') | 
                        (f9.distance.isNull()), 'M')
                   .when(~f9.distance.between(50, 3000), 'I'))

In [126]:
f10.select(f10.distance, f10.qa_distance)\
    .filter(f10.qa_distance.isNotNull())\
    .show()

+--------+-----------+
|distance|qa_distance|
+--------+-----------+
+--------+-----------+



### Pergunta 11

In [127]:
f11 = f10.withColumn('qa_distance_airtime',
                    when((f10.distance == '') | 
                         (f10.distance == ' ') | 
                         (f10.distance.isNull()) |
                         (f10.air_time == '') | 
                         (f10.air_time == ' ') | 
                         (f10.air_time.isNull()), 'M')
                    .when(f10.air_time >= (f10.distance * 0.1 + 30), 'TL')
                    .when(f10.air_time <= (f10.distance * 0.1 + 10), 'TS')
                    .otherwise('TR'))

In [128]:
f11.select(f11.distance, f11.air_time, f11.qa_distance_airtime)\
    .show()

+--------+--------+-------------------+
|distance|air_time|qa_distance_airtime|
+--------+--------+-------------------+
|     954|     132|                 TL|
|    2677|     360|                 TL|
|     679|     111|                 TL|
|     569|      83|                 TR|
|     937|     127|                 TL|
|     991|     121|                 TR|
|     543|      90|                 TL|
|     679|      98|                 TL|
|    1050|     135|                 TL|
|    1721|     198|                 TR|
|     954|     130|                 TL|
|    1107|     154|                 TL|
|     867|     127|                 TL|
|    1448|     183|                 TL|
|     679|     129|                 TL|
|     550|      90|                 TL|
|     605|      76|                 TR|
|    1733|     216|                 TL|
|    2496|     290|                 TL|
|     817|     111|                 TR|
+--------+--------+-------------------+
only showing top 20 rows



# Salvando em parquet

In [129]:
f11.write.parquet(
path = '../data/flights',
mode = 'overwrite'
)

# Abrindo parquet

In [130]:
flights_parquet = spark.read.parquet('../data/flights')

In [131]:
flights_parquet.toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,...,qa_hour_minute,qa_dep_arr_time,qa_dep_arr_delay,qa_carrier,qa_tailnum,qa_flight,qa_origin_dest,qa_air_time,qa_distance,qa_distance_airtime
0,2014,12,8,658,-7.0,935,-5.0,VX,N846VA,1780,...,,,,F,,,FO,,,TL
1,2014,1,22,1040,5.0,1505,5.0,AS,N559AS,851,...,,,,F,,F,FO,,,TL
2,2014,3,9,1443,-2.0,1652,2.0,VX,N847VA,755,...,,,,F,,F,FO,,,TL
3,2014,4,9,1705,45.0,1839,34.0,WN,N360SW,344,...,,,,F,,F,FO,,,TR
4,2014,3,9,754,-1.0,1015,1.0,AS,N612AS,522,...,,,,F,,F,FO,,,TL
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,2014,6,23,1806,-4.0,2104,-6.0,OO,N225AG,3458,...,,,,F,,,FO,,,TR
9996,2014,8,31,2336,11.0,452,-13.0,AA,N3LEAA,1230,...,,,,F,F,,FO,,,TR
9997,2014,8,8,904,-1.0,1042,-5.0,AS,N523AS,360,...,,,,F,,F,FO,,,TR
9998,2014,8,29,1441,26.0,1820,10.0,WN,N8647A,2857,...,,,,F,F,,FO,,,TR
