In [3]:
#installing the required packages
!pip install pyspark
!pip install findspark



In [4]:
import pyspark
import findspark
findspark.init()

In [5]:
import re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [6]:
# Expressoes regulares comuns, pode-ser melhoradda
REGEX_ALPHA    = r'[:aplha:]+'
REGEX_INTEGER  = r'[:digit:]+'
REGEX_FLOAT    = r'[:digit:]+\.[:digit:]+'
REGEX_ALPHANUM = r'[:alnum:]+'
REGEX_EMPTY_STR= r'[:space:]+$'
REGEX_SPECIAL  = r'[:punct:]+'
REGEX_NNUMBER  = r'^N[1-9][0-9]{2,3}([ABCDEFGHJKLMNPRSTUVXWYZ]{1,2})'
REGEX_NNUMBER_INVALID = r'(N0.*$)|(.*[IO].*)'
#no inicio da linha pode ser [0-1]?[0-9])|(2[0-3]), no final ([0-5][0-9])$
REGEX_TIME_FMT = r'^(([0-1]?[0-9])|(2[0-3]))([0-5][0-9])$'

In [7]:
# Criar o contexto do spark
sc = SparkContext

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master("local[7]")
                     .appName("semana 2")
                              )

In [8]:
schema_airports = StructType([
    StructField("faa",  StringType(),  True),
    StructField("name", StringType(),  True),
    StructField("lat",  FloatType(),   True),
    StructField("lon",  FloatType(),   True),
    StructField("alt",  IntegerType(), True),
    StructField("tz",   IntegerType(), True),
    StructField("dst",  StringType(),  True)
])

schema_planes = StructType([
    StructField("tailnum",      StringType(),  True),
    StructField("year",         IntegerType(), True),
    StructField("type",         StringType(),  True),
    StructField("manufacturer", StringType(),  True),
    StructField("model",        StringType(),  True),
    StructField("engines",      IntegerType(), True),
    StructField("seats",        IntegerType(), True),
    StructField("speed",        IntegerType(), True),
    StructField("engine",       StringType(),  True)
])

schema_flights = StructType([
    StructField("year",      IntegerType(), True),
    StructField("month",     IntegerType(), True),
    StructField("day",       IntegerType(), True),
    StructField("dep_time",  StringType(),  True),
    StructField("dep_delay", IntegerType(), True),
    StructField("arr_time",  StringType(),  True),
    StructField("arr_delay", IntegerType(), True),
    StructField("carrier",   StringType(),  True),
    StructField("tailnum",   StringType(),  True),
    StructField("flight",    StringType(),  True),
    StructField("origin",    StringType(),  True),
    StructField("dest",      StringType(),  True),
    StructField("air_time",  IntegerType(), True),
    StructField("distance",  IntegerType(), True),
    StructField("hour",      IntegerType(), True),
    StructField("minute",    IntegerType(), True),
])

In [None]:
airport_df = (spark.getOrCreate().read
               .format('csv')
               .option("inferSchema", "false") 
               .option('header', "true")
               .schema(schema_airports)
               .load('./airports.csv'))

planes_df = (spark.getOrCreate().read
             .format('csv')
             .option('inferSchema', 'false')
             .option('header', 'true')
             .schema(schema_planes)
             .load('./planes.csv'))

flights_df = (spark.getOrCreate().read
              .format('csv')
              .option("inferSchema", "false")
              .option("header", "true")
              .load('./flights.csv'))

airport_df.createOrReplaceTempView('airports_view')
df_airport = airport_df

df_airport.show(5)

## Airport - Perguntas

# 1.

In [55]:
from pyspark.sql.functions import col, when, length
import re

airport_df = airport_df.withColumn("qa_faa", 
             when((col("faa") == "") |
             (col("faa").isNull()) , "M")\
            .when(
                 (length("faa").between(3 , 5)) &
                 (col('faa').rlike(REGEX_ALPHA)) | (col('faa').rlike('^([^0-9]*)$')), "F"
            )\
            .otherwise(col('faa'))
          
        )

