In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark

In [2]:
# starting spark
# Locate the local Spark installation so that `pyspark` can be imported
# by the next cell; must run before any pyspark import.
import findspark
findspark.init()

In [6]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, lit, udf, length, substring, expr, regexp_replace, sum_distinct
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
from pyspark.sql import functions as F

In [4]:
# Configure the session builder. The SparkContext is created *through* the
# builder so that master/appName actually take effect: creating a bare
# SparkContext() first fixes the default master, and getOrCreate() then reuses
# that context without applying .master("local[7]").
#
# NOTE: `spark` is intentionally bound to the builder (not the session)
# because later cells call `spark.getOrCreate()`, which returns the same
# session on every call.
spark = (SparkSession.builder
                     .master("local[7]")
                     .appName("Aceleração PySpark - Capgemini"))

# Spark context owned by that session
sc = spark.getOrCreate().sparkContext

In [7]:
# Explicit schemas for the three CSV inputs (every field nullable), built from
# (column name, Spark type) pairs so Spark does not have to infer types.
schema_airports = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("faa",  StringType()),
        ("name", StringType()),
        ("lat",  FloatType()),
        ("lon",  FloatType()),
        ("alt",  IntegerType()),
        ("tz",   IntegerType()),
        ("dst",  StringType()),
    ]
])

schema_planes = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("tailnum",      StringType()),
        ("year",         IntegerType()),
        ("type",         StringType()),
        ("manufacturer", StringType()),
        ("model",        StringType()),
        ("engines",      IntegerType()),
        ("seats",        IntegerType()),
        ("speed",        IntegerType()),
        ("engine",       StringType()),
    ]
])

schema_flights = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("year",      IntegerType()),
        ("month",     IntegerType()),
        ("day",       IntegerType()),
        ("dep_time",  StringType()),
        ("dep_delay", IntegerType()),
        ("arr_time",  StringType()),
        ("arr_delay", IntegerType()),
        ("carrier",   StringType()),
        ("tailnum",   StringType()),
        ("flight",    StringType()),
        ("origin",    StringType()),
        ("dest",      StringType()),
        ("air_time",  IntegerType()),
        ("distance",  IntegerType()),
        ("hour",      IntegerType()),
        ("minute",    IntegerType()),
    ]
])

In [9]:
def _read_csv(csv_schema, csv_path):
    """Load a CSV file that has a header row, using an explicit schema."""
    return (spark.getOrCreate().read
            .format("csv")
            .option("header", "true")
            .schema(csv_schema)
            .load(csv_path))


df_airports = _read_csv(schema_airports, "../Datasets/airports.csv")
df_planes   = _read_csv(schema_planes,   "../Datasets/planes.csv")
df_flights  = _read_csv(schema_flights,  "../Datasets/flights.csv")

In [10]:
# Common regular expressions. They are used with Spark `rlike`, which searches
# for a match anywhere in the value unless the pattern is anchored with ^ / $.
REGEX_ALPHA    = r'[a-zA-Z]+'
REGEX_INTEGER  = r'[0-9]+'
REGEX_FLOAT    = r'[0-9]+\.[0-9]+'
REGEX_ALPHANUM = r'[0-9a-zA-Z]+'
# Whitespace-only value. Anchored at BOTH ends: without the leading ^ any value
# that merely ends in a space/tab (e.g. 'N123AB ') was treated as empty.
REGEX_EMPTY_STR= r'^[\t ]+$'
REGEX_SPECIAL  = r'[!@#$%&*\(\)_]+'
# N-number tail: 'N', a digit 1-9, 2-3 more digits, then 1-2 letters (no I/O).
# Anchored with $ so trailing characters (e.g. 'N123A5') are rejected.
REGEX_NNUMBER  = r'^N[1-9][0-9]{2,3}([ABCDEFGHJKLMNOPQRSTUVXWYZ]{1,2})$'
# Invalid N-number markers: a 0 right after 'N', or any letter I / O.
REGEX_NNUMBER_INVALID = r'(N0.*$)|(.*[IO].*)'
# Clock time as HMM/HHMM, 0:00-23:59 (e.g. '517', '1430').
REGEX_TIME_FMT = r'^(([0-1]?[0-9])|(2[0-3]))([0-5][0-9])$'

