In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark



In [2]:
# Import findspark and run its initialisation before the pyspark imports
# in the next cell.
import findspark
findspark.init()

In [3]:
# Importa todos os módulos necessários
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [4]:
# Configure the Spark session builder.
# NOTE: `spark` is the *builder*, not a session; the loading cells call
# `spark.getOrCreate()` to obtain the shared session.
spark = (SparkSession.builder
                     .master("local[7]")  # run on the local machine with 7 worker threads
                     .appName("Aceleração PySpark - Capgemini"))

# Take the SparkContext from the session created by the builder.
# Creating a bare `SparkContext()` first would start the context with default
# settings, and the builder's master/appName would then be ignored; it would
# also raise if this cell were executed a second time in the same kernel.
sc = spark.getOrCreate().sparkContext

In [5]:
# Explicit schemas for the three CSV datasets; every column is nullable.
def _nullable_schema(columns):
    """Build a StructType from (column name, data type) pairs, all nullable."""
    return StructType([
        StructField(column_name, column_type, True)
        for column_name, column_type in columns
    ])


schema_airports = _nullable_schema([
    ("faa", StringType()),
    ("name", StringType()),
    ("lat", FloatType()),
    ("lon", FloatType()),
    ("alt", IntegerType()),
    ("tz", IntegerType()),
    ("dst", StringType()),
])

schema_planes = _nullable_schema([
    ("tailnum", StringType()),
    ("year", IntegerType()),
    ("type", StringType()),
    ("manufacturer", StringType()),
    ("model", StringType()),
    ("engines", IntegerType()),
    ("seats", IntegerType()),
    ("speed", IntegerType()),
    ("engine", StringType()),
])

schema_flights = _nullable_schema([
    ("year", IntegerType()),
    ("month", IntegerType()),
    ("day", IntegerType()),
    ("dep_time", StringType()),
    ("dep_delay", IntegerType()),
    ("arr_time", StringType()),
    ("arr_delay", IntegerType()),
    ("carrier", StringType()),
    ("tailnum", StringType()),
    ("flight", StringType()),
    ("origin", StringType()),
    ("dest", StringType()),
    ("air_time", IntegerType()),
    ("distance", IntegerType()),
    ("hour", IntegerType()),
    ("minute", IntegerType()),
])

In [6]:
# Load each CSV file (first line is the header) with its explicit schema.
def _load_csv(path, schema):
    """Read the CSV at `path`, treating the first line as header, using `schema`."""
    reader = spark.getOrCreate().read.format("csv").option("header", "true")
    return reader.schema(schema).load(path)


df_airports = _load_csv("../data/airports.csv", schema_airports)

df_planes = _load_csv("../data/planes.csv", schema_planes)

df_flights = _load_csv("../data/flights.csv", schema_flights)

In [7]:
# Print the first three rows of each dataset.
for preview_df in (df_airports, df_planes, df_flights):
    preview_df.show(3)

+---+--------------------+---------+---------+----+---+---+
|faa|                name|      lat|      lon| alt| tz|dst|
+---+--------------------+---------+---------+----+---+---+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -5|  A|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A|
+---+--------------------+---------+---------+----+---+---+
only showing top 3 rows

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
+--

In [None]:
# Regex helper
def regex_definition(element):
    """Build an alternation regex matching text that contains any given term.

    Args:
        element: iterable of strings. Each term is inserted verbatim, so it
            may itself be a regex fragment.

    Returns:
        A pattern of the form ``.*term1.*|.*term2.*`` (an empty string when
        `element` is empty).
    """
    # Wrap each term in `.*` on both sides. A bare trailing `*` would bind to
    # the term's last character and make it optional, so e.g. 'Fld' would
    # also match any text containing just 'Fl' (such as 'Florida').
    return r'|'.join(f'.*{term}.*' for term in element)

# -----------------------------------------------------------------------------------------------------------
# Transformação airports
# -----------------------------------------------------------------------------------------------------------
## 1.

