# Qualidade

In [1]:
!pip install pyspark
!pip install findspark



In [2]:
import findspark
# Must run before the pyspark imports further down.
# NOTE(review): findspark.init() is expected to locate the Spark installation
# and put pyspark on sys.path - confirm against the findspark documentation.
findspark.init()

In [3]:
# Common regular expressions.
# Unless a pattern carries its own ^ / $ anchors it matches anywhere inside the
# value, because Column.rlike() succeeds on a partial match.
REGEX_ALPHA    = r'[a-zA-Z]+'          # at least one ASCII letter
REGEX_INTEGER  = r'[0-9]+'             # at least one digit
REGEX_FLOAT    = r'[0-9]+\.[0-9]+'     # digits, a dot, digits
REGEX_ALPHANUM = r'[0-9a-zA-Z]+'       # at least one ASCII letter or digit
# A value made up only of spaces/tabs. Anchored at both ends: without the
# leading ^ any value that merely ends in whitespace ("abc ") would be
# reported as empty.
REGEX_EMPTY_STR= r'^[\t ]+$'
REGEX_SPECIAL  = r'[!@#$%&*\(\)_]+'    # at least one of the listed special characters
# "N", a non-zero digit, 2-3 more digits, then 1-2 letters (I, O and Q are not in the set)
REGEX_NNUMBER  = r'^N[1-9][0-9]{2,3}([ABCDEFGHJKLMNPRSTUVXWYZ]{1,2})'
# "N0..." or any value containing the letters I or O
REGEX_NNUMBER_INVALID = r'(N0.*$)|(.*[IO].*)'
# Hour 0-23 (one or two digits) immediately followed by minutes 00-59
REGEX_TIME_FMT = r'^(([0-1]?[0-9])|(2[0-3]))([0-5][0-9])$'

In [4]:
# Helper functions
def split_csv(line):
    """Split a comma-separated line into a tuple of fields without double quotes.

    The line is cut on every comma (commas inside quoted fields get no special
    treatment) and every double-quote character is removed from each piece.
    """
    return tuple(field.replace('"', '') for field in line.split(","))

def check_empty_column(col):
    """Build a Spark Column expression that is true when column `col` is "empty".

    A value counts as empty when it is null, equal to the empty string, or
    matches REGEX_EMPTY_STR.

    Args:
        col: name of the column to test.

    Returns:
        The combined boolean Column expression.
    """
    column = F.col(col)
    is_null = column.isNull()
    is_blank = column == ''
    matches_empty_pattern = column.rlike(REGEX_EMPTY_STR)
    return is_null | is_blank | matches_empty_pattern

In [5]:
import re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [6]:
# Create (or reuse) the Spark context with the same master and application name
# that the session builder below asks for. A context created with a bare
# SparkContext() would already be running when the builder's getOrCreate() is
# called, so the builder's master/appName could not take effect; it also
# raises if this cell is executed a second time in the same kernel.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local[7]")
               .setAppName("Aceleração PySpark - Capgemini"))

# Session *builder* (not a session): the loading cells call
# spark.getOrCreate() on it to obtain the SparkSession.
spark = (SparkSession.builder
                     .master("local[7]")
                     .appName("Aceleração PySpark - Capgemini"))

In [7]:
# Explicit schema for the airports CSV; every field is declared nullable.
schema_airports = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("faa",  StringType()),
        ("name", StringType()),
        ("lat",  FloatType()),
        ("lon",  FloatType()),
        ("alt",  IntegerType()),
        ("tz",   IntegerType()),
        ("dst",  StringType()),
    )
])


# Explicit schema for the planes CSV; every field is declared nullable.
schema_planes = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("tailnum",      StringType()),
        ("year",         IntegerType()),
        ("type",         StringType()),
        ("manufacturer", StringType()),
        ("model",        StringType()),
        ("engines",      IntegerType()),
        ("seats",        IntegerType()),
        ("speed",        IntegerType()),
        ("engine",       StringType()),
    )
])


# Explicit schema for the flights CSV; every field is declared nullable.
# dep_time, arr_time and flight are read as strings, so the format checks
# further down see the text of each value.
schema_flights = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("year",      IntegerType()),
        ("month",     IntegerType()),
        ("day",       IntegerType()),
        ("dep_time",  StringType()),
        ("dep_delay", IntegerType()),
        ("arr_time",  StringType()),
        ("arr_delay", IntegerType()),
        ("carrier",   StringType()),
        ("tailnum",   StringType()),
        ("flight",    StringType()),
        ("origin",    StringType()),
        ("dest",      StringType()),
        ("air_time",  IntegerType()),
        ("distance",  IntegerType()),
        ("hour",      IntegerType()),
        ("minute",    IntegerType()),
    )
])



