## Instalação Spark

In [1]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Desafio-de-Qualidade")

In [4]:
spark

<pyspark.sql.session.SparkSession.Builder at 0x1da264c6ee0>

## Importando os CSV

In [3]:
import re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [4]:
#Schema dos datasets

schema_airports = StructType([
    StructField("faa",  StringType(),  True),
    StructField("name", StringType(),  True),
    StructField("lat",  FloatType(),   True),
    StructField("lon",  FloatType(),   True),
    StructField("alt",  IntegerType(), True),
    StructField("tz",   IntegerType(), True),
    StructField("dst",  StringType(),  True)
])

schema_planes = StructType([
    StructField("tailnum",      StringType(),  True),
    StructField("year",         IntegerType(), True),
    StructField("type",         StringType(),  True),
    StructField("manufacturer", StringType(),  True),
    StructField("model",        StringType(),  True),
    StructField("engines",      IntegerType(), True),
    StructField("seats",        IntegerType(), True),
    StructField("speed",        IntegerType(), True),
    StructField("engine",       StringType(),  True)
])

schema_flights = StructType([
    StructField("year",      IntegerType(), True),
    StructField("month",     IntegerType(), True),
    StructField("day",       IntegerType(), True),
    StructField("dep_time",  StringType(),  True),
    StructField("dep_delay", IntegerType(), True),
    StructField("arr_time",  StringType(),  True),
    StructField("arr_delay", IntegerType(), True),
    StructField("carrier",   StringType(),  True),
    StructField("tailnum",   StringType(),  True),
    StructField("flight",    StringType(),  True),
    StructField("origin",    StringType(),  True),
    StructField("dest",      StringType(),  True),
    StructField("air_time",  IntegerType(), True),
    StructField("distance",  IntegerType(), True),
    StructField("hour",      IntegerType(), True),
    StructField("minute",    IntegerType(), True),
])



In [5]:
#Criando os Data Frame

df_airports = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_airports)
                  .load("../data/airports.csv"))

df_planes = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_planes)
                  .load("../data/planes.csv"))

df_flights = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_flights)
                  .load("../data/flights.csv"))

In [6]:
df_flights = df_flights[['year', 'month', 'day','hour', 'minute','dep_time','arr_time', 'dep_delay', 'arr_delay', 'carrier', 'tailnum', 'flight',
        'origin','dest', 'air_time', 'distance']]

# <center> Regex Comuns

In [7]:
REGEX_ALPHA    = r'[a-zA-Z]+'
REGEX_INTEGER  = r'[0-9]+'
REGEX_FLOAT    = r'[0-9]+\.[0-9]+'
REGEX_ALPHANUM = r'[0-9a-zA-Z]+'
REGEX_EMPTY_STR= r'[\t ]+$'
REGEX_SPECIAL  = r'[!@#$%&*\(\)_]+'
REGEX_NNUMBER  = r'^N[1-9][0-9]{2,3}([ABCDEFGHJKLMNPRSTUVXWYZ]{1,2})'
REGEX_NNUMBER_INVALID = r'(N0.*$)|(.*[IO].*)'
REGEX_TIME_FMT = r'^(([0-1]?[0-9])|(2[0-3]))([0-5][0-9])$'

# <Center> DataSet Airports

# <center> Pergunta 1

In [12]:
df_airports.filter(df_airports["faa"].rlike("[\w{3,5}]")).show()

+---+--------------------+---------+-----------+----+---+---+
|faa|                name|      lat|        lon| alt| tz|dst|
+---+--------------------+---------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000| -5|  U|
|0S9|Jefferson County ...| 48.05381|-122.810646| 108| -8|  A|
|0W3|Harford County Ai...|39.566837|   -76.2024| 409| -5|  A|
|10C|  Galt Field Airport| 42.40289| -88.375114| 875| -6|  U|
|17G|Port Bucyrus-Craw...|40.781555|  -82.97481|1003| -5|  A|
|19A|Jac

In [13]:
df_airports.filter(df_airports["faa"].rlike("[\w{3,5}]")).count()

1397

In [8]:
df_airports = df_airports.withColumn(
    "qa_faa", 
         (
             F.when(F.col("faa").isNull(), "M")
              .when(F.col("faa").rlike("^(\w{3,5})"), "F")
         )
    )

# <center> Pergunta 2

In [9]:
df_airports = df_airports.withColumn(
    "qa_name", 
         F.when(df_airports["name"].isNull(), "M")
    )