In [None]:
df_airports.filter(F.col('alt') < 0).show()

In [None]:
df_airports = df_airports.withColumn('alt', 
                                     F.when(F.col('alt') < 0, 0)
                                      .otherwise(F.col('alt'))
                                    )

In [None]:
df_airports.filter(F.col('alt') == 0).count()

In [None]:
df_airports.groupBy(F.col('alt')).count().orderBy(F.col('alt')).show()

# -----------------------------------------------------------------------------------------------------------
## 2.

In [None]:
df_airports.filter(F.col('tz').between(-7, -5)).show(10)

In [None]:
df_airports = df_airports.withColumn('dst',
                                     F.when(F.col('tz').between(-7, -5), 'A')
                                      .otherwise(F.col('dst'))
                                    )

In [None]:
df_airports.filter(F.col('tz').between(-7, -5)).show(10)
df_airports.groupBy("dst").count().show()

In [None]:
df_airports.groupBy(F.col('dst')).count().show(999)

# -----------------------------------------------------------------------------------------------------------
## 3.

In [None]:
df_airports.filter(F.col('dst') == 'U').show()

In [None]:
df_airports = df_airports.withColumn('dst',
                                     F.when(F.col('dst') == 'U', 'A')
                                      .otherwise(F.col('dst'))
                                    )

In [None]:
df_airports.filter(F.col('faa') == 'GCW').show()
df_airports.filter(F.col('dst') == 'A').count()
df_airports.groupBy("dst").count().show()

# -----------------------------------------------------------------------------------------------------------
## 4.

In [None]:
# Alaska
df_airports.filter(F.col('lon') < -124).show(5)

# Mainland-west
df_airports.filter(F.col('lon').between(-124, -95)).show(5)

# Mainland-east
df_airports.filter(F.col('lon').between(-95, -59)).show(5)

# Offshore
df_airports.filter((F.col('lon') > -50) | (F.col('lat') < 24)).show(5)

In [None]:
df_airports = df_airports.withColumn('region',
                                      F.when(F.col('lon') < -124,                        'ALASKA')
                                       .when(F.col('lon').between(-124, -95),            'MAINLAND-WEST')
                                       .when(F.col('lon').between(-95, -54),             'MAINLAND-EAST')
                                       .when((F.col('lon') > -50) | (F.col('lat') > 24), 'OFFSHORE')
                                       .otherwise('NaN')
                                    )

df_airports.show(10)

In [None]:
df_airports.groupBy("region").count().show()

# -----------------------------------------------------------------------------------------------------------
## 5.

In [None]:
LIST_AP  = ['Airport', 'Tradeport', 'Heliport', 'Airpor', 'Arpt']
REGEX_AP = regex_definition(LIST_AP)

LIST_AD  = ['Aerodrome']
REGEX_AD = regex_definition(LIST_AD)

LIST_AK  = ['Airpark', 'Aero Park']
REGEX_AK = regex_definition(LIST_AK)

LIST_AS  = ['Station', 'Air Station']
REGEX_AS = regex_definition(LIST_AS)

LIST_FL  = ['Field', 'Fld']
REGEX_FL = regex_definition(LIST_FL)

In [None]:
df_airports = df_airports.withColumn('type',
                                     F.when(F.col('name').rlike(REGEX_AP),  'AP')
                                       .when(F.col('name').rlike(REGEX_AD), 'AD')
                                       .when(F.col('name').rlike(REGEX_AK), 'AK')
                                       .when(F.col('name').rlike(REGEX_AS), 'AS')
                                       .when(F.col('name').rlike(REGEX_FL), 'FL')
                                       .otherwise('NaN')
                                    )

In [None]:
df_airports.filter((F.col('name').rlike('(Airport).*(Field)'))).show(10, truncate=False)
df_airports.groupBy("type").count().show()

# -----------------------------------------------------------------------------------------------------------
## 6.

In [None]:
df_airports.filter(F.col('name').rlike('Afs')).show(10, False)