In [8]:
# Read the three CSV files (first line is the header) with their explicit
# schemas. `spark` is a session builder, so each read first obtains the session.
df_airports = spark.getOrCreate().read.load(
    "../data/airports.csv", format="csv", schema=schema_airports, header="true")

df_planes = spark.getOrCreate().read.load(
    "../data/planes.csv", format="csv", schema=schema_planes, header="true")

df_flights = spark.getOrCreate().read.load(
    "../data/flights.csv", format="csv", schema=schema_flights, header="true")

# Airports Dataset

### Pergunta 1



In [9]:
# qa_faa: 'M' when faa is null/blank; 'F' when it is not 3 to 5 characters long
# or is not made up exclusively of ASCII letters and digits; null otherwise.
df_airports = df_airports.withColumn('qa_faa', (
        F.when(check_empty_column('faa'), 'M')
         .when(
            (~F.length(F.col('faa')).between(3, 5) |
            # rlike() succeeds on a partial match, so the pattern is anchored
            # here; the bare REGEX_ALPHANUM would accept any value that merely
            # contains one letter or digit.
            (~F.col('faa').rlike('^' + REGEX_ALPHANUM + '$'))), 'F')))

df_airports.show(10)

# Number of rows per flag value
df_airports.groupBy("qa_faa").count().distinct().show()
                            


+---+--------------------+---------+-----------+----+---+---+------+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|
+---+--------------------+---------+-----------+----+---+---+------+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|  null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|  null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|  null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|  null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|  null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|  null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|  null|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|  null|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000| -5|  U|  null|
|0S9|Jefferson County ...| 48.05381|-122.810646| 108| -8|  A|  null|
+---+--------------------+---------+-----------+----+---+---+------+
only showing top 10 rows

+------+

### Pergunta 2



In [10]:
# qa_name: 'M' when the airport name is null; null otherwise.
df_airports = df_airports.withColumn(
    'qa_name',
    F.when(F.isnull(F.col('name')), 'M'),
)
df_airports.show(n=5)

# Rows per flag value, then up to five flagged rows printed without truncation.
df_airports.groupby("qa_name").count().distinct().show()
df_airports.where(F.col('qa_name') == "M").show(n=5, truncate=False)




+---+--------------------+---------+---------+----+---+---+------+-------+
|faa|                name|      lat|      lon| alt| tz|dst|qa_faa|qa_name|
+---+--------------------+---------+---------+----+---+---+------+-------+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|  null|   null|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -5|  A|  null|   null|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A|  null|   null|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|  null|   null|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -4|  A|  null|   null|
+---+--------------------+---------+---------+----+---+---+------+-------+
only showing top 5 rows

+-------+-----+
|qa_name|count|
+-------+-----+
|   null| 1397|
+-------+-----+

+---+----+---+---+---+---+---+------+-------+
|faa|name|lat|lon|alt|tz |dst|qa_faa|qa_name|
+---+----+---+---+---+---+---+------+-------+
+---+----+---+---+---+---+---+------+-------+



### Pergunta 3



In [11]:
# qa_lat: 'M' when the latitude is null; 'I' when it lies outside [-90, 90]
# (the valid range for a latitude; +/-180 is the longitude range);
# 'A' when the value matches the word-character pattern; null otherwise.
# NOTE(review): lat is read as a float, so the 'A' pattern presumably never
# matches a parsed value - confirm whether the check is still wanted.
df_airports = df_airports.withColumn("qa_lat",
                   F.when(F.col('lat').isNull(), "M")
                   .when(~F.col('lat').between(-90, 90), "I")
                   .when(F.col('lat').rlike('^[a-zA-Z0-9_]*$'), "A"))
df_airports.show()

# Number of rows per flag value
df_airports.groupBy("qa_lat").count().distinct().show()

+---+--------------------+---------+-----------+----+---+---+------+-------+------+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|qa_lat|
+---+--------------------+---------+-----------+----+---+---+------+-------+------+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|  null|   null|  null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|  null|   null|  null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|  null|   null|  null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|  null|   null|  null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|  null|   null|  null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|  null|   null|  null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|  null|   null|  null|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|  null|   null|  null|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000| -5|  U|  null|   null|

### Pergunta 4



In [12]:
# qa_lon: 'M' when the longitude is null; 'I' when it lies outside [-180, 180];
# 'A' when the value matches the word-character pattern; null otherwise.
df_airports = df_airports.withColumn(
    "qa_lon",
    F.when(F.isnull(F.col('lon')), "M")
     .when(~F.col('lon').between(-180, 180), "I")
     .when(F.col('lon').rlike('^[a-zA-Z0-9_]*$'), "A"),
)
df_airports.show()

