In [1]:
from get_data import get_data

get_data()

Downloading airline-delay-and-cancellation-data-2009-2018.zip to /opt/workspace/data/airline-delay-and-cancellation-data-2009-2018


100%|██████████| 1.95G/1.95G [03:56<00:00, 8.84MB/s]





In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, BooleanType
import pyspark.sql.functions as F
from pathlib import Path
from datetime import datetime

DATA_PATH = Path("/opt/workspace/data")
csv_files = list(DATA_PATH.glob("*.csv"))

In [3]:
jar_postgres = Path.cwd() / "postgresql-42.5.1.jar"
master_url = "spark://spark-master:7077"
spark = SparkSession \
        .builder \
        .appName("FlightDelaysNotebook") \
        .config("spark.jars", str(jar_postgres)) \
        .master(master_url) \
        .getOrCreate()

22/12/06 10:02:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Lendo os dados baixados:

In [4]:
df = spark.read.csv(list(map(lambda x: str(x), csv_files)), header=True)

                                                                                

In [5]:
df.printSchema()

root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_TIME: string (nullable = true)
 |-- ARR_DELAY: string (nullable = true)
 |-- CANCELLED: string (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: string (nullable = true)
 |-- CRS_ELAPSED_TIME: string (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- CARRIER_DELAY: string (nullable = true)
 |-- WEATHER_DELAY: strin

Renomeando as colunas para lowercase e retirando a última (não é utilizada)

In [6]:
df = df.drop("Unnamed: 27")

for column in df.columns:
    df = df.withColumnRenamed(column, column.lower())

Iremos corrigir os dados no PySpark. Lembrando que a estrutura DataFrame é lazy, então o código abaixo só irá guardar as operações, que serão feitas na hora da escrita.

In [7]:
from pyspark.sql import Column, DataFrame

def cast_floatstr_to_int(col_str: str) -> Column:
    return F.col(col_str).cast(FloatType()).cast(IntegerType())

def datediff_min(col1: str, col2: str) -> Column:
    return (F.unix_timestamp(F.col(col1)) - F.unix_timestamp(F.col(col2))) / 60

def correct_data(df: DataFrame) -> DataFrame:
    df_new = df \
        .withColumn("crs_dep_time", F.concat_ws(' ', F.col("fl_date"), F.concat(F.lpad(F.col("crs_dep_time"), 4, '0'), F.lit("00")))) \
        .withColumn("dep_time", F.concat_ws(' ', F.col("fl_date"), F.concat(F.lpad(cast_floatstr_to_int("dep_time"), 4, '0'), F.lit("00")))) \
        .withColumn("crs_arr_time", F.concat_ws(' ', F.col("fl_date"), F.concat(F.lpad(F.col("crs_arr_time"), 4, '0'), F.lit("00")))) \
        .withColumn("arr_time", F.concat_ws(' ', F.col("fl_date"), F.concat(F.lpad(cast_floatstr_to_int("arr_time"), 4, '0'), F.lit("00")))) \
        .withColumn("wheels_off", F.concat_ws(' ', F.col("fl_date"), F.concat(F.lpad(cast_floatstr_to_int("wheels_off"), 4, '0'), F.lit("00")))) \
        .withColumn("wheels_on", F.concat_ws(' ', F.col("fl_date"), F.concat(F.lpad(cast_floatstr_to_int("wheels_on"), 4, '0'), F.lit("00")))) \
        .withColumn("fl_date", F.col("fl_date").cast("date")) \
        .withColumn("dep_delay", cast_floatstr_to_int("dep_delay")) \
        .withColumn("arr_delay", cast_floatstr_to_int("arr_delay")) \
        .withColumn("taxi_out", cast_floatstr_to_int("taxi_out")) \
        .withColumn("taxi_in", cast_floatstr_to_int("taxi_in"))
    
    timestamp_cols = ["wheels_on", "wheels_off", "arr_time", "crs_arr_time", "dep_time", "crs_dep_time"]
    
    for col in timestamp_cols:
        df_new = df_new.withColumn(col, F.to_timestamp(col, "yyyy-MM-dd HHmmss"))
    
    # check if dates are in the other day
    df_new = df_new \
        .withColumn("dep_time", F.when(datediff_min("crs_dep_time", "dep_time") > F.abs(F.col("dep_delay")), F.col("dep_time") + F.expr("INTERVAL 1 DAY")).otherwise(F.col("dep_time"))) \
        .withColumn("crs_arr_time", F.when(datediff_min("crs_arr_time", "crs_dep_time") < 0, F.col("crs_arr_time") + F.expr("INTERVAL 1 DAY")).otherwise(F.col("crs_arr_time"))) \
        .withColumn("wheels_on", F.when(datediff_min("wheels_on", "wheels_off") < 0, F.col("wheels_on") + F.expr("INTERVAL 1 DAY")).otherwise(F.col("wheels_on"))) \
        .withColumn("arr_time", F.when(F.datediff(F.to_date(F.col("crs_arr_time")), F.to_date(F.col("crs_dep_time"))) > 0, F.col("arr_time") + F.expr("INTERVAL 1 DAY")).otherwise(F.col("arr_time")))
    
    df_new = df_new \
            .withColumn("air_time", datediff_min("wheels_on", "wheels_off")) \
            .withColumn("crs_elapsed_time", datediff_min("crs_arr_time", "crs_dep_time")) \
            .withColumn("actual_elapsed_time", F.col("air_time") + F.col("taxi_out") + F.col("taxi_in")) \
            .withColumn("distance", F.col("distance").cast(FloatType())) \
            .withColumn("carrier_delay", F.col("carrier_delay").cast(FloatType())) \
            .withColumn("weather_delay", F.col("weather_delay").cast(FloatType())) \
            .withColumn("nas_delay", F.col("nas_delay").cast(FloatType())) \
            .withColumn("security_delay", F.col("security_delay").cast(FloatType())) \
            .withColumn("late_aircraft_delay", F.col("late_aircraft_delay").cast(FloatType())) \
            .withColumn("cancelled", cast_floatstr_to_int("cancelled").cast(BooleanType())) \
            .withColumn("diverted", cast_floatstr_to_int("diverted").cast(BooleanType()))
            
    return df_new


df = correct_data(df)

In [8]:
df.printSchema()

root
 |-- fl_date: date (nullable = true)
 |-- op_carrier: string (nullable = true)
 |-- op_carrier_fl_num: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- crs_dep_time: timestamp (nullable = true)
 |-- dep_time: timestamp (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- taxi_out: integer (nullable = true)
 |-- wheels_off: timestamp (nullable = true)
 |-- wheels_on: timestamp (nullable = true)
 |-- taxi_in: integer (nullable = true)
 |-- crs_arr_time: timestamp (nullable = true)
 |-- arr_time: timestamp (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- cancelled: boolean (nullable = true)
 |-- cancellation_code: string (nullable = true)
 |-- diverted: boolean (nullable = true)
 |-- crs_elapsed_time: double (nullable = true)
 |-- actual_elapsed_time: double (nullable = true)
 |-- air_time: double (nullable = true)
 |-- distance: float (nullable = true)
 |-- carrier_delay: float (nullable = true)
 |-- 

Escrevendo para um arquivo Parquet para comprimir e utiizar menos espaço no filesystem:

In [9]:
t0 = datetime.now()
df \
    .write \
    .parquet(str(DATA_PATH / "flight_delays") , mode="overwrite")
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")

22/12/06 10:02:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Execution time: 0:05:51.123187


                                                                                

Deletando os csvs e relendo o DataFrame

In [10]:
for file in csv_files:
    file.unlink()

In [11]:
df = spark.read.parquet(str(DATA_PATH / "flight_delays"))

Carregando os dados no Postgres

In [14]:
properties = {
    "user": "postgres",
    "password": "spark",
    "driver": "org.postgresql.Driver"
}
url = "jdbc:postgresql://demo-database:5432/postgres"

t0 = datetime.now()
df \
    .write \
    .jdbc(url=url, table="flight_delays", mode="append", properties=properties)
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")



Execution time: 0:18:52.826389


                                                                                

Conectando na database Postgres para comparar as operações do Spark com ela:

Conectando no Postgres

In [12]:
import psycopg2 as psql

properties_psql = {
    "database": "postgres",
    "user": "postgres",
    "password": "spark",
    "host": "demo-database",
    "port": "5432"
}

conn = psql.connect(**properties_psql)
cur = conn.cursor()

Tempo médio de atraso de saída e de taxi, por aeroporto de saída e por companhia aérea:

In [15]:
t0 = datetime.now()
result_spark = df \
    .select(F.col("op_carrier"), F.col("origin"), F.col("dep_delay"), F.col("taxi_out")) \
    .groupBy("op_carrier", "origin") \
    .agg(
        F.avg("dep_delay").alias("avg_dep_delay"),
        F.avg("taxi_out").alias("avg_taxi_out")
    ) \
    .orderBy(F.col("avg_dep_delay").desc(), F.col("avg_taxi_out").desc()) \
    .collect()
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")

                                                                                

Execution time: 0:00:05.096790


In [16]:
t0 = datetime.now()
cur.execute(
    """
    select
        origin,
        op_carrier,
        avg(dep_delay) as avg_dep_delay,
        avg(taxi_out) as avg_taxi_out
    from
        flight_delays
    group by
        origin,
        op_carrier
    order by
        3 desc,
        4 desc
    """
)
result_postgres = cur.fetchall()
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")
conn.commit()

Execution time: 0:00:11.316320


In [16]:
df \
    .select(F.col("op_carrier"), F.col("origin"), F.col("dep_delay"), F.col("taxi_out")) \
    .groupBy("op_carrier", "origin") \
    .agg(
        F.avg("dep_delay").alias("avg_dep_delay"),
        F.avg("taxi_out").alias("avg_taxi_out")
    ) \
    .orderBy(F.col("avg_dep_delay").desc(), F.col("avg_taxi_out").desc()) \
    .show()



+----------+------+------------------+------------------+
|op_carrier|origin|     avg_dep_delay|      avg_taxi_out|
+----------+------+------------------+------------------+
|        F9|   OAK|             328.0|              13.0|
|        OO|   ENV|             157.0|               8.0|
|        EV|   BZN|             142.0|              17.0|
|        VX|   TUL|             113.0|              11.0|
|        EV|   BOI|             113.0|               8.0|
|        EV|   RKS|             105.0|               5.0|
|        G4|   CPR|             100.0|               7.0|
|        B6|   AVP|              91.0|              25.0|
|        OO|   GCK|              74.0|              14.0|
|        9E|   GTF| 71.73684210526316|20.789473684210527|
|        EV|   MCN|              67.0|               1.0|
|        G4|   YNG|              63.0|              38.5|
|        OO|   SWO|              61.0|               8.0|
|        XE|   SPI|              61.0|               5.0|
|        G4|  

                                                                                

Média de tempo de atraso total, atraso de partida e de chegada e de tempos de taxi, por companhia aérea, aeroporto de origem e de destino

In [21]:
t0 = datetime.now()
result_spark = df \
    .select(
        F.col("op_carrier"),
        F.col("origin"),
        F.col("dest"),
        F.col("crs_elapsed_time"),
        F.col("actual_elapsed_time"),
        F.col("dep_delay"),
        F.col("taxi_out"),
        F.col("taxi_in"),
        F.col("arr_delay")
    ) \
    .groupBy("op_carrier", "origin", "dest") \
    .agg(
        F.avg(F.col("crs_elapsed_time") - F.col("actual_elapsed_time")).alias("avg_total_delay"),
        F.avg("dep_delay").alias("avg_dep_delay"),
        F.avg("taxi_out").alias("avg_taxi_out"),
        F.avg("taxi_in").alias("avg_taxi_in"),
        F.avg("arr_delay").alias("avg_arr_delay")
    ) \
    .orderBy(F.col("avg_total_delay").desc()) \
    .collect()
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")

                                                                                

Execution time: 0:00:10.256303


In [11]:
t0 = datetime.now()
cur.execute(
    """
    with total_delay as (
        select
            origin,
            dest,
            op_carrier,
            extract(epoch from (crs_elapsed_time - actual_elapsed_time))/60 as total_delay,
            dep_delay,
            arr_delay,
            taxi_out,
            taxi_in
        from
            flight_delays
    )
    select
        origin,
        dest,
        op_carrier,
        avg(total_delay) as avg_total_delay,
        avg(dep_delay) as avg_dep_delay,
        avg(arr_delay) as avg_arr_delay,
        avg(taxi_out) as avg_taxi_out,
        avg(taxi_in) as avg_taxi_in
    from
        total_delay
    group by
        origin,
        dest,
        op_carrier
    order by
        4 desc
    """
)
result_postgres = cur.fetchall()
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")
conn.commit()

Execution time: 0:00:22.466811


In [18]:
df \
    .select(
        F.col("op_carrier"),
        F.col("origin"),
        F.col("dest"),
        F.col("crs_elapsed_time"),
        F.col("actual_elapsed_time"),
        F.col("dep_delay"),
        F.col("taxi_out"),
        F.col("taxi_in"),
        F.col("arr_delay")
    ) \
    .where(F.col("cancelled") == False) \
    .groupBy("op_carrier", "origin", "dest") \
    .agg(
        F.avg(F.col("actual_elapsed_time") - F.col("crs_elapsed_time")).alias("avg_total_delay"),
        F.avg("dep_delay").alias("avg_dep_delay"),
        F.avg("taxi_out").alias("avg_taxi_out"),
        F.avg("taxi_in").alias("avg_taxi_in"),
        F.avg("arr_delay").alias("avg_arr_delay")
    ) \
    .orderBy(F.col("avg_total_delay").desc()) \
    .show()

                                                                                

+----------+------+----+------------------+-------------------+------------------+------------------+------------------+
|op_carrier|origin|dest|   avg_total_delay|      avg_dep_delay|      avg_taxi_out|       avg_taxi_in|     avg_arr_delay|
+----------+------+----+------------------+-------------------+------------------+------------------+------------------+
|        AA|   CLE| ORD|            1482.0|               -5.0|              16.0|              37.0|              -7.0|
|        UA|   LAX| MIA|            1480.0|               -3.0|              15.0|              37.0|              -8.0|
|        OO|   PDX| OKC|            1477.0|              -32.0|               9.0|               5.0|             -27.0|
|        OH|   ATL| DHN|1460.9411764705883| 29.205882352941178|27.529411764705884| 5.117647058823529| 28.41176470588235|
|        UA|   LAX| CMH|            1459.0|                0.0|              17.0|              19.0|             -13.0|
|        EV|   MEM| JFK|        

In [19]:
df \
    .select(
        F.col("op_carrier"),
        F.col("origin"),
        F.col("dest"),
        F.col("crs_dep_time"),
        F.col("crs_arr_time"),
        F.col("wheels_off"),
        F.col("wheels_on"),
        F.col("air_time"),
        F.col("crs_elapsed_time"),
        F.col("actual_elapsed_time"),
        F.col("dep_delay"),
        F.col("taxi_out"),
        F.col("taxi_in"),
        F.col("arr_delay"),
        F.col("cancelled")
    ) \
    .withColumn("total_delay", F.col("crs_elapsed_time") - F.col("actual_elapsed_time")) \
    .where((F.col("op_carrier") == "AA") & (F.col("origin") == "CLE") & (F.col("dest") == "ORD")) \
    .show()



+----------+------+----+-------------------+-------------------+-------------------+-------------------+--------+----------------+-------------------+---------+--------+-------+---------+---------+-----------+
|op_carrier|origin|dest|       crs_dep_time|       crs_arr_time|         wheels_off|          wheels_on|air_time|crs_elapsed_time|actual_elapsed_time|dep_delay|taxi_out|taxi_in|arr_delay|cancelled|total_delay|
+----------+------+----+-------------------+-------------------+-------------------+-------------------+--------+----------------+-------------------+---------+--------+-------+---------+---------+-----------+
|        AA|   CLE| ORD|2018-11-24 07:48:00|2018-11-24 08:35:00|2018-11-24 07:59:00|2018-11-25 08:35:00|  1476.0|            47.0|             1529.0|       -5|      16|     37|       -7|    false|    -1482.0|
+----------+------+----+-------------------+-------------------+-------------------+-------------------+--------+----------------+-------------------+---------+

                                                                                

Essa daqui dá crash

In [11]:
t0 = datetime.now()
result_spark = df \
    .select(
        F.col("fl_date"),
        F.col("op_carrier"),
        F.col("origin"),
        F.col("dest"),
        F.col("crs_elapsed_time"),
        F.col("actual_elapsed_time"),
        F.col("dep_delay"),
        F.col("taxi_out"),
        F.col("taxi_in"),
        F.col("arr_delay")
    ) \
    .groupBy("fl_date", "op_carrier", "origin", "dest") \
    .agg(
        F.avg(F.col("crs_elapsed_time") - F.col("actual_elapsed_time")).alias("avg_total_delay"),
        F.avg("dep_delay").alias("avg_dep_delay"),
        F.avg("taxi_out").alias("avg_taxi_out"),
        F.avg("taxi_in").alias("avg_taxi_in"),
        F.avg("arr_delay").alias("avg_arr_delay")
    ) \
    .orderBy(F.col("avg_total_delay").desc()) \
    .collect()
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")

Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.status.AppStatusStore.activeStages(AppStatusStore.scala:114)
	at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:64)
	at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:52)
	at java.base/java.util.TimerThread.mainLoop(Unknown Source)
	at java.base/java.util.TimerThread.run(Unknown Source)


Py4JJavaError: An error occurred while calling o157.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.execution.SparkPlan$$anon$1._next(SparkPlan.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:388)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:374)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1(SparkPlan.scala:411)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1$adapted(SparkPlan.scala:410)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$2992/0x000000084120c840.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:410)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:340)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$2526/0x0000000841032840.apply(Unknown Source)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:368)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:340)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538)
	at org.apache.spark.sql.Dataset$$Lambda$2200/0x0000000840eb5040.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.Dataset$$Lambda$2201/0x0000000840eb5840.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2206/0x0000000840eb8040.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2202/0x0000000840eb5c40.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


Essa query no postgres crasha o computador

In [None]:
t0 = datetime.now()
cur.execute(
    """
    select
        fl_date,
        origin,
        dest,
        op_carrier,
        avg(crs_elapsed_time - actual_elapsed_time) as avg_total_delay,
        avg(dep_delay) as avg_dep_delay,
        avg(arr_delay) as avg_arr_delay,
        avg(taxi_out) as avg_taxi_out,
        avg(taxi_in) as avg_taxi_in
    from
        flight_delays
    group by
        fl_date,
        origin,
        dest,
        op_carrier
    order by
        5 desc
    """
)
result_postgres = cur.fetchall()
t1 = datetime.now()
delta = t1 - t0

print(f"Execution time: {delta}")
conn.commit()