In [13]:
# Helper functions
def split_csv(line):
    """Split one CSV line on commas and strip every double quote from each field.

    Naive splitter: a comma inside a quoted field is still treated as a separator.
    Returns a tuple of strings.
    """
    fields = line.split(",")
    return tuple(field.replace('"', '') for field in fields)

def check_empty_column(col):
    """Return a boolean Column that is true when column `col` counts as missing.

    A value is missing when it is null, the empty string, the literal "NA"
    (the missing marker every other completeness check in this notebook
    uses), or matches REGEX_EMPTY_STR (whitespace).

    Args:
        col: name of the column to test. It shadows pyspark's `col` inside
            this function, so only `F.col` is used here.
    """
    column = F.col(col)
    return (
        column.isNull()
        | (column == '')
        | (column == 'NA')
        | column.rlike(REGEX_EMPTY_STR)
    )

# Airports

# Pergunta 1

In [None]:
# qa_faa: "M" = code is null or "NA"; "F" = fails the pattern below; null = OK.
# NOTE(review): the lookaheads require at least one digit AND one letter, so
# letters-only codes (e.g. "JFK") are flagged "F" — confirm this is intended.
df1 = df_airports.withColumn(
    'qa_faa',
    F.when(F.col('faa').isNull() | (F.col('faa') == "NA"), "M")
     .when(~F.col("faa").rlike("^(?=.*[0-9])(?=.*[a-zA-Z])(?=[A-Z0-9]).{3,5}$"), "F"),
)

# Teste do resultado

In [None]:
# Rows per qa_faa flag (null = no issue detected)
df1.groupBy(col('qa_faa')).count().show()

In [None]:
# Inspect the airport codes flagged with a format error
df1.where(col('qa_faa') == 'F').select('faa', 'qa_faa').show()

# Pergunta 2

In [None]:
# qa_name: "M" when the airport name is null or the literal "NA"; otherwise null.
df2 = df1.withColumn(
    'qa_name',
    F.when(F.col('name').isNull() | (F.col('name') == "NA"), "M"),
)

# Teste do resultado

In [None]:
# Rows per qa_name flag (null = no issue detected)
df2.groupBy(col('qa_name')).count().show()

In [None]:
# Inspect the airports whose name is missing
df2.where(col('qa_name') == 'M').select('name', 'qa_name').show()

# Pergunta 3

In [None]:
# qa_lat:
#   "M" = missing (null or "NA")
#   "I" = outside the valid latitude range [-90, 90]. Checking against
#         [-180, 180] would accept impossible latitudes.
#   "A" = contains letters
# Null result = no issue detected. The missing check comes first for clarity;
# a null `lat` makes `between` null, so the order does not change the result.
df3 = df2.withColumn(
    "qa_lat",
    when(col('lat').isNull() | (col('lat') == "NA"), "M")
    .when(~col("lat").between(-90, 90), "I")
    .when(col("lat").rlike(REGEX_ALPHA), "A"))

# Teste do resultado

In [None]:
# Rows per qa_lat flag (null = no issue detected)
df3.groupBy(col('qa_lat')).count().show()

# Pergunta 4

In [None]:
# qa_lon: "I" = outside [-180, 180], "M" = missing (null or "NA"), "A" = contains
# letters; null = no issue detected.
df4 = df3.withColumn(
    "qa_lon",
    F.when(~F.col("lon").between(-180, 180), "I")
     .when(F.col('lon').isNull() | (F.col('lon') == "NA"), "M")
     .when(F.col("lon").rlike(REGEX_ALPHA), "A"),
)

# Teste do resultado