# Number of rows per flag value
df_airports.groupby("qa_lon").count().distinct().show()

+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|
+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|  null|   null|  null|  null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|  null|   null|  null|  null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|  null|   null|  null|  null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|  null|   null|  null|  null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|  null|   null|  null|  null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|  null|   null|  null|  null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|  null|   null|  null|  null|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|  null|   null|  null|  null|

### Pergunta 5



In [13]:
# qa_alt: 'M' when the altitude is null/blank; 'A' when it contains no digit at
# all (REGEX_INTEGER is unanchored); 'I' when it is negative; null otherwise.
df_airports = df_airports.withColumn(
    'qa_alt',
    F.when(check_empty_column('alt'), 'M')
     .when(~F.col('alt').rlike(REGEX_INTEGER), 'A')
     .when(F.col('alt') < 0, 'I'),
)

# Rows per flag value, then the airports with a negative altitude.
df_airports.groupby("qa_alt").count().distinct().show()
df_airports.where(F.col('alt') < 0).show()

+------+-----+
|qa_alt|count|
+------+-----+
|  null| 1395|
|     I|    2|
+------+-----+

+---+-------------+---------+----------+---+---+---+------+-------+------+------+------+
|faa|         name|      lat|       lon|alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_alt|
+---+-------------+---------+----------+---+---+---+------+-------+------+------+------+
|IPL|  Imperial Co| 32.83422|-115.57874|-54| -8|  A|  null|   null|  null|  null|     I|
|NJK|El Centro Naf|32.829224|-115.67167|-42| -8|  A|  null|   null|  null|  null|     I|
+---+-------------+---------+----------+---+---+---+------+-------+------+------+------+



### Pergunta 6



In [14]:
# qa_tz: 'M' when the time zone offset is null; 'I' when it lies outside
# [-11, 14]; 'A' when it contains no digit at all; null otherwise.
df_airports = df_airports.withColumn(
    "qa_tz",
    F.when(F.isnull(F.col('tz')), "M")
     .when(~F.col('tz').between(-11, 14), "I")
     .when(~F.col('tz').rlike(REGEX_INTEGER), "A"),
)
df_airports.show()

# Number of rows per flag value
df_airports.groupby("qa_tz").count().distinct().show()

+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+------+-----+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_alt|qa_tz|
+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+------+-----+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|  null|   null|  null|  null|  null| null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|  null|   null|  null|  null|  null| null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|  null|   null|  null|  null|  null| null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|  null|   null|  null|  null|  null| null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|  null|   null|  null|  null|  null| null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|  null|   null|  null|  null|  null| null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|  n

### Pergunta 7



In [15]:
# Accepted daylight-saving-time category codes
categorias = ["E", "A", "S", "O", "Z", "N","U"]

# qa_dst: 'M' when dst is null; 'N' when it is purely numeric; 'C' when it is
# not one of the accepted categories; null otherwise.
# The numeric test has to come before the category test: a numeric value is
# never in `categorias`, so with the opposite order 'N' could never be produced.
df_airports = df_airports.withColumn("qa_dst",
                  F.when(F.col('dst').isNull(), "M")
                  .when(F.col('dst').rlike("^[0-9]+$"), "N")
                  .when(~F.col('dst').isin(categorias), "C"))

df_airports.show()
# printSchema is a method; without the call parentheses nothing is printed.
df_airports.printSchema()

# Number of rows per flag value
df_airports.groupBy("qa_dst").count().distinct().show()

+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+------+-----+------+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_alt|qa_tz|qa_dst|
+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+------+-----+------+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|  null|   null|  null|  null|  null| null|  null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|  null|   null|  null|  null|  null| null|  null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|  null|   null|  null|  null|  null| null|  null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|  null|   null|  null|  null|  null| null|  null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|  null|   null|  null|  null|  null| null|  null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|  null|   null|  null|  null|  null| null|  null|
|

### Criando Airports parquet

In [16]:
# Write the airport key plus every qa_* flag to Parquet from a single
# partition, replacing any previous output.
# NOTE(review): "header" looks like a CSV option; it presumably has no effect
# on a Parquet write - confirm before removing it.
(df_airports
    .select("faa", "qa_faa", "qa_name", "qa_lat", "qa_lon", "qa_alt", "qa_tz", "qa_dst")
    .repartition(1)
    .write
    .option("header", "true")
    .parquet("airports_qa.parquet", mode='overwrite'))



# Planes Dataset

### Pergunta 1