In [None]:
LIST_MILITARY  = ['Base', 'Aaf', 'Afs', 'Ahp', 'Afb', 'LRRS', 'Lrrs', 'Arb', 'Naf', 'NAS', 'Nas', 'Jrb', 'Ns', 'As', 'Cgas', 'Angb']
REGEX_MILITARY = r'|'.join(map(lambda x : f'^{x} | {x} | {x}$', LIST_MILITARY))

In [None]:
df_airports = df_airports.withColumn('military',
                                     F.when(F.col('name').rlike(REGEX_MILITARY), True)
                                      .otherwise(False)
                                    )

In [None]:
df_airports.filter(F.col('military') == True).show(10, False)
df_airports.filter(F.col('name').rlike('As')).show(50, False)
df_airports.groupBy("military").count().show()

# -----------------------------------------------------------------------------------------------------------
## 7.

In [None]:
df_airports.filter(F.col('name').rlike('(City).*(Intl)')).show(10, False)

In [None]:
LIST_I  = ['International', 'Intl', 'Intercontinental']
REGEX_I = regex_definition(LIST_I)

LIST_N  = ['National', 'Natl']
REGEX_N = regex_definition(LIST_N)

LIST_R  = ['Regional', 'Reigonal', 'Rgnl', 'County', 'Metro', 'Metropolitan']
REGEX_R = regex_definition(LIST_R)

LIST_M  = ['Municipal', 'Muni', 'City']
REGEX_M = regex_definition(LIST_M)

In [None]:
df_airports = df_airports.withColumn('administration',
                                     F.when(F.col('name').rlike(REGEX_I), 'I')
                                      .when(F.col('name').rlike(REGEX_N), 'N')
                                      .when(F.col('name').rlike(REGEX_R), 'R')
                                      .when(F.col('name').rlike(REGEX_M), 'M')
                                      .otherwise('NaN')
                                    )

df_airports.show(10, False)

In [None]:
df_airports.filter(F.col('administration') == 'I').show(10, False)
df_airports.groupBy("administration").count().show()

# -----------------------------------------------------------------------------------------------------------
# Transformação planes
# -----------------------------------------------------------------------------------------------------------
## 1.

In [None]:
df_planes = df_planes.withColumn('tailchar', F.regexp_replace(F.col('tailnum'), '[0-9]|^N', ""))

df_planes.show()

In [None]:
df_planes.groupBy("tailchar").count().show()

# -----------------------------------------------------------------------------------------------------------
## 2.

In [None]:
df_planes.filter(F.col('year') == 0).show()

In [None]:
df_planes = df_planes.withColumn('year',
                                 F.when(F.col('year') == 0, 1996)
                                  .otherwise(F.col('year')))
 
df_planes.filter(F.col('year') == 0).show()
df_planes.groupBy("year").count().show()

# -----------------------------------------------------------------------------------------------------------
## 3.

In [None]:
# Inspect and count the planes that have no year.
df_planes.filter(F.col('year').isNull()).show(10)
df_planes.filter(F.col('year').isNull()).count()

In [None]:
# Planes with a known year, ordered by manufacturer, model and year.
df_planes_order = df_planes.orderBy(F.col('manufacturer'), F.col('model'), F.col('year')).filter(F.col('year').isNotNull())
df_planes_order.show(10)

In [None]:
# Spot-check a single manufacturer.
df_planes.filter(F.col('manufacturer') == 'LAMBERT RICHARD').show()

In [None]:
# Reduce the dataframe: earliest known year per (manufacturer, model).
# The aggregated column is named 'min(year)'.
df_yeara = df_planes_order.groupBy("manufacturer", "model").min("year").orderBy(F.col('manufacturer'), F.col('model'))
df_yeara.show()

In [None]:
# Reduce further: earliest known year per manufacturer.
# The aggregated column is named 'min(min(year))'.
df_yearb = df_yeara.groupBy('manufacturer').min('min(year)').orderBy(F.col('manufacturer'))
df_yearb.show()