In [19]:
df_airports.show()

+---+--------------------+---------+-----------+----+---+---+------+-------+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|
+---+--------------------+---------+-----------+----+---+---+------+-------+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|     F|   null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|     F|   null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|     F|   null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|     F|   null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|     F|   null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|     F|   null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|     F|   null|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|     F|   null|
|0P2|Shoestring Aviati...|39.794823| -76.647194|1000| -5|  U|     F|   null|
|0S9|Jefferson County ...| 48.05381|-122.810646| 108| -8|  A|     F|   null|

# <center> Pergunta 3

In [10]:
df_airports = df_airports.withColumn(
    "qa_lat", 
         F.when(df_airports["lat"].isNull(), "M")
          .when(~df_airports["lat"].between(-180,180), "I")
          .when(~df_airports["lat"].rlike("\d"), "A")
    )

In [22]:
df_airports.filter(df_airports.qa_lat.isNotNull()).show()

+---+----+---+---+---+---+---+------+-------+------+
|faa|name|lat|lon|alt| tz|dst|qa_faa|qa_name|qa_lat|
+---+----+---+---+---+---+---+------+-------+------+
+---+----+---+---+---+---+---+------+-------+------+



# <center> Pergunta 4

In [11]:
df_airports = df_airports.withColumn(
    "qa_lon", 
         F.when(F.col("lon").isNull(), "M")
          .when(~F.col("lon").between(-180,180), "I")
          .when(~F.col("lon").rlike("\d"), "A")
    )

In [26]:
df_airports.filter(df_airports.qa_lon.isNotNull()).show()

+---+----+---+---+---+---+---+------+-------+------+------+
|faa|name|lat|lon|alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|
+---+----+---+---+---+---+---+------+-------+------+------+
+---+----+---+---+---+---+---+------+-------+------+------+



In [27]:
df_airports.show()

+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|
+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|     F|   null|  null|  null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|     F|   null|  null|  null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|     F|   null|  null|  null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|     F|   null|  null|  null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|     F|   null|  null|  null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|     F|   null|  null|  null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|     F|   null|  null|  null|
|0G7|Finger Lakes Regi...|42.883564| -76.781235| 492| -5|  A|     F|   null|  null|  null|

# <center> Pergunta 5

In [47]:
df_airports = df_airports.withColumn(
    "qa_alt", 
           F.when(F.col("alt").isNull(), "M")
            .when(F.col("alt") < 0, "I")
            .when(~F.col("alt").rlike("\d"), "A")
      )