In [17]:
# qa_tailnum (first matching rule wins):
#   'M'  tailnum is null
#   'S'  length is neither 5 nor 6
#   'FN' does not start with "N"
#   'FE' matches REGEX_NNUMBER_INVALID ("N0..." or contains the letters I / O)
#   'F'  does not match N + 1-4 digits + 1-2 upper-case letters
# The specific checks (FN, FE) run before the general format check; in the
# opposite order the general check swallows every such value. The FE rule
# uses REGEX_NNUMBER_INVALID because a "^[IO0]" test can never match a value
# that has already been confirmed to start with "N".
df_planes = (df_planes.withColumn('qa_tailnum',
    F.when(F.col('tailnum').isNull(), "M")
     .when((F.length(F.col('tailnum')) != 5) & (F.length(F.col('tailnum')) != 6), "S")
     .when(~F.col('tailnum').rlike("^N"), "FN")
     .when(F.col('tailnum').rlike(REGEX_NNUMBER_INVALID), "FE")
     .when(~F.col('tailnum').rlike("^N([0-9]{1,4})([A-Z]{1,2}$)"), "F"))
)

# Number of rows per flag value
df_planes.groupBy("qa_tailnum").count().distinct().show()


+----------+-----+
|qa_tailnum|count|
+----------+-----+
|         F|  298|
|      null| 2330|
+----------+-----+



### Pergunta 2

In [18]:
# qa_year: 'M' when the year is null; 'I' when it is earlier than 1950;
# null otherwise.
df_planes = df_planes.withColumn(
    'qa_year',
    F.when(F.isnull(F.col('year')), "M")
     .when(F.col('year') < 1950, "I"),
)

# Number of rows per flag value
df_planes.groupby("qa_year").count().distinct().show()

+-------+-----+
|qa_year|count|
+-------+-----+
|   null| 2567|
|      M|   60|
|      I|    1|
+-------+-----+



### Pergunta 3

In [19]:
# Accepted aircraft type categories
categorias = [
    "Fixed wing multi engine",
    "Fixed wing single engine",
    "Rotorcraft",
]
# qa_type: 'M' when the type is null; 'C' when it is not one of the accepted
# categories; null otherwise.
df_planes = df_planes.withColumn(
    'qa_type',
    F.when(F.isnull(F.col('type')), "M")
     .when(~F.col('type').isin(*categorias), "C"),
)

# Number of rows per flag value
df_planes.groupby("qa_type").count().distinct().show()

+-------+-----+
|qa_type|count|
+-------+-----+
|   null| 2628|
+-------+-----+



### Pergunta 4

In [20]:
# Accepted manufacturer names (compared for exact equality)
manufacturer_list = [
    "AIRBUS", "BOEING", "BOMBARDIER", "CESSNA", "EMBRAER", "SIKORSKY",
    "CANADAIR", "PIPER", "MCDONNELL DOUGLAS", "CIRRUS", "BELL",
    "KILDALL GARY", "LAMBERT RICHARD", "BARKER JACK",
    "ROBINSON HELICOPTER", "GULFSTREAM", "MARZ BARRY",
]

# qa_manufacturer: 'M' when the manufacturer is null; 'C' when it is not in
# the accepted list; null otherwise.
df_planes = df_planes.withColumn(
    "qa_manufacturer",
    F.when(F.isnull(F.col('manufacturer')), "M")
     .when(~F.col('manufacturer').isin(*manufacturer_list), "C"),
)

# Number of rows per flag value, sorted by flag
(df_planes.groupby("qa_manufacturer")
          .count()
          .distinct()
          .sort("qa_manufacturer", ascending=True)
          .show())

+---------------+-----+
|qa_manufacturer|count|
+---------------+-----+
|           null| 2007|
|              C|  621|
+---------------+-----+



### Pergunta 5

In [21]:
# qa_model: 'M' when the model is null/blank; 'F' when the model prefix does not
# fit the manufacturer (rules below); null otherwise.
df_planes = df_planes.withColumn('qa_model', (
        F.when(check_empty_column('model'), 'M')
         .when(
             # manufacturer contains AIRBUS -> model must start with "A"
             ((F.col('manufacturer').rlike(r'.*AIRBUS.*') & ~F.col('model').startswith('A')) |
              # manufacturer contains BOEING -> model must start with "7"
              (F.col('manufacturer').rlike(r'.*BOEING.*') & ~F.col('model').startswith('7')) |
              # manufacturer contains BOMBARDIER or CANADAIR -> model must start with "CL"
              (F.col('manufacturer').rlike(r'.*(BOMBARDIER|CANADAIR).*') & ~F.col('model').startswith('CL')) |
              # manufacturer contains MCDONNELL DOUGLAS -> model must start with "MD" or "DC"
              (F.col('manufacturer').rlike(r'.*MCDONNELL DOUGLAS.*') &  ~(F.col('model').startswith('MD') | F.col('model').startswith('DC')) )), 'F')))