In [None]:
# Rename the columns of df_yeara and df_yearb so they do not clash with
# df_planes' own columns in the joins below.
df_yeara = (df_yeara.withColumnRenamed('manufacturer', 'manuf_a')
                    .withColumnRenamed('model', 'model_a')
                    .withColumnRenamed('min(year)', 'year_a'))

df_yearb = (df_yearb.withColumnRenamed('manufacturer', 'manuf_b')
                    .withColumnRenamed('min(min(year))', 'year_b'))

In [None]:
# Left-join df_planes with df_yeara (on manufacturer + model) and then with
# df_yearb (on manufacturer), so every plane row is kept.
df_join = df_planes.join(df_yeara, 
                        (df_planes['manufacturer'] == df_yeara['manuf_a']) &
                        (df_planes['model'] == df_yeara['model_a']),
                        'left')

df_join = df_join.join(df_yearb,
                       df_join['manufacturer'] == df_yearb['manuf_b'],
                       'left')

In [None]:
# Check that the joins worked (cell values truncated to 7 characters).
df_join.show(3, 7)
df_join.filter(F.col('year').isNull()).show(20, 7)

In [None]:
# Fill missing years: prefer the (manufacturer, model) minimum when that join
# matched, otherwise fall back to the manufacturer minimum; rows with a year,
# or with no match at all, keep their current value.
df_join = df_join.withColumn('year',
                              F.when((F.col('year').isNull()) &
                                     (F.col('manuf_a').isNotNull()) &
                                     (F.col('model_a').isNotNull()), F.col('year_a'))
                               .when((F.col('year').isNull()) &
                                     (F.col('manuf_b').isNotNull()), F.col('year_b'))
                               .otherwise(F.col('year'))
                            )

df_join.filter(F.col('year').isNull()).show(truncate = 7)

In [None]:
# Drop the helper columns brought in by the joins and store the result back
# in df_planes.
df_planes = df_join.drop('manuf_a', 
                         'model_a', 
                         'year_a', 
                         'manuf_b',
                         'year_b')

In [None]:
# Planes that still have no year after the fill.
df_planes.filter(F.col('year').isNull()).show()

# -----------------------------------------------------------------------------------------------------------
## 4.

In [None]:
ANO_ATUAL = 2022

df_planes = df_planes.withColumn('age',
                                 F.when(F.col('year').isNotNull(), ANO_ATUAL - F.col('year'))
                                  .otherwise(None)
                                ) 

df_planes.show()

In [None]:
df_planes.groupBy("age").count().orderBy(F.col('age')).show()

# -----------------------------------------------------------------------------------------------------------
## 5.

In [None]:
df_planes = df_planes.withColumn('type',
                                 F.when(F.col('type') == 'Fixed wing multi engine',  'MULTI_ENG')
                                  .when(F.col('type') == 'Fixed wing single engine', 'SINGLE_ENG')
                                  .when(F.col('type') == 'Rotorcraft',               'ROTORCRAFT')
                                  .otherwise(F.col('type'))
                                )

df_planes.show()

In [None]:
df_planes.groupBy("type").count().orderBy(F.col('type')).show(999)

# -----------------------------------------------------------------------------------------------------------
## 6.

In [None]:
df_planes.filter(F.col('manufacturer').rlike('BELL')).show(truncate=False)