In [None]:
# Rows per qa_lon flag (null = no issue detected)
df4.groupBy(col('qa_lon')).count().show()

# Pergunta 5

In [None]:
# qa_alt: "I" = negative altitude, "M" = missing (null or "NA"), "A" = no digit.
# NOTE(review): `alt` is read as IntegerType, so a non-null value always
# contains a digit and the "A" branch cannot fire. Also confirm whether
# negative altitudes should really be "I".
df5 = df4.withColumn(
    "qa_alt",
    F.when(F.col("alt") < 0, "I")
     .when(F.col('alt').isNull() | (F.col('alt') == "NA"), "M")
     .when(~F.col("alt").rlike("[0-9]+"), "A"),
)

# Teste do resultado

In [None]:
# Rows per qa_alt flag (null = no issue detected)
df5.groupBy('qa_alt').count().show()

# Pergunta 6

In [None]:
# qa_tz: "M" = missing (null or "NA"), "I" = outside [-11, 14],
# "A" = does not end in digits; null = no issue detected.
df6 = df5.withColumn(
    "qa_tz",
    F.when(F.col('tz').isNull() | (F.col('tz') == "NA"), "M")
     .when(~F.col('tz').between(-11, 14), "I")
     .when(~F.col('tz').rlike(REGEX_INTEGER + '$'), 'A'),
)


# Teste do resultado

In [None]:
# Rows per qa_tz flag (null = no issue detected)
df6.groupBy('qa_tz').count().show()

# Pergunta 7

In [None]:
# Accepted DST category codes
DST_CATEGORIES = ['E', 'A', 'S', 'O', 'Z', 'N', 'U']

# qa_dst: "M" = missing (null or "NA"), "N" = contains digits,
# "C" = not one of DST_CATEGORIES; null = no issue detected.
df7 = df6.withColumn(
    "qa_dst",
    F.when(F.col('dst').isNull() | (F.col('dst') == "NA"), "M")
     .when(F.col('dst').rlike(REGEX_INTEGER), 'N')
     .when(~F.col('dst').isin(DST_CATEGORIES), 'C'),
)

# Teste do resultado

In [None]:
# Rows per qa_dst flag (null = no issue detected)
df7.groupBy('qa_dst').count().show()

# Salvando o arquivo em parquet com as colunas qa e informações importantes

In [None]:
# Persist the identifying columns plus every qa_* flag for airports.
df7.select(
    'faa', 'name',
    'qa_faa', 'qa_name', 'qa_lat', 'qa_lon', 'qa_alt', 'qa_tz', 'qa_dst',
).write.mode('overwrite').parquet(
    'C:/Users/coskata/Downloads/Datasets/parquet/airports.parquet'
)

# Salvando o arquivo em parquet

In [None]:
# Persist the full airports frame (raw columns + qa_* flags).
# NOTE(review): same path and mode 'overwrite' as the previous cell, so the
# qa-only output written there is replaced. Use a different path if both
# outputs should be kept.
df7.write.mode('overwrite').parquet(
    'C:/Users/coskata/Downloads/Datasets/parquet/airports.parquet'
)

# Teste do resultado

In [None]:
# Read the airports parquet output back for verification.
# `spark` is bound to SparkSession.builder earlier in this notebook. A Builder
# has no `.read`, so obtain the session with getOrCreate() first.
path = 'C:/Users/coskata/Downloads/Datasets/parquet/airports.parquet'
airports_parquet = spark.getOrCreate().read.parquet(path)

In [None]:
# Preview only: toPandas() collects the whole DataFrame to the driver, so
# convert a bounded sample. Drop .limit() to materialize every row.
airports_parquet.limit(20).toPandas()

# Planes

# Pergunta 1