# Number of rows per flag value
df_planes.groupBy("qa_model").count().distinct().show()

+--------+-----+
|qa_model|count|
+--------+-----+
|       F|   15|
|    null| 2613|
+--------+-----+



### Pergunta 6

In [22]:
# qa_engines: 'M' when the engine count is null; 'I' when it lies outside
# [1, 4]; 'A' when it contains no digit at all; null otherwise.
df_planes = df_planes.withColumn(
    'qa_engines',
    F.when(F.isnull(F.col('engines')), "M")
     .when(~F.col('engines').between(1, 4), "I")
     .when(~F.col('engines').rlike(REGEX_INTEGER), "A"),
)

# Rows per flag value, then a sample of the frame.
df_planes.groupby("qa_engines").count().distinct().show()
df_planes.show()

+----------+-----+
|qa_engines|count|
+----------+-----+
|      null| 2628|
+----------+-----+

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|qa_year|qa_type|qa_manufacturer|qa_model|qa_engines|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      null|   null|   null|              C|    null|      null|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      null|   null|   null|              C|    null|      null|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      null|   null|   null|           

### Pergunta 7

In [23]:
# qa_seats: 'M' when the seat count is null; 'I' when it lies outside
# [2, 500]; 'A' when it contains no digit at all; null otherwise.
df_planes = df_planes.withColumn(
    'qa_seats',
    F.when(F.isnull(F.col('seats')), "M")
     .when(~F.col('seats').between(2, 500), "I")
     .when(~F.col('seats').rlike(REGEX_INTEGER), "A"),
)
# Number of rows per flag value
df_planes.groupby("qa_seats").count().distinct().show()


+--------+-----+
|qa_seats|count|
+--------+-----+
|    null| 2628|
+--------+-----+



### Pergunta 8

In [24]:
# qa_speed: 'M' when the speed is null; 'I' when it lies outside [50, 150];
# 'A' when it contains no digit at all; null otherwise.
df_planes = df_planes.withColumn(
    'qa_speed',
    F.when(F.isnull(F.col('speed')), "M")
     .when(~F.col('speed').between(50, 150), "I")
     .when(~F.col('speed').rlike(REGEX_INTEGER), "A"),
)

# Number of rows per flag value
df_planes.groupby("qa_speed").count().distinct().show()

# Rows flagged 'A', if any
df_planes.where(F.col('qa_speed') == "A").show()


+--------+-----+
|qa_speed|count|
+--------+-----+
|    null|    6|
|       M| 2622|
+--------+-----+

+-------+----+----+------------+-----+-------+-----+-----+------+----------+-------+-------+---------------+--------+----------+--------+--------+
|tailnum|year|type|manufacturer|model|engines|seats|speed|engine|qa_tailnum|qa_year|qa_type|qa_manufacturer|qa_model|qa_engines|qa_seats|qa_speed|
+-------+----+----+------------+-----+-------+-----+-----+------+----------+-------+-------+---------------+--------+----------+--------+--------+
+-------+----+----+------------+-----+-------+-----+-----+------+----------+-------+-------+---------------+--------+----------+--------+--------+



### Pergunta 9

In [25]:
# Accepted engine categories
motor_categories = [
    "Turbo-fan",
    "Turbo-jet",
    "Turbo-prop",
    "Turbo-shaft",
    "4 Cycle",
]

# qa_engine: 'M' when the engine type is null; 'C' when it is not one of the
# accepted categories; null otherwise.
df_planes = df_planes.withColumn(
    'qa_engine',
    F.when(F.isnull(F.col('engine')), "M")
     .when(~F.col('engine').isin(*motor_categories), "C"),
)

# Rows per flag value, then a sample of the frame.
df_planes.groupby("qa_engine").count().distinct().show()
df_planes.show()

+---------+-----+
|qa_engine|count|
+---------+-----+
|     null| 2618|
|        C|   10|
+---------+-----+

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|qa_tailnum|qa_year|qa_type|qa_manufacturer|qa_model|qa_engines|qa_seats|qa_speed|qa_engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+----------+-------+-------+---------------+--------+----------+--------+--------+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      null|   null|   null|              C|    null|      null|    null|       M|     null|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      null|   null|   null|              C|    null|      null|    n

### Criando planes parquet

In [26]:
# Write the plane key plus every qa_* flag to Parquet from a single partition,
# replacing any previous output.
# NOTE(review): the writer call presumably returns None, so `planesparq` would
# hold None rather than a DataFrame - confirm before relying on it.
planesparq = (
    df_planes
    .select("tailnum", "qa_tailnum", 'qa_year', 'qa_type', 'qa_manufacturer',
            'qa_model', 'qa_engines', 'qa_seats', 'qa_speed', 'qa_engine')
    .repartition(1)
    .write
    .option("header", "true")
    .parquet('planes_qa.parquet', mode='overwrite')
)
            