# 2.

In [56]:
airport_df = airport_df.withColumn("qa_name", 
             when((col("name") == "")     |
                  (col("name").isNull())  |
                  (col('faa').rlike('\t') |
                  (col('faa').rlike(' +'))), "M")\
           .otherwise(col('name'))
        )

# 3. 

In [57]:
airport_df = airport_df.withColumn("qa_lat", 
             when((col("lat") == "")      |
                  (col("lat").isNull())   |
                  (col('lat').rlike('\t') |
                  (col('lat').rlike(' +'))), "M")\
             .when(
                 (col("lat") > '180.0') & 
                 (col("lat") < '-180.0'), "I"
            )\
            .when((col('lat').rlike('[a-zA-Z ]')), "A")
            .otherwise(col('lat'))
        )

# 4. 

In [58]:
airport_df = airport_df.withColumn("qa_lon", 
             when((col("lon") == "")      |
                  (col("lon").isNull())   |
                  (col('lon').rlike('\t') |
                  (col('lon').rlike(' +'))), "M")\
             .when(
                 (col("lon") > '180.0') & 
                 (col("lon") < '-180.0'), "I"
            )\
            .when((col('lon').rlike('[a-zA-Z ]')), "A")
            .otherwise(col('lon'))
        )

# 5.

In [59]:
airport_df = airport_df.withColumn("qa_alt", 
             when((col("alt") == "")      |
                  (col("alt").isNull())   |
                  (col('alt').rlike('\t') |
                  (col('alt').rlike(' +'))), "M")\
             .when((col("alt") < '0'), "I")\
            .when( (col('alt').rlike('[a-zA-Z ]')), "A")
            .otherwise(col('alt'))
        )

# 6.

In [60]:
airport_df = airport_df.withColumn("qa_tz", 
             when((col("tz") == "")      |
                  (col("tz").isNull())   |
                  (col('tz').rlike('\t') |
                  (col('tz').rlike(' +')) ), "M")\
             .when((col("tz") < '-11.0') &
                   (col("tz") > '14.0'), "I")\
            .when( (col('tz').rlike('[a-zA-Z ]')), "A")
            .otherwise(col('tz'))
        )

# 7. 

In [61]:
expected_categorys = ["E", "A", "S", "O", "Z", "N", "U"]

airport_df = airport_df.withColumn("qa_dst", 
            when((col("dst") == "")      |
                 (col("dst").isNull())   |
                 (col('dst').rlike('\t') |
                 (col('dst').rlike(' +')) ), "M")\
            .when( (col('dst').rlike('([0-9])') == True), "N")\
            .when((col("dst").isin(expected_categorys) == False) , "C")\
            .otherwise(col('dst'))
        )

# Planes Dataset

In [62]:
#lets describe our new file
planes_file = "./datasets/planes.csv"

#set our schema (that you can see on glossary)
schema = StructType([
    StructField("tailnum",      StringType()),
    StructField("year",         IntegerType()),
    StructField("type",         StringType()),
    StructField("manufacturer", StringType()),
    StructField("model",   StringType()),
    StructField("engines", IntegerType()),
    StructField("seats",   IntegerType()),
    StructField("speed",   IntegerType()),
    StructField("engine",  StringType())
])


#lets take the rdd to work with too
planes_rdd = planes_df.rdd

#don't forget to create a view (important to work with spark sql)
planes_df.createOrReplaceTempView('planes_view')

In [63]:
planes_rdd.take(10)