In [None]:
# qa_tailnum (the first failing rule wins; null = no issue detected):
#   M  = missing (check_empty_column)
#   S  = length not between 5 and 6
#   FN = does not start with 'N'
#   FE = matches REGEX_NNUMBER_INVALID
#   F  = does not match REGEX_NNUMBER
df1 = df_planes.withColumn(
    'qa_tailnum',
    when(check_empty_column('tailnum'), 'M')
    .when(~length(col('tailnum')).between(5, 6), 'S')
    .when(~col('tailnum').startswith('N'), 'FN')
    .when(col('tailnum').rlike(REGEX_NNUMBER_INVALID), 'FE')
    .when(~col('tailnum').rlike(REGEX_NNUMBER), 'F'),
)

# Teste do resultado

In [None]:
# Rows per qa_tailnum flag (null = no issue detected)
df1.groupBy(col('qa_tailnum')).count().show()

# Pergunta 2

In [None]:
# qa_year: "M" = missing (null or "NA"), "I" = year before 1950;
# null = no issue detected.
df2 = df1.withColumn(
    "qa_year",
    F.when(F.col('year').isNull() | (F.col('year') == "NA"), "M")
     .when(F.col('year') < 1950, "I"),
)

# Teste do resultado

In [None]:
# Rows per qa_year flag (null = no issue detected)
df2.groupBy(col('qa_year')).count().show()

# Pergunta 3

In [None]:
# Expected aircraft types; any other value is flagged "C" (unknown category).
# A pattern like rlike("[Fixed wing ...]") is a regex *character class* that
# matches any single listed character. It matched virtually every value, so
# "C" was never assigned; compare against the exact category list instead.
PLANE_TYPES = ["Fixed wing multi engine", "Fixed wing single engine", "Rotorcraft"]

# qa_type: "M" = missing (null or "NA"), "C" = not in PLANE_TYPES; null = OK.
df3 = df2.withColumn(
    "qa_type",
    when(col('type').isNull() | (col('type') == "NA"), "M")
    .when(~col("type").isin(PLANE_TYPES), "C")
)

# Teste do resultado

In [15]:
# Rows per qa_type flag (null = no issue detected)
df3.groupBy(col('qa_type')).count().show()

NameError: name 'df3' is not defined

# Pergunta 4

In [None]:
# Known manufacturers. Wrapping the list in [...] made the pattern a
# single-character class, which matched almost every value, so "C" was never
# assigned. The names are now joined as a regex alternation. Because rlike is
# a regex *search*, a value is accepted when it contains one of these names
# (e.g. "AIRBUS INDUSTRIE" matches AIRBUS).
# NOTE(review): use .isin(KNOWN_MANUFACTURERS) instead if exact matches are required.
KNOWN_MANUFACTURERS = [
    "AIRBUS",
    "BOEING",
    "BOMBARDIER",
    "CESSNA",
    "EMBRAER",
    "SIKORSKY",
    "CANADAIR",
    "PIPER",
    "MCDONNELL DOUGLAS",
    "CIRRUS",
    "BELL",
    "KILDALL GARY",
    "LAMBERT RICHARD",
    "BARKER JACK",
    "ROBINSON HELICOPTER",
    "GULFSTREAM",
    "MARZ BARRY",
]

# qa_manufacturer: "M" = missing (null or "NA"), "C" = no known manufacturer
# name found; null = no issue detected.
df4 = df3.withColumn(
    "qa_manufacturer",
    when(col('manufacturer').isNull() | (col('manufacturer') == "NA"), "M")
    .when(~col("manufacturer").rlike("|".join(KNOWN_MANUFACTURERS)), "C"))

# Teste do resultado

In [None]:
# Rows per qa_manufacturer flag (null = no issue detected)
df4.groupBy(col('qa_manufacturer')).count().show()

# Pergunta 5