# Flights Dataset

### Pergunta 1

In [27]:
# qa_year_month_day (first matching rule wins):
#   'MY' / 'MM' / 'MD'  year / month / day is null
#   'YM'                year is earlier than 1950
#   'IM'                month lies outside [1, 12]
#   'ID'                day does not exist in that month
df_flights = df_flights.withColumn("qa_year_month_day",
                  F.when(F.col('year').isNull(), "MY")
                  .when(F.col('month').isNull(),"MM")
                  .when(F.col('day').isNull(), "MD")
                  .when(F.col('year') < 1950, "YM")
                  .when(~F.col('month').between(1,12), "IM")
                  .when(
                      ~F.col('day').between(1,31)
                      # April, June, September and November have only 30 days.
                      | (F.col('month').isin(4, 6, 9, 11) & (F.col('day') > 30))
                      # February has 29 days in a leap year and 28 otherwise.
                      | ((F.col('month') == 2) & (F.col('day') > F.when(
                            ((F.col('year') % 4 == 0) & (F.col('year') % 100 != 0))
                            | (F.col('year') % 400 == 0), 29).otherwise(28))),
                      "ID")
                  )



# Number of rows per flag value
df_flights.groupBy("qa_year_month_day").count().distinct().show()

+-----------------+-----+
|qa_year_month_day|count|
+-----------------+-----+
|             null|10000|
+-----------------+-----+



### Pergunta 2

In [28]:
# qa_hour_minute (first matching rule wins):
#   'MH' / 'MM'  hour / minute is null
#   'IH'         hour lies outside [0, 24]
#   'IM'         minute lies outside [0, 59]
df_flights = df_flights.withColumn(
    "qa_hour_minute",
    F.when(F.isnull(F.col('hour')), "MH")
     .when(F.isnull(F.col('minute')), "MM")
     .when(~F.col('hour').between(0, 24), "IH")
     .when(~F.col('minute').between(0, 59), "IM"),
)

# Rows per flag value, then the rows with a missing hour as a pandas frame.
df_flights.groupby("qa_hour_minute").count().distinct().show()
df_flights.where(F.col('qa_hour_minute') == "MH").toPandas()

+--------------+-----+
|qa_hour_minute|count|
+--------------+-----+
|          null| 9952|
|            MH|   48|
+--------------+-----+



Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute,qa_year_month_day,qa_hour_minute
0,2014,3,4,,,,,UA,,156,SEA,DEN,,1024,,,,MH
1,2014,2,12,,,,,AS,N527AS,2,SEA,DCA,,2329,,,,MH
2,2014,7,1,,,,,WN,N8323C,2485,SEA,MDW,,1733,,,,MH
3,2014,4,30,,,,,AS,N526AS,566,PDX,LAX,,834,,,,MH
4,2014,1,3,,,,,US,,553,SEA,PHL,,2378,,,,MH
5,2014,8,7,,,,,AS,N579AS,867,SEA,OGG,,2640,,,,MH
6,2014,8,11,,,,,OO,N689CA,4528,PDX,SEA,,129,,,,MH
7,2014,1,2,,,,,UA,,212,SEA,EWR,,2402,,,,MH
8,2014,5,15,,,,,OO,N917SW,6250,PDX,LAX,,834,,,,MH
9,2014,2,7,,,,,OO,N594SW,5553,PDX,SFO,,550,,,,MH


### Pergunta 3

In [29]:
# qa_dep_arr (first matching rule wins):
#   'MD' / 'MA'  dep_time / arr_time is null or the literal text "NA"
#   'FD' / 'FA'  dep_time / arr_time is not an hour 0-23 (no leading zero)
#                immediately followed by minutes 00-59
df_flights = df_flights.withColumn(
    "qa_dep_arr",
    F.when(F.isnull(F.col('dep_time')) | (F.col('dep_time') == "NA"), "MD")
     .when(F.isnull(F.col('arr_time')) | (F.col('arr_time') == "NA"), "MA")
     .when(~F.col('dep_time').rlike("^([0-9]|1[0-9]|2[0-3])[0-5][0-9]$"), "FD")
     .when(~F.col('arr_time').rlike("^([0-9]|1[0-9]|2[0-3])[0-5][0-9]$"), "FA"),
)

# Number of rows per flag value
df_flights.groupby("qa_dep_arr").count().distinct().show()

+----------+-----+
|qa_dep_arr|count|
+----------+-----+
|      null| 9704|
|        MD|   48|
|        FA|  151|
|        MA|    7|
|        FD|   90|
+----------+-----+



### Pergunta 4