In [None]:
# Realiza as mudanças
df_planes = df_planes.withColumn('manufacturer',
            F.when(F.col('manufacturer').rlike('AIRBUS'),              'AIRBUS')
             .when(F.col('manufacturer').rlike('BOEING'),              'BOEING')
             .when(F.col('manufacturer').rlike('BOMBARDIER'),          'BOMBARDIER')
             .when(F.col('manufacturer').rlike('CESSNA'),              'CESSNA')
             .when(F.col('manufacturer').rlike('EMBRAER'),             'EMBRAER')
             .when(F.col('manufacturer').rlike('SIKORSKY'),            'SIKORSKY')
             .when(F.col('manufacturer').rlike('CANADAIR'),            'CANADAIR')
             .when(F.col('manufacturer').rlike('PIPER'),               'PIPER')
             .when(F.col('manufacturer').rlike('MCDONNELL DOUGLAS'),   'MCDONNELL DOUGLAS')
             .when(F.col('manufacturer').rlike('CIRRUS'),              'CIRRUS')
             .when(F.col('manufacturer').rlike('BELL'),                'BELL')
             .when(F.col('manufacturer').rlike('KILDALL GARY'),        'KILDALL GARY')
             .when(F.col('manufacturer').rlike('LAMBERT RICHARD'),     'LAMBERT RICHARD')
             .when(F.col('manufacturer').rlike('BARKER JACK'),         'BARKER JACK')
             .when(F.col('manufacturer').rlike('ROBINSON HELICOPTER'), 'ROBINSON HELICOPTER')
             .when(F.col('manufacturer').rlike('GULFSTREAM'),          'GULFSTREAM')
             .when(F.col('manufacturer').rlike('MARZ BARRY'),          'MARZ BARRY'))

df_planes.show()

In [None]:
df_planes.groupBy("manufacturer").count().orderBy(F.col('manufacturer')).show(999)

# -----------------------------------------------------------------------------------------------------------
## 7.

In [None]:
df_planes.filter(F.col('model').rlike('[(]')).show()
df_planes.filter(F.col('model').rlike('[(]')).count()

In [None]:
# Retira todos os caracteres que se encontram entre ()
REGEX_PS = r'\s*\([^()]*\)\s*'

df_planes = df_planes.withColumn('model', F.regexp_replace(F.col('model'), REGEX_PS, ''))

df_planes.show()

In [None]:
df_planes.filter(F.col('tailnum') == 'N550AA').show()

In [None]:
df_planes.filter(F.col('model').rlike('[\s]')).show()

# -----------------------------------------------------------------------------------------------------------
## 8.

In [None]:
df_planes.filter(F.col('speed').isNull()).show(10)

In [None]:
df_planes.groupBy('speed').count().show()

In [None]:
df_planes = df_planes.withColumn('speed',
                                 F.when((F.col('speed').isNull()) &
                                        (F.col('seats').isNotNull()), F.ceil(F.expr('seats/0.36')))
                                  .otherwise(F.col('speed'))
                                )

df_planes.filter(F.col('tailnum') == 'N102UW').show()

In [None]:
df_planes = df_planes.withColumn("speed",
                                 F.when(~F.col('speed').between(50, 150), 0)
                                  .otherwise(F.col('speed'))
                                )

In [None]:
df_planes.groupBy("speed").count().orderBy(F.col('speed')).show(999)
df_planes.filter(F.col('speed').isNull()).show()

# -----------------------------------------------------------------------------------------------------------
## 9.

In [None]:
df_planes.filter(F.col('engine') == '4 Cycle').show()

In [None]:
df_planes = df_planes.withColumn('engine_type',
                                 F.when(F.col('engine') == 'Turbo-fan',   'FAN')
                                  .when(F.col('engine') == 'Turbo-jet',   'JET')
                                  .when(F.col('engine') == 'Turbo-prop',  'PROP')
                                  .when(F.col('engine') == 'Turbo-shaft', 'SHAFT')
                                  .when(F.col('engine') == '4 Cycle',     'CYCLE')
                                )

df_planes.show(10)
df_planes.filter(F.col('engine_type').isNull()).show()
df_planes.groupBy("engine_type").count().orderBy(F.col('engine_type')).show(999)

# -----------------------------------------------------------------------------------------------------------
# Transformação flights
# -----------------------------------------------------------------------------------------------------------
## 1.

In [None]:
df_flights.filter(F.col('hour').isNull()).show(10)
df_flights.filter(F.col('minute').isNull()).count()