In [None]:
# qa_model: "M" = missing (null or "NA"); "F" = model prefix inconsistent with
# the manufacturer; null = no issue detected.
# McDonnell Douglas models may start with "MD" or "DC", so the negation must
# wrap both prefixes. The form `~MD | DC` flagged every DC-prefixed model as "F".
df5 = df4.withColumn(
    "qa_model",
    when(col('model').isNull() | (col('model') == "NA"), "M")
    .when(
        ((col('manufacturer') == "AIRBUS") & ~col('model').startswith("A"))
        | ((col('manufacturer') == "BOEING") & ~col('model').startswith("7"))
        | ((col('manufacturer') == "BOMBARDIER") & ~col('model').startswith("CL"))
        | ((col('manufacturer') == "MCDONNELL DOUGLAS")
           & ~(col('model').startswith("MD") | col('model').startswith("DC"))),
        "F",
    )
)

# Teste do resultado

In [None]:
# Rows per qa_model flag (null = no issue detected)
df5.groupBy(col('qa_model')).count().show()

# Pergunta 6

In [None]:
# qa_engines: "M" = missing (null or "NA"), "I" = outside 1-4,
# "A" = value is not purely numeric; null = no issue detected.
# The un-negated check rlike("^[0-9A-Z^]*$") matched plain digit strings, so
# every in-range row was flagged "A". It is now negated and digit-only.
df6 = df5.withColumn(
    "qa_engines",
    when(col('engines').isNull() | (col('engines') == "NA"), "M")
    .when(~col('engines').between(1, 4), "I")
    .when(~col('engines').rlike("^[0-9]+$"), "A")
)

# Teste do resultado

In [None]:
# Rows per qa_engines flag (null = no issue detected)
df6.groupBy(col('qa_engines')).count().show()

# Pergunta 7

In [None]:
# qa_seats: "M" = missing (null or "NA"), "I" = outside 2-500,
# "A" = value is not purely numeric; null = no issue detected.
# The un-negated check rlike("^[0-9A-Z^]*$") matched plain digit strings, so
# every in-range row was flagged "A". It is now negated and digit-only.
df7 = df6.withColumn(
    "qa_seats",
    when(col('seats').isNull() | (col('seats') == "NA"), "M")
    .when(~col('seats').between(2, 500), "I")
    .when(~col('seats').rlike("^[0-9]+$"), "A")
)

# Teste do resultado

In [None]:
# Rows per qa_seats flag (null = no issue detected)
df7.groupBy(col('qa_seats')).count().show()

# Pergunta 8

In [None]:
# qa_speed: "M" = missing (null or "NA"), "I" = outside 50-150,
# "A" = value is not purely numeric; null = no issue detected.
# The un-negated check rlike("^[0-9A-Z^]*$") matched plain digit strings, so
# every in-range row was flagged "A". It is now negated and digit-only.
df8 = df7.withColumn(
    "qa_speed",
    when(col('speed').isNull() | (col('speed') == "NA"), "M")
    .when(~col('speed').between(50, 150), "I")
    .when(~col('speed').rlike("^[0-9]+$"), "A")
)

# Teste do resultado

In [None]:
# Rows per qa_speed flag (null = no issue detected)
df8.groupBy(col('qa_speed')).count().show()

# Pergunta 9

In [None]:
# Accepted engine types. "4 CycleY" (with a trailing Y) looks like a typo: it
# never matched a "4 Cycle" value, so those rows were flagged "C".
# NOTE(review): confirm the complete list of allowed engine types.
ENGINE_TYPES = ["Turbo-fan", "Turbo-jet", "Turbo-prop", "Turbo-shaft", "4 Cycle"]

# qa_engine: "M" = missing (null or "NA"); "C" = contains none of ENGINE_TYPES
# (rlike is a regex search, so the alternation is a "contains any" test);
# null = no issue detected.
df9 = df8.withColumn(
    "qa_engine",
    when(col('engine').isNull() | (col('engine') == "NA"), "M")
    .when(~col('engine').rlike("|".join(ENGINE_TYPES)), "C")
)

# Teste do resultado

In [None]:
# Rows per qa_engine flag (null = no issue detected)
df9.groupBy(col('qa_engine')).count().show()

# Salvando o arquivo em parquet com as colunas qa e informações importantes