In [30]:
# qa_dep_arr_delay: 'MD' when dep_delay is null; otherwise 'MA' when arr_delay
# is null; null when both are present.
df_flights = df_flights.withColumn(
    "qa_dep_arr_delay",
    F.when(F.isnull(F.col('dep_delay')), "MD")
     .when(F.isnull(F.col('arr_delay')), "MA"),
)

# Number of rows per flag value
df_flights.groupby("qa_dep_arr_delay").count().distinct().show()



+----------------+-----+
|qa_dep_arr_delay|count|
+----------------+-----+
|            null| 9925|
|              MD|   48|
|              MA|   27|
+----------------+-----+



### Pergunta 5

In [31]:
# qa_carrier: 'M' when the carrier is null; 'F' when it is not exactly two
# word characters (letters, digits or underscore); null otherwise.
df_flights = df_flights.withColumn(
    "qa_carrier",
    F.when(F.isnull(F.col('carrier')), "M")
     .when(~F.col('carrier').rlike("^[a-zA-Z0-9_]{2}$"), "F"),
)

# Number of rows per flag value
df_flights.groupby("qa_carrier").count().distinct().show()


+----------+-----+
|qa_carrier|count|
+----------+-----+
|      null|10000|
+----------+-----+



### Pergunta 6

In [32]:
# qa_tailnum (first matching rule wins):
#   'M'  tailnum is null or the literal text "NA"
#   'S'  length is neither 5 nor 6
#   'FN' does not start with "N"
#   'FE' matches REGEX_NNUMBER_INVALID ("N0..." or contains the letters I / O)
#   'F'  does not match N + 1-4 digits + 1-2 upper-case letters
# The FE rule uses REGEX_NNUMBER_INVALID because a "^[IO0]" test can never
# match a value that has already been confirmed to start with "N".
df_flights = (df_flights.withColumn('qa_tailnum',
    F.when((F.col('tailnum').isNull()) | (F.col('tailnum') == 'NA'), "M")
     .when((F.length(F.col('tailnum')) != 5) & (F.length(F.col('tailnum')) != 6), "S")
     .when(~F.col('tailnum').rlike("^N"), "FN")
     .when(F.col('tailnum').rlike(REGEX_NNUMBER_INVALID), "FE")
     .when(~F.col('tailnum').rlike("^N([0-9]{1,4})([A-Z]{1,2}$)"), "F"))
)



# Number of rows per flag value, sorted by flag
df_flights.groupBy("qa_tailnum").count().distinct().orderBy("qa_tailnum", ascending=True).show()

+----------+-----+
|qa_tailnum|count|
+----------+-----+
|      null| 8997|
|         F|  987|
|        FN|    2|
|         M|   14|
+----------+-----+



### Pergunta 7

In [33]:
# qa_flight: 'M' when the flight number is null; 'F' when it is not exactly
# four digits; null otherwise.
df_flights = df_flights.withColumn(
    'qa_flight',
    F.when(F.isnull(F.col('flight')), "M")
     .when(~F.col('flight').rlike("^[0-9]{4}$"), "F"),
)

# Number of rows per flag value
df_flights.groupby("qa_flight").count().distinct().show()

# Sample of the flight numbers flagged 'F'
df_flights.select("flight", "qa_flight").where(F.col("qa_flight") == "F").show()


+---------+-----+
|qa_flight|count|
+---------+-----+
|        F| 6158|
|     null| 3842|
+---------+-----+

+------+---------+
|flight|qa_flight|
+------+---------+
|   851|        F|
|   755|        F|
|   344|        F|
|   522|        F|
|    48|        F|
|   755|        F|
|   490|        F|
|    26|        F|
|   448|        F|
|   656|        F|
|   608|        F|
|   121|        F|
|   306|        F|
|   368|        F|
|   827|        F|
|    24|        F|
|   300|        F|
|   616|        F|
|   306|        F|
|    29|        F|
+------+---------+
only showing top 20 rows



### Pergunta 8

In [34]:
# qa_origin_dest (first matching rule wins):
#   'MO' / 'MD'  origin / dest is null
#   'FO' / 'FD'  origin / dest is not exactly three word characters
df_flights = df_flights.withColumn(
    'qa_origin_dest',
    F.when(F.isnull(F.col('origin')), "MO")
     .when(F.isnull(F.col('dest')), "MD")
     .when(~F.col('origin').rlike("^[a-zA-Z0-9_]{3}$"), "FO")
     .when(~F.col('dest').rlike("^[a-zA-Z0-9_]{3}$"), "FD"),
)

# Number of rows per flag value
df_flights.groupby("qa_origin_dest").count().distinct().show()


+--------------+-----+
|qa_origin_dest|count|
+--------------+-----+
|          null|10000|
+--------------+-----+



