In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql.types import DoubleType, IntegerType

In [3]:
# Build the Spark session used by every cell below; getOrCreate() hands back
# an already-active session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("TransformacaoDados")
    .getOrCreate()
)

## Leitura dos Dados

In [17]:
# Raw airlines CSV, addressed relative to the notebook's working directory.
data_path = "../data/raw/alljoined_airlines.csv"

# The first row is the header; column types are inferred from the data.
# NOTE(review): the delay columns are inferred as string, presumably because
# they hold the literal "NA" (visible in the previews further down). The CSV
# reader's nullValue="NA" option would likely let them infer as numeric --
# confirm before changing, since later cells cast them explicitly.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(data_path)
)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- FL_DATE: string (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- DEP_DEL15: string (nullable = true)
 |-- DEP_DELAY_GROUP: string (nullable = true)
 |-- ARR_DELAY: string (nullable = true)
 |-- ARR_DELAY_NEW: string (nullable = true)
 |-- ARR_DEL15: string (nullable = true)
 |-- ARR_DELAY_GROUP: string (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: integer (nullable = true)
 |-- CARRIER_DELAY: string (nullable = true)
 |-- WEATHER_DELAY: string (nullable = true)
 |-- NAS_DELAY: string (nullable = true)
 |-- SECURITY_DELAY: string (nullable

In [18]:
# Preview the first five rows of the frame as loaded from the CSV.
df.show(n=5)

+---+----+-----+-------------+-----------------+-----------------+-----------------+---------------+---------+-------------+---------+---------------+---------+-------------+---------+---------------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+
|_c0|YEAR|MONTH|      FL_DATE|OP_UNIQUE_CARRIER|OP_CARRIER_FL_NUM|ORIGIN_AIRPORT_ID|DEST_AIRPORT_ID|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CANCELLED|CANCELLATION_CODE|DIVERTED|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+---+----+-----+-------------+-----------------+-----------------+-----------------+---------------+---------+-------------+---------+---------------+---------+-------------+---------+---------------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+
|  1|2018|    1|1/26/18 00:00|               UA|             1252| 

### Seleção apenas das colunas relevantes

In [19]:
# Columns kept for the rest of the notebook. Everything else in the raw schema
# (the _c0 index, the flight number, the *_NEW / *_DEL15 / *_GROUP delay
# variants and CANCELLATION_CODE) is dropped by the select below.
selected_columns = [
    "YEAR",
    "MONTH",
    "FL_DATE",
    "OP_UNIQUE_CARRIER",
    "ORIGIN_AIRPORT_ID",
    "DEST_AIRPORT_ID",
    "DEP_DELAY",
    "ARR_DELAY",
    "CANCELLED",
    "DIVERTED",
    "CARRIER_DELAY",
    "WEATHER_DELAY",
    "NAS_DELAY",
    "SECURITY_DELAY",
    "LATE_AIRCRAFT_DELAY",
]

# select() accepts a single list of names as well as unpacked arguments.
df = df.select(selected_columns)

In [20]:
# Preview the first five rows after narrowing to the selected columns.
df.show(n=5)

+----+-----+-------------+-----------------+-----------------+---------------+---------+---------+---------+--------+-------------+-------------+---------+--------------+-------------------+
|YEAR|MONTH|      FL_DATE|OP_UNIQUE_CARRIER|ORIGIN_AIRPORT_ID|DEST_AIRPORT_ID|DEP_DELAY|ARR_DELAY|CANCELLED|DIVERTED|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+----+-----+-------------+-----------------+-----------------+---------------+---------+---------+---------+--------+-------------+-------------+---------+--------------+-------------------+
|2018|    1|1/26/18 00:00|               UA|            14683|          12266|       -7|      -12|        0|       0|           NA|           NA|       NA|            NA|                 NA|
|2018|    1|1/26/18 00:00|               UA|            10721|          13930|       -9|      -26|        0|       0|           NA|           NA|       NA|            NA|                 NA|
|2018|    1|1/26/18 00:00|               UA| 

### Renomeação das colunas para snake_case

In [23]:
# Source column name -> snake_case name. Most entries are just lower-cased;
# FL_DATE, OP_UNIQUE_CARRIER, DEP_DELAY and ARR_DELAY get more descriptive names.
column_renames = {
    "YEAR": "year",
    "MONTH": "month",
    "FL_DATE": "flight_date",
    "OP_UNIQUE_CARRIER": "carrier",
    "ORIGIN_AIRPORT_ID": "origin_airport_id",
    "DEST_AIRPORT_ID": "dest_airport_id",
    "DEP_DELAY": "departure_delay",
    "ARR_DELAY": "arrival_delay",
    "CANCELLED": "cancelled",
    "DIVERTED": "diverted",
    "CARRIER_DELAY": "carrier_delay",
    "WEATHER_DELAY": "weather_delay",
    "NAS_DELAY": "nas_delay",
    "SECURITY_DELAY": "security_delay",
    "LATE_AIRCRAFT_DELAY": "late_aircraft_delay",
}

# Apply the renames one at a time, in the order listed above.
for original_name, renamed in column_renames.items():
    df = df.withColumnRenamed(original_name, renamed)

In [24]:
# Preview the first five rows to confirm the snake_case column names.
df.show(n=5)

+----+-----+-------------+-------+-----------------+---------------+---------------+-------------+---------+--------+-------------+-------------+---------+--------------+-------------------+
|year|month|  flight_date|carrier|origin_airport_id|dest_airport_id|departure_delay|arrival_delay|cancelled|diverted|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|
+----+-----+-------------+-------+-----------------+---------------+---------------+-------------+---------+--------+-------------+-------------+---------+--------------+-------------------+
|2018|    1|1/26/18 00:00|     UA|            14683|          12266|             -7|          -12|        0|       0|           NA|           NA|       NA|            NA|                 NA|
|2018|    1|1/26/18 00:00|     UA|            10721|          13930|             -9|          -26|        0|       0|           NA|           NA|       NA|            NA|                 NA|
|2018|    1|1/26/18 00:00|     UA|           

## Conversão de Tipos

### Conversão das colunas de atraso para DoubleType

In [27]:
# List (column, dtype) pairs before any casting; the delay columns are still
# reported as 'string' at this point.
df.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('flight_date', 'string'),
 ('carrier', 'string'),
 ('origin_airport_id', 'int'),
 ('dest_airport_id', 'int'),
 ('departure_delay', 'string'),
 ('arrival_delay', 'string'),
 ('cancelled', 'int'),
 ('diverted', 'int'),
 ('carrier_delay', 'string'),
 ('weather_delay', 'string'),
 ('nas_delay', 'string'),
 ('security_delay', 'string'),
 ('late_aircraft_delay', 'string')]

In [28]:
# Delay columns that were read as strings and must become doubles.
numeric_columns = [
    "departure_delay", "arrival_delay", "carrier_delay", "weather_delay",
    "nas_delay", "security_delay", "late_aircraft_delay"
]


def _na_safe_double(name):
    """Return column `name` cast to double, with the literal "NA" mapped to null.

    The previews above show "NA" in the delay columns. A plain cast turns that
    token into null only when ANSI SQL mode is off; with ANSI mode on, the cast
    raises as soon as an action runs. Nulling "NA" explicitly gives the same
    result in both modes. Other non-numeric strings keep Spark's normal cast
    behaviour.

    The comparison is made on a string view of the column so the cell stays safe
    to re-run once the column is already double (comparing a double column
    against "NA" directly would evaluate to null for every row).
    """
    is_real_value = col(name).cast("string") != "NA"
    # when() without otherwise() yields null for "NA" rows and for null rows.
    return when(is_real_value, col(name)).cast(DoubleType()).alias(name)


# One select instead of a withColumn loop: each withColumn call adds another
# projection to the plan, which the PySpark docs advise against in loops.
# Column order and the untouched columns are preserved.
df = df.select(
    *[
        _na_safe_double(name) if name in numeric_columns else col(name)
        for name in df.columns
    ]
)

In [30]:
# Re-check the dtypes after the cast: the seven delay columns should now be
# reported as 'double', with the remaining columns unchanged.
df.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('flight_date', 'string'),
 ('carrier', 'string'),
 ('origin_airport_id', 'int'),
 ('dest_airport_id', 'int'),
 ('departure_delay', 'double'),
 ('arrival_delay', 'double'),
 ('cancelled', 'int'),
 ('diverted', 'int'),
 ('carrier_delay', 'double'),
 ('weather_delay', 'double'),
 ('nas_delay', 'double'),
 ('security_delay', 'double'),
 ('late_aircraft_delay', 'double')]

### Conversão das colunas ("cancelled" e "diverted") para IntegerType

In [35]:
# Pin the two flag columns to IntegerType explicitly. The dtype listing above
# already reports both as 'int', so for this data the cast changes nothing.
df = (
    df
    .withColumn("cancelled", col("cancelled").cast(IntegerType()))
    .withColumn("diverted", col("diverted").cast(IntegerType()))
)


In [36]:
# Final dtype check: 'cancelled' and 'diverted' are 'int', the delay columns
# are 'double'.
df.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('flight_date', 'string'),
 ('carrier', 'string'),
 ('origin_airport_id', 'int'),
 ('dest_airport_id', 'int'),
 ('departure_delay', 'double'),
 ('arrival_delay', 'double'),
 ('cancelled', 'int'),
 ('diverted', 'int'),
 ('carrier_delay', 'double'),
 ('weather_delay', 'double'),
 ('nas_delay', 'double'),
 ('security_delay', 'double'),
 ('late_aircraft_delay', 'double')]

In [37]:
# Stop the Spark session created at the top of the notebook.
# NOTE(review): none of the visible cells writes the transformed `df` anywhere
# before the session is stopped -- confirm that persisting the result is
# handled elsewhere or is not needed.
spark.stop()