In [None]:
# Persist the tail number plus every qa_* flag for planes.
df9.select(
    'tailnum',
    'qa_tailnum', 'qa_year', 'qa_type', 'qa_manufacturer', 'qa_model',
    'qa_engines', 'qa_seats', 'qa_speed', 'qa_engine',
).write.mode('overwrite').parquet(
    'C:/Users/coskata/Downloads/Datasets/parquet/planes.parquet'
)

# Salvando o arquivo em parquet

In [None]:
# Persist the full planes frame (raw columns + qa_* flags).
# NOTE(review): same path and mode 'overwrite' as the previous cell, so the
# qa-only output written there is replaced. Use a different path if both
# outputs should be kept.
df9.write.mode('overwrite').parquet(
    'C:/Users/coskata/Downloads/Datasets/parquet/planes.parquet'
)

# Teste do resultado

In [None]:
# Read the planes parquet output back for verification.
# `spark` is bound to SparkSession.builder earlier in this notebook. A Builder
# has no `.read`, so obtain the session with getOrCreate() first.
path = 'C:/Users/coskata/Downloads/Datasets/parquet/planes.parquet'
planes_parquet = spark.getOrCreate().read.parquet(path)

In [None]:
# Preview only: toPandas() collects the whole DataFrame to the driver, so
# convert a bounded sample. Drop .limit() to materialize every row.
planes_parquet.limit(20).toPandas()

# Flights

# Pergunta 1

In [None]:
# qa_year_month_day for the *flights* data. Reading df_airports here fails
# because that frame has no year/month/day columns. First failing rule wins:
#   MY / MM / MD = missing year / month / day (null or "NA")
#   IY = year before 1950
#   IM = month outside 1-12
#   ID = day outside 1-31, above 29 in February, or above 30 in a 30-day month
# Null = no issue detected. Leap years are not checked (Feb 29 is always accepted).
df1 = df_flights.withColumn(
    "qa_year_month_day",
    when(col('year').isNull() | (col('year') == "NA"), "MY")
    .when(col('month').isNull() | (col('month') == "NA"), "MM")
    .when(col('day').isNull() | (col('day') == "NA"), "MD")
    .when(col("year") < 1950, "IY")
    .when(~col("month").between(1, 12), "IM")
    .when(
        ~col('day').between(1, 31)
        | ((col("month") == 2) & ~col('day').between(1, 29))
        | (col("month").isin(4, 6, 9, 11) & (col('day') > 30)),
        "ID",
    )
)

# Teste do resultado

In [None]:
# Rows per qa_year_month_day flag (null = no issue detected)
df1.groupBy(col('qa_year_month_day')).count().show()

# Pergunta 2

In [None]:
# qa_hour_minute: "MH" / "MM" = missing hour / minute (null or "NA"),
# "IH" = hour outside 0-24, "IM" = minute outside 0-59; null = no issue detected.
# NOTE(review): 24 is accepted as a valid hour — confirm that is intended.
df2 = df1.withColumn(
    "qa_hour_minute",
    F.when(F.col('hour').isNull() | (F.col('hour') == "NA"), "MH")
     .when(F.col('minute').isNull() | (F.col('minute') == "NA"), "MM")
     .when(~F.col('hour').between(0, 24), "IH")
     .when(~F.col('minute').between(0, 59), "IM"),
)

# Teste do resultado

In [None]:
# Rows per qa_hour_minute flag (null = no issue detected)
df2.groupBy(col('qa_hour_minute')).count().show()

# Pergunta 3

In [None]:
# Usando dataframe
df3 = df2.withColumn('qa_dep_arr_time', (
        F.when(check_empty_column('dep_time'), 'MD')
         .when(check_empty_column('arr_time'), 'MA')
         .when(F.col('dep_time').rlike(REGEX_TIME_FMT), 'FD')
         .when(F.col('arr_time').rlike(REGEX_TIME_FMT), 'FA')))

# Teste do resultado