### Pergunta 9

In [35]:
# qa_air_time: 'M' when air_time is null; 'I' when it lies outside [20, 500];
# null otherwise.
df_flights = df_flights.withColumn(
    "qa_air_time",
    F.when(F.isnull(F.col('air_time')), "M")
     .when(~F.col('air_time').between(20, 500), "I"),
)

# Rows per flag value, then the rows with a missing air_time as a pandas frame.
df_flights.groupby("qa_air_time").count().distinct().show()
df_flights.where(F.col('qa_air_time') == "M").toPandas()

+-----------+-----+
|qa_air_time|count|
+-----------+-----+
|       null| 9925|
|          M|   75|
+-----------+-----+



Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,...,minute,qa_year_month_day,qa_hour_minute,qa_dep_arr,qa_dep_arr_delay,qa_carrier,qa_tailnum,qa_flight,qa_origin_dest,qa_air_time
0,2014,4,6,1329,4.0,2159,,DL,N130DL,1929,...,29.0,,,,MA,,,,,M
1,2014,3,4,,,,,UA,,156,...,,,MH,MD,MD,,M,F,,M
2,2014,2,12,,,,,AS,N527AS,2,...,,,MH,MD,MD,,,F,,M
3,2014,7,1,,,,,WN,N8323C,2485,...,,,MH,MD,MD,,,,,M
4,2014,12,27,1420,40.0,2012,,OO,N224AG,3452,...,20.0,,,,MA,,,,,M
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
70,2014,2,11,840,0.0,1646,,HA,N389HA,21,...,40.0,,,,MA,,,F,,M
71,2014,4,16,2224,-6.0,46,,OO,N234SW,5437,...,24.0,,,FA,MA,,,,,M
72,2014,10,14,,,,,AS,N529AS,413,...,,,MH,MD,MD,,,F,,M
73,2014,2,17,,,,,WN,N449WN,1900,...,,,MH,MD,MD,,,,,M


### Pergunta 10

In [36]:
# qa_distance: 'M' when distance is null; 'I' when it lies outside [50, 3000];
# null otherwise.
df_flights = df_flights.withColumn(
    "qa_distance",
    F.when(F.isnull(F.col('distance')), "M")
     .when(~F.col('distance').between(50, 3000), "I"),
)

# Number of rows per flag value
df_flights.groupby("qa_distance").count().distinct().show()


+-----------+-----+
|qa_distance|count|
+-----------+-----+
|       null|10000|
+-----------+-----+



### Pergunta 11

In [37]:
# qa_distance_airtime compares air_time with distance * 0.1:
#   'M'   distance or air_time is null/blank
#   'TL'  air_time >= distance * 0.1 + 30
#   'TS'  air_time <= distance * 0.1 + 10
#   'TR'  everything else
df_flights = df_flights.withColumn(
    'qa_distance_airtime',
    F.when(check_empty_column('distance') | check_empty_column('air_time'), 'M')
     .when(F.col('air_time') >= F.col('distance') * 0.1 + 30, 'TL')
     .when(F.col('air_time') <= F.col('distance') * 0.1 + 10, 'TS')
     .otherwise('TR'),
)

# Rows per flag value, then the first row of the frame.
df_flights.groupby("qa_distance_airtime").count().distinct().show()
df_flights.show(n=1)

+-------------------+-----+
|qa_distance_airtime|count|
+-------------------+-----+
|                  M|   75|
|                 TR| 4831|
|                 TS|   67|
|                 TL| 5027|
+-------------------+-----+

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------------+--------------+----------+----------------+----------+----------+---------+--------------+-----------+-----------+-------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|qa_year_month_day|qa_hour_minute|qa_dep_arr|qa_dep_arr_delay|qa_carrier|qa_tailnum|qa_flight|qa_origin_dest|qa_air_time|qa_distance|qa_distance_airtime|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------------+--------------+----------+----------------+----------+----------+---------+--------------+----

### Criando Flights parquet 

In [38]:
# Write the flight keys plus every qa_* flag to Parquet from a single
# partition, replacing any previous output.
# NOTE(review): the writer call presumably returns None, so `flightsparq` would
# hold None rather than a DataFrame - confirm before relying on it.
flightsparq = (
    df_flights
    .select("tailnum", "dest", "origin", "qa_tailnum", 'qa_year_month_day',
            'qa_hour_minute', 'qa_dep_arr', 'qa_dep_arr_delay', 'qa_carrier',
            'qa_flight', 'qa_origin_dest', 'qa_air_time', 'qa_distance',
            'qa_distance_airtime')
    .repartition(1)
    .write
    .option("header", "true")
    .parquet('flights_qa.parquet', mode='overwrite')
)