AnalysisException: cannot resolve 'alt' given input columns: [faa, qa_dst, qa_faa, qa_lat, qa_lon, qa_name, qa_tz];
'Project [faa#0, qa_faa#80, qa_name#89, qa_lat#99, qa_lon#110, qa_tz#122, qa_dst#135, CASE WHEN isnull('alt) THEN M WHEN ('alt < 0) THEN I WHEN NOT RLIKE('alt, \d) THEN A END AS qa_alt#717]
+- Project [faa#0, qa_faa#80, qa_name#89, qa_lat#99, qa_lon#110, qa_tz#122, qa_dst#135]
   +- Project [faa#0, name#1, lat#2, lon#3, alt#4, tz#5, dst#6, qa_faa#80, qa_name#89, qa_lat#99, qa_lon#110, qa_tz#122, CASE WHEN isnull(dst#6) THEN M WHEN RLIKE(dst#6, E, A, S, O, Z, N, U) THEN C WHEN RLIKE(dst#6, \d) THEN N END AS qa_dst#135]
      +- Project [faa#0, name#1, lat#2, lon#3, alt#4, tz#5, dst#6, qa_faa#80, qa_name#89, qa_lat#99, qa_lon#110, CASE WHEN isnull(tz#5) THEN M WHEN NOT ((tz#5 >= -11) AND (tz#5 <= 14)) THEN I WHEN NOT RLIKE(cast(tz#5 as string), \d) THEN A END AS qa_tz#122]
         +- Project [faa#0, name#1, lat#2, lon#3, alt#4, tz#5, dst#6, qa_faa#80, qa_name#89, qa_lat#99, CASE WHEN isnull(lon#3) THEN M WHEN NOT ((lon#3 >= cast(-180 as float)) AND (lon#3 <= cast(180 as float))) THEN I WHEN NOT RLIKE(cast(lon#3 as string), \d) THEN A END AS qa_lon#110]
            +- Project [faa#0, name#1, lat#2, lon#3, alt#4, tz#5, dst#6, qa_faa#80, qa_name#89, CASE WHEN isnull(lat#2) THEN M WHEN NOT ((lat#2 >= cast(-180 as float)) AND (lat#2 <= cast(180 as float))) THEN I WHEN NOT RLIKE(cast(lat#2 as string), \d) THEN A END AS qa_lat#99]
               +- Project [faa#0, name#1, lat#2, lon#3, alt#4, tz#5, dst#6, qa_faa#80, CASE WHEN isnull(name#1) THEN M END AS qa_name#89]
                  +- Project [faa#0, name#1, lat#2, lon#3, alt#4, tz#5, dst#6, CASE WHEN isnull(faa#0) THEN M WHEN RLIKE(faa#0, ^(\w{3,5})) THEN F END AS qa_faa#80]
                     +- Relation [faa#0,name#1,lat#2,lon#3,alt#4,tz#5,dst#6] csv


In [None]:
df_airport.filter(df_airport.qa_alt.isNotNull()).show()

In [None]:
df_airport.show(1400)

# <center> Pergunta 6

In [15]:
df_airports = df_airports.withColumn(
    "qa_tz", 
         F.when(F.col("tz").isNull(), "M")
          .when(~F.col("tz").between(-11,14), "I")
          .when(~F.col("tz").rlike("\d"), "A")
    )

In [35]:
df_airports.show(1400)

+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+-----+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_tz|
+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+-----+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|     F|   null|  null|  null| null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|     F|   null|  null|  null| null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|     F|   null|  null|  null| null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|     F|   null|  null|  null| null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|     F|   null|  null|  null| null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|     F|   null|  null|  null| null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|     F|   null|  null|  null| null|
|0G7|Finger Lakes Regi...|42.8

# <center> Pergunta 7

In [18]:
df_airports = df_airports.withColumn(
    "qa_dst", 
         F.when(F.col("dst").isNull(), "M")
          .when(F.col("dst").rlike("E, A, S, O, Z, N, U"), "C")
          .when(F.col("dst").rlike("\d"), "N")
    )

In [37]:
df_airports.filter(df_airports.qa_dst.isNotNull()).show()

+---+----+---+---+---+---+---+------+-------+------+------+-----+------+
|faa|name|lat|lon|alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_tz|qa_dst|
+---+----+---+---+---+---+---+------+-------+------+------+-----+------+
+---+----+---+---+---+---+---+------+-------+------+------+-----+------+



In [38]:
df_airports.show()

+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+-----+------+
|faa|                name|      lat|        lon| alt| tz|dst|qa_faa|qa_name|qa_lat|qa_lon|qa_tz|qa_dst|
+---+--------------------+---------+-----------+----+---+---+------+-------+------+------+-----+------+
|04G|   Lansdowne Airport|41.130474|  -80.61958|1044| -5|  A|     F|   null|  null|  null| null|  null|
|06A|Moton Field Munic...| 32.46057|  -85.68003| 264| -5|  A|     F|   null|  null|  null| null|  null|
|06C| Schaumburg Regional| 41.98934|  -88.10124| 801| -6|  A|     F|   null|  null|  null| null|  null|
|06N|     Randall Airport| 41.43191|  -74.39156| 523| -5|  A|     F|   null|  null|  null| null|  null|
|09J|Jekyll Island Air...|31.074472|  -81.42778|  11| -4|  A|     F|   null|  null|  null| null|  null|
|0A9|Elizabethton Muni...|36.371223| -82.173416|1593| -4|  A|     F|   null|  null|  null| null|  null|
|0G6|Williams County A...|41.467304| -84.506775| 730| -5|  A|   

In [19]:
df_airports = df_airports.drop('name', 'lat', 'lon', 'alt', 'tz', 'dst')

In [20]:
df_airports.show()

+---+------+-------+------+------+-----+------+
|faa|qa_faa|qa_name|qa_lat|qa_lon|qa_tz|qa_dst|
+---+------+-------+------+------+-----+------+
|04G|     F|   null|  null|  null| null|  null|
|06A|     F|   null|  null|  null| null|  null|
|06C|     F|   null|  null|  null| null|  null|
|06N|     F|   null|  null|  null| null|  null|
|09J|     F|   null|  null|  null| null|  null|
|0A9|     F|   null|  null|  null| null|  null|
|0G6|     F|   null|  null|  null| null|  null|
|0G7|     F|   null|  null|  null| null|  null|
|0P2|     F|   null|  null|  null| null|  null|
|0S9|     F|   null|  null|  null| null|  null|
|0W3|     F|   null|  null|  null| null|  null|
|10C|     F|   null|  null|  null| null|  null|
|17G|     F|   null|  null|  null| null|  null|
|19A|     F|   null|  null|  null| null|  null|
|1A3|     F|   null|  null|  null| null|  null|
|1B9|     F|   null|  null|  null| null|  null|
|1C9|     F|   null|  null|  null| null|  null|
|1CS|     F|   null|  null|  null| null|

# Salvando o arquivo em parquet

In [41]:
(df_airports
.repartition(1) # coalesce
.write.format("parquet")
.mode('overwrite')
.option("header", "true")
.save("../data/airports_qa.parquet"))

# <center> DATASET PLANES

# <center> Pergunta 1

In [21]:
df_planes = df_planes.withColumn(
    'qa_tailnum', (
        F.when(
            (F.col('tailnum').isNull() | 
             (F.col('tailnum') == '') | 
             F.col('tailnum').rlike(REGEX_EMPTY_STR)), 'M'
        )
        .when(~F.length(F.col('tailnum')).between(5, 6), 'S')
        .when(~F.col('tailnum').startswith('N'), 'FN')
        .when( F.col('tailnum').rlike(REGEX_NNUMBER_INVALID), 'FE')
        .when(~F.col('tailnum').rlike(REGEX_NNUMBER), 'F')
        )
    )

In [22]:
df_planes = df_planes.withColumn(
    "qa_tailnum", 
        F.when(df_planes["tailnum"].isNull(), "M")
         .when(F.length(df_planes.tailnum) != 6, "S")
         .when(~df_planes["tailnum"].rlike("(^N)"), "FN")
         .when(df_planes["tailnum"].rlike("^[I|0|O]"), "FE")
         .when(~df_planes["tailnum"].rlike("^N([0-9]{1,4})([A-Z]{1,2}$)"), "F")
    )

In [50]:
df_planes.select(F.count(df_planes.qa_tailnum)).show()
df_planes.select(F.count(df_planes.qa_tailnum.isNull())).show()
df_planes.count()

+-----------------+
|count(qa_tailnum)|
+-----------------+
|              300|
+-----------------+

+---------------------------+
|count((qa_tailnum IS NULL))|
+---------------------------+
|                       2628|
+---------------------------+



2628

In [51]:
df_planes.show(2630)

+-------+----+--------------------+--------------------+--------------+-------+-----+-----+-------------+----------+
|tailnum|year|                type|        manufacturer|         model|engines|seats|speed|       engine|qa_tailnum|
+-------+----+--------------------+--------------------+--------------+-------+-----+-----+-------------+----------+
| N102UW|1998|Fixed wing multi ...|    AIRBUS INDUSTRIE|      A320-214|      2|  182| null|    Turbo-fan|      null|
| N103US|1999|Fixed wing multi ...|    AIRBUS INDUSTRIE|      A320-214|      2|  182| null|    Turbo-fan|      null|
| N104UW|1999|Fixed wing multi ...|    AIRBUS INDUSTRIE|      A320-214|      2|  182| null|    Turbo-fan|      null|
| N105UW|1999|Fixed wing multi ...|    AIRBUS INDUSTRIE|      A320-214|      2|  182| null|    Turbo-fan|      null|
| N107US|1999|Fixed wing multi ...|    AIRBUS INDUSTRIE|      A320-214|      2|  182| null|    Turbo-fan|      null|
| N108UW|1999|Fixed wing multi ...|    AIRBUS INDUSTRIE|      A3

In [23]:
df_planes = df_planes.withColumn(
    'qa_tailnum',
        F.when((F.col('tailnum').isNull()), "M")
        .when((F.length(F.col('tailnum')) != 5) & (F.length(F.col('tailnum')) != 6), "S") 
        .when((F.col('tailnum').rlike("^N([0-9]{1,4})([A-Z]{1,2}$)") == False), "F")
        .when((F.col('tailnum').rlike("^N") == False), "FN")
        .when((F.col('tailnum').rlike("^[IO0]") == True), "FE")
    )


# <center> Pergunta 2

In [24]:
df_planes = df_planes.withColumn(
    "qa_year",
         F.when((F.col("year").isNull()), "M")
         .when((F.col("year") < 1950), "I")
    )

# <center> Pergunta 3

In [57]:
category = ["Fixed wing multi engine","Fixed wing single engine","Rotorcraft"]
df_planes = df_planes.withColumn(
    "qa_type",
        F.when((F.col('type').isNull()), "M")
        .when(~(F.col('type').isin(category)), "C")
    )

# <center> Pergunta 4

In [59]:
category = ["AIRBUS", "BOEING", "BOMBARDIER", "CESSNA", "EMBRAER",
            "SIKORSKY", "CANADAIR", "PIPER", "MCDONNELL DOUGLAS", "CIRRUS",
            "BELL", "KILDALL GARY", "LAMBERT RICHARD", "BARKER JACK",
            "ROBINSON HELICOPTER", "GULFSTREAM", "MARZ BARRY"]
df_planes = df_planes.withColumn(
    "qa_manufacturer",
        F.when((F.col("manufacturer").isNull()), "M")
        .when(~(F.col("manufacturer").isin(category)), "C")
    )

# <center> Pergunta 5

In [25]:
df_planes = df_planes.withColumn(
    "qa_model",
         F.when((F.col("model").isNull()), "M")
         .when(
             (F.col("manufacturer") == "AIRBUS") & 
             (~F.col("model").rlike("^A")),
             "F")
         .when(
             (F.col("manufacturer") =="BOEING") & 
             (~F.col("model").rlike("^7")),
             "F")
         .when(
             (F.col("manufacturer") == "BOMBARDIER") | 
             (F.col("manufacturer") == "CANADAIR") &
             (~F.col("model").rlike("^CL")),
              "F")
         .when(
             (F.col("manufacturer") == "MCDONNELL DOUGLAS") &
             (~F.col("model").rlike("^(MD|DC)")),
             "F")
     )

In [26]:
df_planes = df_planes.withColumn(
    "qa_model",
         F.when((F.col("model").isNull()), "M")
         .when(
             (F.col("manufacturer") == "AIRBUS") & 
             (~F.col("model").rlike("^A")),
             "F")
         .when(
             (F.col("manufacturer") =="BOEING") & 
             (~F.col("model").rlike("^7")),
             "F")
         .when(
             (F.col("manufacturer") == "BOMBARDIER") | 
             (F.col("manufacturer") == "CANADAIR") &
             (~F.col("model").rlike("^CL")),
             "F")
         .when(
             (F.col("manufacturer") == "MCDONNELL DOUGLAS") &
              ~(
                  F.col("model").rlike("^MD") |
                 (F.col("model").rlike("^DC"))
              ),
             "F")
     )

# <center> Pergunta 6

In [27]:
df_planes = df_planes.withColumn(
    "qa_engines",
        F.when((F.col("engines").isNull()), "M")
        .when(~(F.col("engines").between(1,4)), "I")
        .when((F.col("engines").rlike("\n")), "A")
    )

# <center> Pergunta 7

In [28]:
df_planes = df_planes.withColumn(
    "qa_seats",
        F.when((F.col("seats").isNull()), "M")
        .when(~(F.col("seats").between(2,500)), "I")
        .when((F.col("seats").rlike("\n")), "A")
    )

# <center> Pergunta 8

In [29]:
df_planes = df_planes.withColumn(
    "qa_speed",
        F.when((F.col("speed").isNull()), "M")
        .when(~(F.col("speed").between(50,150)), "I")
        .when(~(F.col("speed").rlike("\w")), "A")
    )

# <center> Pergunta 9

In [30]:
category = ["Turbo-fan","Turbo-jet","Turbo-prop","Turbo-shaft","4 Cycle"]
df_planes = df_planes.withColumn(
    "qa_engine",
        F.when((F.col("engine").isNull()), "M")
        .when(~(F.col("engine").isin(category)), "C")
    )

In [31]:
df_planes_final = df_planes.drop('year','type','manufacturer','model','engines','seats','speed','engine')
df_planes_final.show()

+-------+----------+-------+--------+----------+--------+--------+---------+
|tailnum|qa_tailnum|qa_year|qa_model|qa_engines|qa_seats|qa_speed|qa_engine|
+-------+----------+-------+--------+----------+--------+--------+---------+
| N102UW|      null|   null|    null|      null|    null|       M|     null|
| N103US|      null|   null|    null|      null|    null|       M|     null|
| N104UW|      null|   null|    null|      null|    null|       M|     null|
| N105UW|      null|   null|    null|      null|    null|       M|     null|
| N107US|      null|   null|    null|      null|    null|       M|     null|
| N108UW|      null|   null|    null|      null|    null|       M|     null|
| N109UW|      null|   null|    null|      null|    null|       M|     null|
| N110UW|      null|   null|    null|      null|    null|       M|     null|
| N111US|      null|   null|    null|      null|    null|       M|     null|
| N11206|         F|   null|    null|      null|    null|       M|     null|

In [None]:
(df_planes_final
.repartition(1) # coalesce
.write.format("parquet")
.mode('overwrite')
.option("header", "true")
.save("../data/planes_qa.parquet"))

In [32]:
df_planes_final.groupby('qa_tailnum').count().show()
df_planes_final.groupby('qa_year').count().show()
df_planes_final.groupby('qa_type').count().show()
df_planes_final.groupby('qa_manufacturer').count().show()
df_planes_final.groupby('qa_model').count().show()
df_planes_final.groupby('qa_seats').count().show()
df_planes_final.groupby('qa_speed').count().show()
df_planes_final.groupby('qa_engine').count().show()
df_planes_final.groupby('qa_engines').count().show()

+----------+-----+
|qa_tailnum|count|
+----------+-----+
|         F|  298|
|      null| 2330|
+----------+-----+

+-------+-----+
|qa_year|count|
+-------+-----+
|   null| 2567|
|      M|   60|
|      I|    1|
+-------+-----+



AnalysisException: cannot resolve 'qa_type' given input columns: [qa_engine, qa_engines, qa_model, qa_seats, qa_speed, qa_tailnum, qa_year, tailnum];
'Aggregate ['qa_type], ['qa_type, count(1) AS count#430L]
+- Project [tailnum#14, qa_tailnum#213, qa_year#224, qa_model#249, qa_engines#262, qa_seats#276, qa_speed#291, qa_engine#307]
   +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, qa_tailnum#213, qa_year#224, qa_model#249, qa_engines#262, qa_seats#276, qa_speed#291, CASE WHEN isnull(engine#22) THEN M WHEN NOT engine#22 IN (Turbo-fan,Turbo-jet,Turbo-prop,Turbo-shaft,4 Cycle) THEN C END AS qa_engine#307]
      +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, qa_tailnum#213, qa_year#224, qa_model#249, qa_engines#262, qa_seats#276, CASE WHEN isnull(speed#21) THEN M WHEN NOT ((speed#21 >= 50) AND (speed#21 <= 150)) THEN I WHEN NOT RLIKE(cast(speed#21 as string), \w) THEN A END AS qa_speed#291]
         +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, qa_tailnum#213, qa_year#224, qa_model#249, qa_engines#262, CASE WHEN isnull(seats#20) THEN M WHEN NOT ((seats#20 >= 2) AND (seats#20 <= 500)) THEN I WHEN RLIKE(cast(seats#20 as string), 
) THEN A END AS qa_seats#276]
            +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, qa_tailnum#213, qa_year#224, qa_model#249, CASE WHEN isnull(engines#19) THEN M WHEN NOT ((engines#19 >= 1) AND (engines#19 <= 4)) THEN I WHEN RLIKE(cast(engines#19 as string), 
) THEN A END AS qa_engines#262]
               +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, qa_tailnum#213, qa_year#224, CASE WHEN isnull(model#18) THEN M WHEN ((manufacturer#17 = AIRBUS) AND NOT RLIKE(model#18, ^A)) THEN F WHEN ((manufacturer#17 = BOEING) AND NOT RLIKE(model#18, ^7)) THEN F WHEN ((manufacturer#17 = BOMBARDIER) OR ((manufacturer#17 = CANADAIR) AND NOT RLIKE(model#18, ^CL))) THEN F WHEN ((manufacturer#17 = MCDONNELL DOUGLAS) AND NOT (RLIKE(model#18, ^MD) OR RLIKE(model#18, ^DC))) THEN F END AS qa_model#249]
                  +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, qa_tailnum#213, qa_year#224, CASE WHEN isnull(model#18) THEN M WHEN ((manufacturer#17 = AIRBUS) AND NOT RLIKE(model#18, ^A)) THEN F WHEN ((manufacturer#17 = BOEING) AND NOT RLIKE(model#18, ^7)) THEN F WHEN ((manufacturer#17 = BOMBARDIER) OR ((manufacturer#17 = CANADAIR) AND NOT RLIKE(model#18, ^CL))) THEN F WHEN ((manufacturer#17 = MCDONNELL DOUGLAS) AND NOT RLIKE(model#18, ^(MD|DC))) THEN F END AS qa_model#236]
                     +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, qa_tailnum#213, CASE WHEN isnull(year#15) THEN M WHEN (year#15 < 1950) THEN I END AS qa_year#224]
                        +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, CASE WHEN isnull(tailnum#14) THEN M WHEN (NOT (length(tailnum#14) = 5) AND NOT (length(tailnum#14) = 6)) THEN S WHEN (RLIKE(tailnum#14, ^N([0-9]{1,4})([A-Z]{1,2}$)) = false) THEN F WHEN (RLIKE(tailnum#14, ^N) = false) THEN FN WHEN (RLIKE(tailnum#14, ^[IO0]) = true) THEN FE END AS qa_tailnum#213]
                           +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, CASE WHEN isnull(tailnum#14) THEN M WHEN NOT (length(tailnum#14) = 6) THEN S WHEN NOT RLIKE(tailnum#14, (^N)) THEN FN WHEN RLIKE(tailnum#14, ^[I|0|O]) THEN FE WHEN NOT RLIKE(tailnum#14, ^N([0-9]{1,4})([A-Z]{1,2}$)) THEN F END AS qa_tailnum#202]
                              +- Project [tailnum#14, year#15, type#16, manufacturer#17, model#18, engines#19, seats#20, speed#21, engine#22, CASE WHEN ((isnull(tailnum#14) OR (tailnum#14 = )) OR RLIKE(tailnum#14, [\t ]+$)) THEN M WHEN NOT ((length(tailnum#14) >= 5) AND (length(tailnum#14) <= 6)) THEN S WHEN NOT StartsWith(tailnum#14, N) THEN FN WHEN RLIKE(tailnum#14, (N0.*$)|(.*[IO].*)) THEN FE WHEN NOT RLIKE(tailnum#14, ^N[1-9][0-9]{2,3}([ABCDEFGHJKLMNPRSTUVXWYZ]{1,2})) THEN F END AS qa_tailnum#191]
                                 +- Relation [tailnum#14,year#15,type#16,manufacturer#17,model#18,engines#19,seats#20,speed#21,engine#22] csv


# <center> DataSets Flight

# <center> Pergunta 1

In [33]:
df_flights = df_flights.withColumn(
    "qa_year_month_day",
        F.when((F.col("year").isNull()), "MY")
        .when((F.col("month").isNull()),"MM")
        .when((F.col("day").isNull()), "MD")
        .when((F.col("year") < 1950), "IY")
        .when(
            (F.col("month") < 1) &
            (F.col("month") > 12),
            "IM")
        .when(
            (~F.col("day").between(1,31)) |
            (F.col("month") == 2) & 
            (~F.col("day").between(1,29)),
            "ID")
    )

# <center> Pergunta 2

In [35]:
df_flights = df_flights.withColumn(
    "qa_hour_minute",
        F.when((F.col('hour').isNull()), "MH")
        .when(
            (F.col('hour') < 0) &
            (F.col('hour') > 24),
            "MM")
        .when((F.col('minute').isNull()),"MM")
        .when((~F.col('minute').between(0,59)), "IM")
    )

# <center> Pergunta 3

In [36]:
df_flights = df_flights.withColumn(
    "qa_dep_arr_time",
         F.when((F.col("dep_time").isNull()), "MD")
          .when(
              (
                  F.col("arr_time").isNull() & 
                  F.col("arr_time").rlike('([0-9]{1,2})') == True
              ),
              "MA")
          .when(
              ~(F.length(F.col("dep_time")) == 3) &
               (F.col("dep_time").substr(0,1) <= 9) | 
               (F.length(F.col("dep_time")) == 4) & 
               (F.col("dep_time").substr(0,2) <= 24),
              "FD")
          .when(
              (F.length(F.col("arr_time")) == 3) &
              (F.col("arr_time").substr(0,1) <= 9) | 
              (F.length(F.col("arr_time")) == 4) & 
              (F.col("arr_time").substr(0,2) <= 59),
              "FA")
      )

# <center> Pergunta 4

In [37]:
df_flights = df_flights.withColumn(
    "qa_dep_arr_delay",
        F.when((F.col("dep_delay").isNull()), "MD")
        .when((F.col("arr_delay").isNull()), "MA")
    )

# <center> Pergunta 5

In [38]:
df_flights = df_flights.withColumn(
    "qa_carrier",
         F.when((F.col("carrier").isNull()), "M")
         .when(
             (F.length(F.col("carrier")) != 2) & 
             (F.col("carrier").rlike("\w{1,2}")),
             "F")
     )

# <center> Pergunta 6

In [39]:
df_flights = (df_flights.withColumn(
    'qa_tailnum',
        F.when(
            (F.col('tailnum').isNull()) | 
            (F.col('tailnum') == 'NA'),
            "M")
        .when(
            (F.length(F.col('tailnum')) != 5) & 
            (F.length(F.col('tailnum')) != 6),
            "S")
        .when((F.col('tailnum').rlike("^N") == False), "FN")
        .when((F.col('tailnum').rlike("^[IO0]") == True), "FE")
        .when((F.col('tailnum').rlike("^N([0-9]{1,4})([A-Z]{1,2}$)") == False), "F"))
    )

# <center> Pergunta 7

In [40]:
df_flights = df_flights.withColumn(
    "qa_flight",
         F.when((F.col("flight").isNull()), "M")
         .when(
             (F.length(F.col("flight")) != 4) & 
             (F.col("flight").rlike("\w{1,4}")),
             "F")
     )

# <center> Pergunta 8

In [41]:
df_flights = df_flights.withColumn(
    "origin_dest", 
        F.when((F.col('origin').isNull()), "MO")
        .when((F.col('dest').isNull()), "MD")
        .when((F.col("origin").rlike("\w{3}") == False), "FO")
        .when(~(F.col("dest").rlike("\w{3}")), "FD")
    )

# <center> Pergunta 9

In [42]:
df_flights = df_flights.withColumn(
    "qa_air_time",
          F.when((F.col("air_time").isNull()), "M")
          .when((F.col("air_time").between(20,500) == False), "I")
     )

# <center> Pergunta 10

In [43]:
df_flights = df_flights.withColumn(
    "qa_distance",
        F.when((F.col("distance").isNull()), "M")
        .when((F.col("distance").between(50,3000) == False), "I")
    )

# <center> Pergunta 11

In [44]:
df_flights = df_flights.withColumn(
    "qa_distance_airtime",
        F.when(
            (F.col("distance").isNull()) |
            (F.col("air_time").isNull()),
            "M")
        .when((F.col("air_time")) >= ((F.col("distance") * 0.1) + 30), "TL")
        .when((F.col("air_time")) <= ((F.col("distance") * 0.1) + 10), "TS")
    .otherwise("TR")
    )

In [45]:
df_flights_final = df_flights.drop('year','month','day','dep_time','dep_delay','arr_time','carrier','flight','air_time','distance','hour','minute','arr_delay')

In [86]:
(df_flights_final
.repartition(1) # coalesce
.write.format("parquet")
.mode('overwrite')
.option("header", "true")
.save("../data/flights_qa.parquet"))

In [87]:
df_flights_final.groupBy('qa_year_month_day').count().show()
df_flights_final.groupBy('qa_hour_minute').count().show()
df_flights_final.groupBy('qa_dep_arr_time').count().show()
df_flights_final.groupBy('qa_dep_arr_delay').count().show()
df_flights_final.groupBy('qa_carrier').count().show()
df_flights_final.groupBy('qa_tailnum').count().show()
df_flights_final.groupBy('qa_flight').count().show()
df_flights_final.groupBy('origin_dest').count().show()
df_flights_final.groupBy('qa_air_time').count().show()
df_flights_final.groupBy('qa_distance').count().show()
df_flights_final.groupBy('qa_distance_airtime').count().show()

+-----------------+-----+
|qa_year_month_day|count|
+-----------------+-----+
|             null|10000|
+-----------------+-----+

+--------------+-----+
|qa_hour_minute|count|
+--------------+-----+
|          null| 9952|
|            MH|   48|
+--------------+-----+

+---------------+-----+
|qa_dep_arr_time|count|
+---------------+-----+
|           null|   50|
|             FA| 3173|
|             FD| 6777|
+---------------+-----+

+----------------+-----+
|qa_dep_arr_delay|count|
+----------------+-----+
|            null| 9925|
|              MD|   48|
|              MA|   27|
+----------------+-----+

+----------+-----+
|qa_carrier|count|
+----------+-----+
|      null|10000|
+----------+-----+

+----------+-----+
|qa_tailnum|count|
+----------+-----+
|         F|  987|
|      null| 8997|
|         M|   14|
|        FN|    2|
+----------+-----+

+---------+-----+
|qa_flight|count|
+---------+-----+
|        F| 6158|
|     null| 3842|
+---------+-----+

+-----------+-----+
|origin