In [None]:
# Rows per qa_dep_arr_time flag (null = no issue detected)
df3.groupBy(col('qa_dep_arr_time')).count().show()

# Pergunta 4

In [None]:
# qa_dep_arr_delay: "MD" = missing departure delay, "MA" = missing arrival
# delay (null or "NA"); null = both present.
df4 = df3.withColumn(
    "qa_dep_arr_delay",
    F.when(F.col('dep_delay').isNull() | (F.col('dep_delay') == "NA"), "MD")
     .when(F.col('arr_delay').isNull() | (F.col('arr_delay') == "NA"), "MA"),
)

# Teste do resultado

In [None]:
# Rows per qa_dep_arr_delay flag (null = no issue detected)
df4.groupBy(col('qa_dep_arr_delay')).count().show()

# Pergunta 5

In [None]:
# qa_carrier: "M" = missing (null or "NA"); "F" = not a 2-character code made
# of two uppercase letters, or one letter plus one digit in either order;
# null = no issue detected.
# A pattern starting with "{2}" has a quantifier with nothing to repeat, which
# the Java regex engine behind rlike rejects as an invalid pattern.
df5 = df4.withColumn(
    "qa_carrier",
    when(col('carrier').isNull() | (col('carrier') == "NA"), "M")
    .when(~col('carrier').rlike("^([A-Z]{2}|[0-9][A-Z]|[A-Z][0-9])$"), "F")
)

# Teste do resultado

In [None]:
# Rows per qa_carrier flag (null = no issue detected)
df5.groupBy(col('qa_carrier')).count().show()

# Pergunta 6

In [None]:
# qa_tailnum (same rules as for planes; the first failing rule wins):
#   M  = missing (check_empty_column)
#   S  = length not between 5 and 6
#   FN = does not start with 'N'
#   FE = matches REGEX_NNUMBER_INVALID
#   F  = does not match REGEX_NNUMBER
df6 = df5.withColumn(
    'qa_tailnum',
    when(check_empty_column('tailnum'), 'M')
    .when(~length(col('tailnum')).between(5, 6), 'S')
    .when(~col('tailnum').startswith('N'), 'FN')
    .when(col('tailnum').rlike(REGEX_NNUMBER_INVALID), 'FE')
    .when(~col('tailnum').rlike(REGEX_NNUMBER), 'F'),
)

# Teste do resultado

In [None]:
# Rows per qa_tailnum flag (null = no issue detected)
df6.groupBy(col('qa_tailnum')).count().show()

# Pergunta 7

In [None]:
# qa_flight: "M" = missing (null or "NA"), "F" = value does not end in four
# digits; null = no issue detected.
# NOTE(review): the pattern is only anchored at the end, so shorter flight
# numbers (e.g. "11") are flagged "F" — confirm that is intended.
df7 = df6.withColumn(
    "qa_flight",
    F.when(F.col('flight').isNull() | (F.col('flight') == "NA"), "M")
     .when(~F.col("flight").rlike("[0-9]{4}$"), "F"),
)

# Teste do resultado

In [None]:
# Rows per qa_flight flag (null = no issue detected)
df7.groupBy('qa_flight').count().show()

# Pergunta 8

In [None]:
# qa_origin_dest (first failing rule wins; null = no issue detected):
#   MO / MD = missing origin / destination (null or "NA")
#   FO / FD = origin / destination is not exactly three uppercase letters
# Patterns like rlike("^[A-Z]*") / rlike("^[0-9]*") match the empty prefix of
# every string, so every present origin was flagged "FO" and "FD" was unreachable.
# NOTE(review): confirm the expected airport-code format (e.g. whether digits
# should be allowed, as in the airports faa check).
df8 = df7.withColumn(
    "qa_origin_dest",
    when(col('origin').isNull() | (col('origin') == "NA"), "MO")
    .when(col('dest').isNull() | (col('dest') == "NA"), "MD")
    .when(~col('origin').rlike("^[A-Z]{3}$"), "FO")
    .when(~col('dest').rlike("^[A-Z]{3}$"), "FD")
)