[Row(tailnum='N102UW', year=1998, type='Fixed wing multi engine', manufacturer='AIRBUS INDUSTRIE', model='A320-214', engines=2, seats=182, speed=None, engine='Turbo-fan'),
 Row(tailnum='N103US', year=1999, type='Fixed wing multi engine', manufacturer='AIRBUS INDUSTRIE', model='A320-214', engines=2, seats=182, speed=None, engine='Turbo-fan'),
 Row(tailnum='N104UW', year=1999, type='Fixed wing multi engine', manufacturer='AIRBUS INDUSTRIE', model='A320-214', engines=2, seats=182, speed=None, engine='Turbo-fan'),
 Row(tailnum='N105UW', year=1999, type='Fixed wing multi engine', manufacturer='AIRBUS INDUSTRIE', model='A320-214', engines=2, seats=182, speed=None, engine='Turbo-fan'),
 Row(tailnum='N107US', year=1999, type='Fixed wing multi engine', manufacturer='AIRBUS INDUSTRIE', model='A320-214', engines=2, seats=182, speed=None, engine='Turbo-fan'),
 Row(tailnum='N108UW', year=1999, type='Fixed wing multi engine', manufacturer='AIRBUS INDUSTRIE', model='A320-214', engines=2, seats=182, s

In [64]:
planes_df.show(10)

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null

# 1. 

In [65]:
planes_df = planes_df.withColumn("qa_tailnum", 
                when((col("tailnum") == "")      |
                    (col("tailnum").isNull())   |
                    (col('tailnum').rlike('\t') |
                    (col('tailnum').rlike(' +'))), "M")\
                .when((length("tailnum") != 5), "S")\
                .when(
                    (col('tailnum').substr(1, 1) != 'N')   &
                    (col('tailnum').substr(-1, -1) != 'Z') & 
                    (col('tailnum').substr(2,4).rlike('^([^0-9]*)$')), "F")\
                .when((col("tailnum").substr(1,1) == "I")  |
                    (col("tailnum").substr(1,1) == "O") |
                    (col("tailnum").substr(1,1) == "0"), "FE")\
                .when((col("tailnum").substr(1,1) != "N"),  "FN")\
                .otherwise(col('tailnum'))
            )

planes_df.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  18

# 2. 

In [66]:
planes_df = planes_df.withColumn("qa_year", 
                when((col("year") == "")      |
                    (col("year").isNull())   |
                    (col('year').rlike('\t') |
                    (col('year').rlike(' +'))), "M")\
                .when((col("year") < 1950), "I")\
                .otherwise(col('year'))
            )
planes_df.select('qa_year').show()

+-------+
|qa_year|
+-------+
|   1998|
|   1999|
|   1999|
|   1999|
|   1999|
|   1999|
|   1999|
|   1999|
|   1999|
|   2000|
|   1999|
|   1999|
|   1999|
|   2000|
|   2000|
|   2000|
|   1998|
|   1998|
|   1995|
|   1987|
+-------+
only showing top 20 rows



# 3. 

In [67]:
type_categories = ['Fixed wing multi engine', 'fixed wing single engine', 'Rotorcraft']

planes_df = planes_df.withColumn("qa_type", 
                when((col("type") == "")      |
                    (col("type").isNull())   |
                    (col('type').rlike('\t') |
                    (col('type').rlike(' +'))), "M")\
                .when((col("type").isin(type_categories) == False),  "C")\
                .otherwise(col('type'))
            )

planes_df.select('qa_type').show()

+-------+
|qa_type|
+-------+
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
|      M|
+-------+
only showing top 20 rows



# 4. 

In [68]:
manufacture_categories = ["AIRBUS", "BOEING","BOMBARDIER","CESSNA","EMBRAER","SIKORSKY","CANADAIR",
                          "PIPER","MCDONNELL DOUGLAS","CIRRUS","BELL","KILDALL GARY","LAMBERT RICHARD",
                          "BARKER JACK","ROBINSON HELICOPTER","GULFSTREAM","MARZ BARRY"]

#using a udf
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType


# dica do eugênio -> df.map(lambda x: any([x.contains(f"%{y}%") for y in MANUFACTURERS]))

@udf
def qa_manufacturer(el):
    if(any([el.__contains__(f"{y}") for y in manufacture_categories])):
        return el
    else:
        return "M"

planes_df.select("manufacturer", qa_manufacturer('manufacturer')).show()

#lets go back to our df

planes_df = planes_df.withColumn("qa_manufacturer", 
                when((col("manufacturer") == "")      |
                    (col("manufacturer").isNull()), "M")\
                .when(
                    (col("manufacturer").contains("AIRBUS%")) |
                    (col("manufacturer").contains("BOEING%")) |
                    (col("manufacturer").contains("BOMBARDIER%")) |
                    (col("manufacturer").contains("CESSNA%"))   |
                    (col("manufacturer").contains("EMBRAER%"))  |
                    (col("manufacturer").contains("SIKORSKY%")) |
                    (col("manufacturer").contains("CANADAIR%")) |
                    (col("manufacturer").contains("PIPER%"))    |
                    (col("manufacturer").contains("MCDONNELL DOUGLAS%")) |
                    (col("manufacturer").contains("CIRRUS%")) |
                    (col("manufacturer").contains("BELL%"))   |
                    (col("manufacturer").contains("KILDALL GARY%"))    |
                    (col("manufacturer").contains("LAMBERT RICHARD%")) |
                    (col("manufacturer").contains("BARKER JACK%"))     |
                    (col("manufacturer").contains("ROBINSON HELICOPTER%")) |
                    (col("manufacturer").contains("GULFSTREAM%")) |
                    (col("manufacturer").contains("MARZ BARRY%")), "C"
                    )\
                .otherwise(col('manufacturer'))
            )
planes_df.select('qa_manufacturer').show()

+----------------+-----------------------------+
|    manufacturer|qa_manufacturer(manufacturer)|
+----------------+-----------------------------+
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|          BOEING|                       BOEING|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|AIRBUS INDUSTRIE|             AIRBUS INDUSTRIE|
|          BOEING|                       BOEING|
|          BOEING|  

# 5.

In [None]:
planes_df = planes_df.withColumn('qa_model',
                    when(col('model').isNull(), "M")
                   .when((col('manufacturer') == "AIRBUS") & (~col('model').rlike("^A")), "F")
                   .when((col('manufacturer') == "BOEING") & (~col('model').rlike("^7")), "F")
                   .when((col('manufacturer') == "BOMBARDIER") | (col('manufacturer') == "CANADAIR")
                         & (~col('model').rlike("^CL")), "F")
                   .when((col('manufacturer') == "MCDONNELL DOUGLAS") & (~col('model').rlike('^(MD|DC)')), "F"))

planes_df.groupBy("qa_model").count().distinct().orderBy("qa_model", ascending=True).show()

#  6. 

In [70]:
planes_df = planes_df.withColumn("qa_engines", 
                when((col("engines") == "") |
                    (col("engines").isNull()), "M")\
                .when((col("engines") < 1) |
                    (col("engines") > 4), "I")\
                .when((col('engines').rlike('^[0-9]*$') == False), "A")\
                .otherwise(col('engines'))
            )
planes_df.select('qa_engines').show()

+----------+
|qa_engines|
+----------+
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
+----------+
only showing top 20 rows



# 7. 

In [71]:
planes_df = planes_df.withColumn("qa_seats", 
                when((col("seats") == "")      |
                    (col("seats").isNull()), "M")\
                .when((col("seats") < 2) |
                    (col('seats') > 500), "S")\
                .when((col('seats').rlike('^([^0-9]*)$') == False), "F")\
                .otherwise(col('seats'))
            )
planes_df.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+----------------+--------+----------+--------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|qa_year|qa_type| qa_manufacturer|qa_model|qa_engines|qa_seats|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+----------------+--------+----------+--------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   1998|      M|AIRBUS INDUSTRIE|A320-214|         2|       F|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   1999|      M|AIRBUS INDUSTRIE|A320-214|         2|       F|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   1999|      M|AIRBUS INDUSTRIE|A320-214|         2|       F|
| N105UW|1

# 8. 

In [72]:
planes_df = planes_df.withColumn("qa_speed", 
                when((col("speed") == "")      |
                    (col("speed").isNull()), "M")\
                .when((col("speed") < 50.0) |
                    (col('speed') > 150.0), "S")\
                .when((col('speed').rlike('^([^0-9]*)$') == False), "F")\
                .otherwise(col('speed'))
            )
planes_df.select('qa_speed').show()

+--------+
|qa_speed|
+--------+
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
|       M|
+--------+
only showing top 20 rows



# 9.

In [73]:
engine_categories = ["Turbo-fan", "Turbo-jet","Turbo-prop","Turbo-prop","4 Cycle"]

planes_df = planes_df.withColumn("qa_engine", 
                when((col("engine") == "")      |
                    (col("engine").isNull()), "M")\
                .when(
                    (col("engine").contains("Turbo-fan%")) |
                    (col("engine").contains("Turbo-jet%")) |
                    (col("engine").contains("Turbo-prop%")) |
                    (col("engine").contains("Turbo-shaft%"))   |
                    (col("engine").contains("Cycle%")), "C"
                    )\
                .otherwise(col('engine'))
            )
planes_df.select('qa_engine').show()

+---------+
|qa_engine|
+---------+
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-fan|
|Turbo-jet|
|Turbo-fan|
+---------+
only showing top 20 rows



## Flights Dataset - Perguntas

In [74]:
# Flights Dataset

#vamos configura-lo

flights_file = "./datasets/flights.csv"

#set our schema (that you can see on glossary)
schema = StructType([
    StructField("year",  IntegerType()),
    StructField("month", IntegerType()),
    StructField("day",   IntegerType()),
    StructField("dep_time",  IntegerType()),
    StructField("dep_delay", IntegerType()),
    StructField("arr_time",  IntegerType()),
    StructField("arr_delay", IntegerType()),
    StructField("carrier",   StringType()),
    StructField("tailnum",   StringType()),
    StructField("flight",  IntegerType()),
    StructField("origin",  StringType()),
    StructField("destiny", StringType()),
    StructField("air_time", IntegerType()),
    StructField("distance", IntegerType()),
    StructField("hour",    IntegerType()),
    StructField("minute",  IntegerType())
])

#don't forget to create a view (important to work with spark sql)
flights_df.createOrReplaceTempView('flights_view')

flights_df.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

## 1. 

In [75]:
flights_df = flights_df.withColumn("qa_year_month_day",
                when((col('year').isNull()) | 
                    ((col('year') == '')), 'MY')\
                .when( (col('month').isNull()) | 
                    (col('month') == ''), 'MM')\
                .when( (col('day').isNull()) | 
                    (col('day') == ''), 'MD')\
                .when((col('year') < 1950 ), "IY")\
                .when((col('month') < 1 ) |
                    (col('month') > 12 ), "IM")\
                .when((
                    (col('month') == 2) &
                    ((col('day') < 1 ) |
                    (col('day') > 29 ))
                    ) |
                    ((col('month') != 2) &
                    ((col('day') < 1 ) |
                    (col('day') > 31 ))
                    ), "ID")\
            )
flights_df.select('qa_year_month_day').show()

+-----------------+
|qa_year_month_day|
+-----------------+
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
+-----------------+
only showing top 20 rows



## 2. 

In [76]:
flights_df = flights_df.withColumn("qa_hour_minute",
                when((col('hour').isNull()) | 
                    ((col('hour') == '')), 'MH')\
                .when( (col('minute').isNull()) | 
                    (col('minute') == ''), 'MM')\
                .when( (length('hour') == 1) & ( (col('hour').substr(1,1) < 0) | (col('hour').substr(1,1) > 24) ) |
                    (length('hour') == 2) & ((col('hour').substr(1,2) < 0) | (col('hour').substr(1,2) > 24) ), 'IH')\
                .when( ((length('minute') == 1) & ((col('hour').substr(2,3) < 0) | (col('hour').substr(2,3) > 9))) |
                    ((length('minute') == 2) & ((col('hour').substr(3,4) < 0) | (col('hour').substr(3,4) > 59) )), 'IM')\
            )
flights_df.select('qa_hour_minute').show()

+--------------+
|qa_hour_minute|
+--------------+
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
+--------------+
only showing top 20 rows



# 3. 

In [77]:
flights_df = flights_df.withColumn("qa_dep_arr_time",
                when((col("dep_time") == "") |
                    (col("dep_time").isNull()), "MD")\
                .when((col("arr_time") == "") |
                    (col("arr_time").isNull()), "MA")
                .when( (length('dep_time') == 3) & ( (col('dep_time').substr(1,1) < 0) | (col('dep_time').substr(1,1) > 24) ) |
                    (length('dep_time') == 4) & ((col('dep_time').substr(1,2) < 0) | (col('dep_time').substr(1,2) > 24) ), 'FD')\
                .when( ((length('arr_time') == 3) & ((col('arr_time').substr(2,3) < 0) | (col('arr_time').substr(2,3) > 59))) |
                    ((length('arr_time') == 4) & ((col('arr_time').substr(3,4) < 0) | (col('arr_time').substr(3,4) > 59) )), 'FA')\
            )
flights_df.select('qa_dep_arr_time').show()

+---------------+
|qa_dep_arr_time|
+---------------+
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
|           null|
+---------------+
only showing top 20 rows



# 4.

In [78]:
flights_df = flights_df.withColumn("qa_dep_arr_delay", 
                when((col("dep_delay") == "") |
                    (col("dep_delay").isNull()), "MD")\
                .when((col("arr_delay") == "") |
                    (col("arr_delay").isNull()), "MA")
            )
flights_df.select('qa_dep_arr_delay').show()

+----------------+
|qa_dep_arr_delay|
+----------------+
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
+----------------+
only showing top 20 rows



## 5. 

In [79]:
flights_df = flights_df.withColumn("qa_carrier", 
                when((col("carrier") == "") |
                    (col("carrier").isNull()), "M")\
                .when((length("carrier") != 2), "F")\
                .otherwise(col('carrier'))
            )
flights_df.select('qa_carrier').show()

+----------+
|qa_carrier|
+----------+
|        VX|
|        AS|
|        VX|
|        WN|
|        AS|
|        WN|
|        WN|
|        VX|
|        AS|
|        AS|
|        AS|
|        AS|
|        AS|
|        AS|
|        AS|
|        UA|
|        AS|
|        WN|
|        AS|
|        OO|
+----------+
only showing top 20 rows



## 6. 

In [80]:
planes_df = planes_df.withColumn("qa_tailnum", 
                when((col("tailnum") == "")      |
                    (col("tailnum").isNull())   |
                    (col('tailnum').rlike('\t') |
                    (col('tailnum').rlike(' +'))), "M")\
                .when((length("tailnum") != 5), "S")\
                .when(
                    (col('tailnum').substr(1, 1) != 'N')   &
                    (col('tailnum').substr(-1, -1) != 'Z') & 
                    (col('tailnum').substr(2,4).rlike('^([^0-9]*)$')), "F")\
                .when((col("tailnum").substr(1,1) == "I")  |
                    (col("tailnum").substr(1,1) == "O") |
                    (col("tailnum").substr(1,1) == "0"), "FE")\
                .when((col("tailnum").substr(1,1) != "N"),  "FN")\
                .otherwise(col('tailnum'))
            )
planes_df.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+----------------+--------+----------+--------+--------+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|qa_year|qa_type| qa_manufacturer|qa_model|qa_engines|qa_seats|qa_speed|qa_engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+----------------+--------+----------+--------+--------+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   1998|      M|AIRBUS INDUSTRIE|A320-214|         2|       F|       M|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|         S|   1999|      M|AIRBUS INDUSTRIE|A320-214|         2|       F|       M|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null

## 7.

In [81]:
flights_df =flights_df.withColumn("qa_flight", 
                when((col("flight") == "") |
                    (col("flight").isNull()), "M")\
                .when((col('flight').rlike('[0-9]{4}') == False), "F")\
                .otherwise(col('flight'))
            )
flights_df.select('qa_flight').show()

+---------+
|qa_flight|
+---------+
|     1780|
|        F|
|        F|
|        F|
|        F|
|        F|
|     1520|
|        F|
|        F|
|        F|
|        F|
|        F|
|        F|
|        F|
|        F|
|     1458|
|        F|
|        F|
|        F|
|     3488|
+---------+
only showing top 20 rows



## 8.

In [82]:
flights_df = flights_df.withColumn("qa_origin_dest", 
                when((col("origin") == "") |
                    (col("origin").isNull()), "MO")\
                .when((col("dest") == "") |
                    (col("dest").isNull()), "MD")\
                .when(((col('origin').rlike('([A-Z]|[a-z]|[0-9]{3})') == False) | (length('origin') != 3)), "FO")\
                .when(((col('dest').rlike('([A-Z]|[a-z]|[0-9]{3})') == False) | (length('dest') != 3)), "FD")\
            )
flights_df.select('qa_origin_dest').show()

+--------------+
|qa_origin_dest|
+--------------+
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
+--------------+
only showing top 20 rows



## 9. 

In [83]:
flights_df = flights_df.withColumn("qa_air_time", 
                when((col("air_time") == "") |
                    (col("air_time").isNull()), "M")\
                .when((col("air_time") < 20) |
                    (col("air_time") > 500), "I")\
                .otherwise(col('air_time'))
            )
flights_df.select('qa_air_time').show()

+-----------+
|qa_air_time|
+-----------+
|        132|
|        360|
|        111|
|         83|
|        127|
|        121|
|         90|
|         98|
|        135|
|        198|
|        130|
|        154|
|        127|
|        183|
|        129|
|         90|
|         76|
|        216|
|        290|
|        111|
+-----------+
only showing top 20 rows



## 10. 

In [84]:
flights_df =flights_df.withColumn("qa_distance", 
                when((col("distance") == "") |
                    (col("distance").isNull()), "M")\
                .when((col("distance") < 50) |
                    (col("distance") > 3000), "I")\
                .otherwise(col('distance'))
            )
flights_df.select('qa_distance').show()

+-----------+
|qa_distance|
+-----------+
|        954|
|       2677|
|        679|
|        569|
|        937|
|        991|
|        543|
|        679|
|       1050|
|       1721|
|        954|
|       1107|
|        867|
|       1448|
|        679|
|        550|
|        605|
|       1733|
|       2496|
|        817|
+-----------+
only showing top 20 rows



## 11. 

In [None]:
flights_df =flights_df.withColumn("qa_airtime", 
                when((col("air_time") == "") |
                    (col("air_time").isNull()), "M")\
                .when((col("air_time") >= (col('distance') * .1 ) + 30), "TL")\
                .when((col("air_time") <= (col('distance') * .1 ) + 10), "TS")\
                .when( ((col("air_time") >= (col('distance') * .1 ) + 30) &
                        ((col("air_time") <= (col('distance') * .1 ) + 10))) == False, "TR")\
            )
flights_df.select('qa_airtime').show()

In [None]:
#Após feitas as transformações, vamos salvar nosso arquivo, e teremos o parquet

planes_df.write.parquet("output/airplanes.parquet")
airport_df.write.parquet("output/airports.parquet")
flights_df.write.parquet("output/flights.parquet")