In [None]:
df_flights = df_flights.withColumn('hour',
                                   F.when(F.col('hour').isNull(), 0)
                                    .otherwise(F.col('hour'))
                                  )
 
df_flights = df_flights.withColumn('minute',
                                   F.when(F.col('minute').isNull(), 0)
                                    .otherwise(F.col('minute'))
                                  )
 
df_flights.show(5)
df_flights.filter(F.col('minute').isNull()).show()

In [None]:
df_flights.groupBy("hour").count().orderBy(F.col('hour')).show()

# -----------------------------------------------------------------------------------------------------------
## 2.

In [None]:
df_flights.filter(F.col('hour') == 24).show()

In [None]:
df_flights = df_flights.withColumn('hour',
                                   F.when(F.col('hour') == 24, 0)
                                    .otherwise(F.col('hour'))
                                  )
 
df_flights.filter(F.col('hour') == 24).show()

In [None]:
df_flights.groupBy("hour").count().orderBy(F.col('hour')).show(999)

# -----------------------------------------------------------------------------------------------------------
## 3.

In [None]:
df_flights.filter(F.col('hour').isNull()).show()

In [None]:
df_flights = df_flights.withColumn("dep_datetime", F.expr("make_timestamp(year, month, day, hour, minute, 00)"))

df_flights.show(vertical = True, truncate = False)

# -----------------------------------------------------------------------------------------------------------
## 4.

In [None]:
df_flights.filter(F.col('dep_time') == 'NA').show(5)
df_flights.where(F.col('dep_time') == 'NA').count()

In [None]:
df_flights = df_flights.withColumn('dep_time',
                                   F.when(F.col('dep_time') == 'NA', F.concat(F.col('hour'), F.lpad(F.col('minute'), 2, '0')))
                                    .otherwise(F.col('dep_time'))
                                  ) 

df_flights.filter(F.col('dep_time') == 'NA').show()
df_flights.filter(F.col('dep_time') < 1).show(5)

In [None]:
df_flights.groupBy('dep_time').count().orderBy(F.col('dep_time')).show()

# -----------------------------------------------------------------------------------------------------------
## 5.

In [None]:
df_flights.filter(F.col('dep_delay').isNull()).show(10)
df_flights.filter(F.col('dep_delay').isNull()).count()

In [None]:
df_flights = df_flights.withColumn('dep_delay',
                                   F.when(F.col('dep_delay').isNull(), 0)
                                    .otherwise(F.col('dep_delay'))
                                  ) 

df_flights.filter(F.col('dep_delay').isNull()).show()
df_flights.filter(F.col('dep_delay').isNull()).count()
df_flights.filter(F.col('dep_delay') == 0).count()

In [None]:
df_flights.groupBy("dep_delay").count().orderBy(F.col('dep_delay')).show()

# -----------------------------------------------------------------------------------------------------------
## 6.

In [None]:
df_flights.filter(F.col('arr_delay').isNull()).show(10)
df_flights.filter(F.col('arr_delay').isNull()).count()

In [None]:
df_flights = df_flights.withColumn('arr_delay',
                                   F.when(F.col('arr_delay').isNull(), 0)
                                    .otherwise(F.col('arr_delay'))
                                  )
 
df_flights.filter(F.col('arr_delay').isNull()).show()
df_flights.filter(F.col('arr_delay') == 0).count()

In [None]:
df_flights.groupBy("arr_delay").count().orderBy(F.col('arr_delay')).show()

# -----------------------------------------------------------------------------------------------------------
## 7.

In [None]:
# Drop the separate date/time part columns.
df_flights = df_flights.drop(
    "year",
    "month",
    "day",
    "hour",
    "minute",
)

df_flights.show(10)

# -----------------------------------------------------------------------------------------------------------
## 8.

In [None]:
df_flights = df_flights.withColumn('air_time_projected', (F.col('distance') * 0.1 + 20).cast('int'))

In [None]:
df_flights.show(truncate = 10)
df_flights.groupBy("air_time_projected").count().orderBy(F.col('air_time_projected')).show()