# Teste do resultado

In [None]:
# Rows per qa_origin_dest flag (null = no issue detected)
df8.groupBy(col('qa_origin_dest')).count().show()

# Pergunta 9

In [None]:
# qa_air_time: "M" = missing (null or "NA"), "I" = outside [20, 500];
# null = no issue detected.
df9 = df8.withColumn(
    "qa_air_time",
    F.when(F.col('air_time').isNull() | (F.col('air_time') == "NA"), "M")
     .when(~F.col("air_time").between(20, 500), "I"),
)

# Teste do resultado

In [None]:
# Rows per qa_air_time flag (null = no issue detected)
df9.groupBy(col('qa_air_time')).count().show()

# Pergunta 10

In [None]:
# qa_distance: "M" = missing (null or "NA"), "I" = outside [50, 3000];
# null = no issue detected.
df10 = df9.withColumn(
    "qa_distance",
    F.when(F.col('distance').isNull() | (F.col('distance') == "NA"), "M")
     .when(~F.col("distance").between(50, 3000), "I"),
)

# Teste do resultado

In [None]:
# Rows per qa_distance flag (null = no issue detected)
df10.groupBy(col('qa_distance')).count().show()

# Pergunta 11

In [None]:
# qa_distance_airtime, comparing air_time with the expected window
# [distance * 0.1 + 10, distance * 0.1 + 30]:
#   M  = distance or air_time missing (check_empty_column)
#   TL = air_time at or above the upper bound
#   TS = air_time at or below the lower bound
#   TR = in between
# air_time must be part of the "M" test: with a null air_time both comparisons
# are null, so the row would otherwise fall through to otherwise('TR').
df11 = df10.withColumn(
    'qa_distance_airtime',
    F.when(check_empty_column('distance') | check_empty_column('air_time'), 'M')
     .when(F.col('air_time') >= F.col('distance') * 0.1 + 30, 'TL')
     .when(F.col('air_time') <= F.col('distance') * 0.1 + 10, 'TS')
     .otherwise('TR'),
)

# Teste do resultado

In [None]:
# Rows per qa_distance_airtime flag
df11.groupBy(col('qa_distance_airtime')).count().show()

# Salvando o arquivo em parquet com as colunas qa e informações importantes

In [None]:
# Persist the identifying columns plus every qa_* flag for flights.
# qa_distance was previously omitted even though all other qa flags are kept.
df11.select(
    'tailnum', 'flight', 'origin', 'dest', 'distance',
    'qa_year_month_day', 'qa_hour_minute', 'qa_dep_arr_time', 'qa_dep_arr_delay',
    'qa_carrier', 'qa_tailnum', 'qa_flight', 'qa_origin_dest', 'qa_air_time',
    'qa_distance', 'qa_distance_airtime',
).write.mode('overwrite').parquet(
    'C:/Users/coskata/Downloads/Datasets/parquet/flights.parquet'
)

# Salvando o arquivo em parquet

In [None]:
# Persist the full flights frame (raw columns + qa_* flags).
# NOTE(review): same path and mode 'overwrite' as the previous cell, so the
# qa-only output written there is replaced. Use a different path if both
# outputs should be kept.
df11.write.mode('overwrite').parquet(
    'C:/Users/coskata/Downloads/Datasets/parquet/flights.parquet'
)

# Teste do resultado

In [None]:
# Read the flights parquet output back for verification.
# `spark` is bound to SparkSession.builder earlier in this notebook. A Builder
# has no `.read`, so obtain the session with getOrCreate() first.
path = 'C:/Users/coskata/Downloads/Datasets/parquet/flights.parquet'
flights_parquet = spark.getOrCreate().read.parquet(path)

In [None]:
# Preview only: toPandas() collects the whole DataFrame to the driver, so
# convert a bounded sample. Drop .limit() to materialize every row.
flights_parquet.limit(20).toPandas()