In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    LongType,
    BooleanType,
    DoubleType,
)
from pyspark.sql.functions import when, lit, isnan, length, col

import re


In [2]:
# Reuse the running SparkContext when one already exists. A bare SparkContext()
# raises "Cannot run multiple SparkContexts at once" when this cell is executed
# a second time in the same kernel.
sc = SparkContext.getOrCreate()

# Build (or fetch) the SparkSession used by every cell below.
spark = (
    SparkSession.builder.appName("Semana3")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


# Airport Dataset

In [3]:
# Explicit schema for airports.csv; every column is declared nullable.
schema = (
    StructType()
    .add("faa", StringType(), True)
    .add("name", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("lon", DoubleType(), True)
    .add("alt", IntegerType(), True)
    .add("tz", DoubleType(), True)
    .add("dst", StringType(), True)
)


In [4]:
# Read the airports CSV with the explicit schema (no type inference) and preview it.
airports_path = "../Datasets/airports.csv"
df = spark.read.csv(airports_path, schema=schema, header=True)
df.show(n=5, truncate=False)


+---+-----------------------------+----------+-----------+----+----+---+
|faa|name                         |lat       |lon        |alt |tz  |dst|
+---+-----------------------------+----------+-----------+----+----+---+
|04G|Lansdowne Airport            |41.1304722|-80.6195833|1044|-5.0|A  |
|06A|Moton Field Municipal Airport|32.4605722|-85.6800278|264 |-5.0|A  |
|06C|Schaumburg Regional          |41.9893408|-88.1012428|801 |-6.0|A  |
|06N|Randall Airport              |41.431912 |-74.3915611|523 |-5.0|A  |
|09J|Jekyll Island Airport        |31.0744722|-81.4277778|11  |-4.0|A  |
+---+-----------------------------+----------+-----------+----+----+---+
only showing top 5 rows



Pergunta 1

In [5]:
# Replace negative altitudes with 0. The same filter is shown before and after
# the change so the effect is visible in the cell output.
negative_alt = F.col("alt") < 0
df.filter(negative_alt).show()
df = df.withColumn("alt", F.when(negative_alt, 0).otherwise(F.col("alt")))
df.filter(negative_alt).show()


+---+-------------+---------+-----------+---+----+---+
|faa|         name|      lat|        lon|alt|  tz|dst|
+---+-------------+---------+-----------+---+----+---+
|IPL|  Imperial Co|32.834219|-115.578744|-54|-8.0|  A|
|NJK|El Centro Naf|32.829222|-115.671667|-42|-8.0|  A|
+---+-------------+---------+-----------+---+----+---+

+---+----+---+---+---+---+---+
|faa|name|lat|lon|alt| tz|dst|
+---+----+---+---+---+---+---+
+---+----+---+---+---+---+---+



Pergunta 2

In [6]:
# Rows whose tz lies in [-7, -5] get dst "A"; all other rows keep their dst.
tz_in_range = F.col("tz").between(-7, -5)
df = df.withColumn("dst", F.when(tz_in_range, "A").otherwise(F.col("dst")))
df.groupBy("dst").count().show()


+---+-----+
|dst|count|
+---+-----+
|  U|    8|
|  A| 1380|
|  N|    9|
+---+-----+



Pergunta 3

In [7]:
# Recode dst "U" as "A"; every other value is left unchanged.
dst_is_u = F.col("dst") == "U"
df = df.withColumn("dst", F.when(dst_is_u, "A").otherwise(F.col("dst")))
df.groupBy("dst").count().show()


+---+-----+
|dst|count|
+---+-----+
|  A| 1388|
|  N|    9|
+---+-----+



Pergunta 4

In [8]:
# Classify each airport into a region from its coordinates. Branches are
# evaluated in order, so the first matching rule wins:
#   lon < -124            -> ALASKA
#   lon > -50 or lat < 24 -> OFFSHORE
#   lon <= -95            -> MAINLAND-WEST
#   lon > -95             -> MAINLAND-EAST
# Once the first two rules have not matched, a non-null lon is already within
# [-124, -50], so the mainland rules do not need to re-check that range.
# Rows that match no rule (e.g. null coordinates) get the string "NaN".
# The fallback is a string on purpose: a float NaN in a string-valued CASE
# relies on implicit double->string coercion, which is not guaranteed under
# ANSI SQL mode.
lon = F.col("lon")
lat = F.col("lat")
df = df.withColumn(
    "region",
    F.when(lon < -124, "ALASKA")
    .when((lon > -50) | (lat < 24), "OFFSHORE")
    .when(lon <= -95, "MAINLAND-WEST")
    .when(lon > -95, "MAINLAND-EAST")
    .otherwise("NaN"),
)
df.groupBy("region").count().show()


+-------------+-----+
|       region|count|
+-------------+-----+
|       ALASKA|  261|
|     OFFSHORE|    4|
|MAINLAND-EAST|  696|
|MAINLAND-WEST|  436|
+-------------+-----+




Pergunta 5

In [9]:
# Keyword lists used to derive the airport `type` code from its name.
AP = ["Airport", "Tradeport", "Heliport", "Airpor", "Arpt"]
AD = ["Aerodrome"]
AK = ["Airpark", "Aero Park"]
AS = ["Station", "Air Station"]
FL = ["Field", "Fld"]

# Ordered (code, keywords) rules: the first code whose keywords occur in the
# name is used; names matching no rule get the string "NaN".
type_rules = [("AP", AP), ("AD", AD), ("AK", AK), ("AS", AS), ("FL", FL)]

type_col = None
for code, keywords in type_rules:
    pattern = "|".join(f".*({kw}).*" for kw in keywords)
    name_matches = df.name.rlike(pattern)
    if type_col is None:
        type_col = F.when(name_matches, code)
    else:
        type_col = type_col.when(name_matches, code)

df = df.withColumn("type", type_col.otherwise("NaN"))
df.groupBy("type").count().show()


+----+-----+
|type|count|
+----+-----+
|  AD|    1|
| NaN|  663|
|  AS|   19|
|  FL|   78|
|  AK|   12|
|  AP|  624|
+----+-----+



Pergunta 6

In [10]:
# Tokens that mark an airport name as military.
MILITARY = [
    "Base", "Aaf", "Afs", "Ahp", "Afb", "LRRS", "Lrrs", "Arb",
    "Naf", "NAS", "Nas", "Jrb", "Ns", "As", "Cgas", "Angb",
]

# A token counts only when delimited by spaces: at the start of the name
# followed by a space, between two spaces, or at the end preceded by a space.
SUBSTRING = "|".join(f"^{token} | {token} | {token}$" for token in MILITARY)

is_military = F.when(df.name.rlike(SUBSTRING), True).otherwise(False)
df = df.withColumn("military", is_military)
df.groupBy("military").count().show()


+--------+-----+
|military|count|
+--------+-----+
|    true|  161|
|   false| 1236|
+--------+-----+



Pergunta 7

In [11]:
# Keyword lists used to derive the `administration` code from the airport name.
I = ["International", "Intl", "Intercontinental"]
N = ["National", "Natl"]
R = ["Regional", "Reigonal", "Rgnl", "County", "Metro", "Metropolitan"]
M = ["Municipal", "Muni", "City"]

# Ordered (code, keywords) rules: the first code whose keywords occur in the
# name is used; names matching no rule get the string "NaN".
administration_rules = [("I", I), ("N", N), ("R", R), ("M", M)]

administration_col = None
for code, keywords in administration_rules:
    pattern = "|".join(f".*({kw}).*" for kw in keywords)
    name_matches = df.name.rlike(pattern)
    if administration_col is None:
        administration_col = F.when(name_matches, code)
    else:
        administration_col = administration_col.when(name_matches, code)

df = df.withColumn("administration", administration_col.otherwise("NaN"))
df.groupBy("administration").count().show()


+--------------+-----+
|administration|count|
+--------------+-----+
|             M|  180|
|             N|    5|
|             R|  287|
|           NaN|  761|
|             I|  164|
+--------------+-----+



Salvando

In [12]:
"""
df.repartition(1).write.format("parquet").mode("overwrite").option(
    "header", "true"
).save("../Data/airports_proc.parquet")
"""
# Collect the processed airports to the driver as a pandas DataFrame and write
# it to parquet. `df` is a pandas object (not a Spark DataFrame) after this line.
df = df.toPandas()
df.to_parquet(path="../Data/airports_proc.parquet")


# Planes Dataset

In [13]:
# Explicit schema for planes.csv; every column is declared nullable.
schema = (
    StructType()
    .add("tailnum", StringType(), True)
    .add("year", IntegerType(), True)
    .add("type", StringType(), True)
    .add("manufacturer", StringType(), True)
    .add("model", StringType(), True)
    .add("engines", IntegerType(), True)
    .add("seats", IntegerType(), True)
    .add("speed", IntegerType(), True)
    .add("engine", StringType(), True)
)


In [14]:
# Read the planes CSV with the explicit schema (no type inference) and preview it.
planes_path = "../Datasets/planes.csv"
df = spark.read.csv(planes_path, schema=schema, header=True)
df.show(n=5, truncate=False)


+-------+----+-----------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|type                   |manufacturer    |model   |engines|seats|speed|engine   |
+-------+----+-----------------------+----------------+--------+-------+-----+-----+---------+
|N102UW |1998|Fixed wing multi engine|AIRBUS INDUSTRIE|A320-214|2      |182  |null |Turbo-fan|
|N103US |1999|Fixed wing multi engine|AIRBUS INDUSTRIE|A320-214|2      |182  |null |Turbo-fan|
|N104UW |1999|Fixed wing multi engine|AIRBUS INDUSTRIE|A320-214|2      |182  |null |Turbo-fan|
|N105UW |1999|Fixed wing multi engine|AIRBUS INDUSTRIE|A320-214|2      |182  |null |Turbo-fan|
|N107US |1999|Fixed wing multi engine|AIRBUS INDUSTRIE|A320-214|2      |182  |null |Turbo-fan|
+-------+----+-----------------------+----------------+--------+-------+-----+-----+---------+
only showing top 5 rows



Pergunta 1

In [15]:
# tailchar = tailnum with every digit and a leading "N" removed.
tail_letters = F.regexp_replace(F.col("tailnum"), "[0-9]|^N", "")
df = df.withColumn("tailchar", tail_letters)
df.groupBy("tailchar", "tailnum").count().show(3)


+--------+-------+-----+
|tailchar|tailnum|count|
+--------+-------+-----+
|      US| N170US|    1|
|      UW| N191UW|    1|
|      NW| N337NW|    1|
+--------+-------+-----+
only showing top 3 rows




Pergunta 2

In [16]:
# A year of 0 is replaced with 1996; other values (including null) are kept.
year_is_zero = F.col("year") == 0
df = df.withColumn("year", F.when(year_is_zero, 1996).otherwise(F.col("year")))
df.filter(F.col("year") == 1996).groupBy("year").count().show(3)


+----+-----+
|year|count|
+----+-----+
|1996|   73|
+----+-----+



Pergunta 3
Não entendi; estudar a resposta.

In [17]:
# Impute missing `year` values in two fallback steps:
#   1. the earliest year seen for the same (manufacturer, model);
#   2. the earliest year seen for the same manufacturer.
# The lookup tables are not sorted: a join does not preserve input order, so an
# orderBy before it would only add cost.

# Earliest year per (manufacturer, model). Columns get a "1" suffix so they do
# not clash with the columns of `df` in the join.
df_ordered = (
    df.groupBy("manufacturer", "model")
    .agg(F.min("year").alias("year1"))
    .withColumnRenamed("manufacturer", "manufacturer1")
    .withColumnRenamed("model", "model1")
)

# Earliest year per manufacturer, with a "2" suffix for the same reason.
df_ordered2 = (
    df.groupBy("manufacturer")
    .agg(F.min("year").alias("year2"))
    .withColumnRenamed("manufacturer", "manufacturer2")
)

# Left join keeps every plane, including those without a matching lookup row.
df_final = df.join(
    df_ordered,
    (df.manufacturer == df_ordered.manufacturer1)
    & (df.model == df_ordered.model1),
    "left",
)
df_final = df_final.join(
    df_ordered2, df_final.manufacturer == df_ordered2.manufacturer2, "left"
)

# coalesce returns the first non-null value: own year, then the model-level
# minimum, then the manufacturer-level minimum.
df_final = df_final.withColumn(
    "year", F.coalesce(F.col("year"), F.col("year1"), F.col("year2"))
)

# Remove the helper columns introduced by the joins.
df_final = df_final.drop(
    "manufacturer1", "manufacturer2", "model1", "year1", "year2"
)

df_final.show(3)

df = df_final


+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+--------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|tailchar|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+--------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      UW|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      US|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|      UW|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+--------+
only showing top 3 rows



Pergunta 4

In [18]:
# age = 2022 - year (the reference year 2022 is hard-coded).
plane_age = F.lit(2022) - F.col("year")
df = df.withColumn("age", plane_age)
df.groupBy("age", "year").count().show(3)


+---+----+-----+
|age|year|count|
+---+----+-----+
| 17|2005|  112|
| 13|2009|   77|
| 11|2011|   64|
+---+----+-----+
only showing top 3 rows




Pergunta 5

In [19]:
# Map the verbose aircraft type labels to short codes; labels that are not
# listed are left unchanged.
TYPE_CODES = {
    "Fixed wing multi engine": "MULTI_ENG",
    "Fixed wing single engine": "SINGLE_ENG",
    "Rotorcraft": "ROTORCRAFT",
}

plane_type = F.col("type")
recoded_type = None
for raw_label, code in TYPE_CODES.items():
    label_matches = plane_type == raw_label
    if recoded_type is None:
        recoded_type = F.when(label_matches, code)
    else:
        recoded_type = recoded_type.when(label_matches, code)

df = df.withColumn("type", recoded_type.otherwise(plane_type))
df.groupBy("type").count().show(3)


+----------+-----+
|      type|count|
+----------+-----+
|SINGLE_ENG|   10|
| MULTI_ENG| 2615|
|ROTORCRAFT|    3|
+----------+-----+



Pergunta 6

In [20]:
# Print the distinct manufacturer values in ascending order.
manufacturers = (
    df.select("manufacturer").distinct().orderBy("manufacturer", ascending=True)
)
print([row[0] for row in manufacturers.toLocalIterator()])


['AIRBUS', 'AIRBUS INDUSTRIE', 'BARKER JACK L', 'BELL', 'BOEING', 'BOMBARDIER INC', 'CANADAIR', 'CESSNA', 'CIRRUS DESIGN CORP', 'EMBRAER', 'GULFSTREAM AEROSPACE', 'KILDALL GARY', 'LAMBERT RICHARD', 'MARZ BARRY', 'MCDONNELL DOUGLAS', 'MCDONNELL DOUGLAS AIRCRAFT CO', 'PIPER', 'ROBINSON HELICOPTER CO', 'SIKORSKY']


In [21]:
# Collapse manufacturer spelling variants onto one canonical name, keyed by the
# prefix the raw value starts with. Each pair is (prefix, canonical name).
MANUFACTURER_PREFIXES = [
    ("AIRBUS", "AIRBUS"),
    ("BARKER", "BARKER JACK"),
    ("BELL", "BELL"),
    ("BOEING", "BOEING"),
    ("BOMBARDIER", "BOMBARDIER"),
    ("CANADAIR", "CANADAIR"),
    ("CESSNA", "CESSNA"),
    ("CIRRUS", "CIRRUS"),
    ("EMBRAER", "EMBRAER"),
    ("GULFSTREAM", "GULFSTREAM"),
    ("KILDALL", "KILDALL GARY"),
    ("LAMBERT", "LAMBERT RICHARD"),
    ("MARZ", "MARZ BARRY"),
    ("MCDONNELL", "MCDONNELL DOUGLAS"),
    ("PIPER", "PIPER"),
    ("ROBINSON", "ROBINSON HELICOPTER"),
    ("SIKORSKY", "SIKORSKY"),
]

manufacturer = F.col("manufacturer")
normalized_manufacturer = None
for prefix, canonical in MANUFACTURER_PREFIXES:
    has_prefix = manufacturer.startswith(prefix)
    if normalized_manufacturer is None:
        normalized_manufacturer = F.when(has_prefix, canonical)
    else:
        normalized_manufacturer = normalized_manufacturer.when(has_prefix, canonical)

# Keep the raw value for any manufacturer not covered by the list. Without this
# fallback an unlisted manufacturer would silently become null.
df = df.withColumn("manufacturer", normalized_manufacturer.otherwise(manufacturer))



Pergunta 7

In [22]:
# Print the distinct model values in ascending order.
models = df.select("model").distinct().orderBy("model", ascending=True)
print([row[0] for row in models.toLocalIterator()])


['150', '172M', '206B', '210-5(205)', '421C', '737-301', '737-3A4', '737-3G7', '737-3H4', '737-3K2', '737-3L9', '737-3Q8', '737-3T5', '737-3TO', '737-3Y0', '737-490', '737-4Q8', '737-4S3', '737-5H4', '737-705', '737-724', '737-732', '737-73V', '737-76N', '737-76Q', '737-790', '737-7AD', '737-7BD', '737-7BX', '737-7H4', '737-7K9', '737-7Q8', '737-824', '737-832', '737-890', '737-8FH', '737-8H4', '737-924', '737-924ER', '737-932ER', '737-990', '737-990ER', '747-451', '757-212', '757-222', '757-223', '757-224', '757-231', '757-232', '757-251', '757-26D', '757-28A', '757-2G7', '757-2Q8', '757-2S7', '757-324', '757-33N', '757-351', '767-322', '767-332', '767-33A', '767-3CB', '767-3P6', '767-432ER', '777-224', '777-232LR', 'A319-111', 'A319-112', 'A319-114', 'A319-115', 'A319-131', 'A319-132', 'A320-211', 'A320-212', 'A320-214', 'A320-231', 'A320-232', 'A321-211', 'A321-231', 'A330-223', 'A330-243', 'A330-323', 'CL-600-2B19', 'CL-600-2C10', 'CL-600-2D24', 'DC-9-82(MD-82)', 'DC-9-83(MD-83)', 

In [23]:
# Remove any parenthesised group (and whitespace around it) from the model,
# e.g. "DC-9-82(MD-82)" becomes "DC-9-82".
model_without_parens = F.regexp_replace(F.col("model"), r"\s*\([^()]*\)\s*", "")
df = df.withColumn("model", model_without_parens)

# Print the distinct models again to check the result.
models = df.select("model").distinct().orderBy("model", ascending=True)
print([row[0] for row in models.toLocalIterator()])


['150', '172M', '206B', '210-5', '421C', '737-301', '737-3A4', '737-3G7', '737-3H4', '737-3K2', '737-3L9', '737-3Q8', '737-3T5', '737-3TO', '737-3Y0', '737-490', '737-4Q8', '737-4S3', '737-5H4', '737-705', '737-724', '737-732', '737-73V', '737-76N', '737-76Q', '737-790', '737-7AD', '737-7BD', '737-7BX', '737-7H4', '737-7K9', '737-7Q8', '737-824', '737-832', '737-890', '737-8FH', '737-8H4', '737-924', '737-924ER', '737-932ER', '737-990', '737-990ER', '747-451', '757-212', '757-222', '757-223', '757-224', '757-231', '757-232', '757-251', '757-26D', '757-28A', '757-2G7', '757-2Q8', '757-2S7', '757-324', '757-33N', '757-351', '767-322', '767-332', '767-33A', '767-3CB', '767-3P6', '767-432ER', '777-224', '777-232LR', 'A319-111', 'A319-112', 'A319-114', 'A319-115', 'A319-131', 'A319-132', 'A320-211', 'A320-212', 'A320-214', 'A320-231', 'A320-232', 'A321-211', 'A321-231', 'A330-223', 'A330-243', 'A330-323', 'CL-600-2B19', 'CL-600-2C10', 'CL-600-2D24', 'DC-9-82', 'DC-9-83', 'EMB-120', 'EMB-120


Pergunta 8

In [24]:
# Count rows with a missing speed before the fill.
speed_missing = F.col("speed").isNull()
df.filter(speed_missing).groupBy("speed").count().show(3)

# Where speed is null and seats is known, use ceil(seats / 0.36); otherwise
# keep the existing speed. The result is cast back to integer.
speed_from_seats = F.ceil(F.expr("seats/0.36"))
df = df.withColumn(
    "speed",
    F.when(speed_missing & F.col("seats").isNotNull(), speed_from_seats)
    .otherwise(F.col("speed"))
    .cast(IntegerType()),
)

# Count rows with a missing speed after the fill.
df.filter(speed_missing).groupBy("speed").count().show(3)


+-----+-----+
|speed|count|
+-----+-----+
| null| 2622|
+-----+-----+

+-----+-----+
|speed|count|
+-----+-----+
+-----+-----+




Pergunta 9

In [25]:
# Derive a short engine_type code from the engine label; labels that are not
# listed produce null.
ENGINE_CODES = {
    "Turbo-fan": "FAN",
    "Turbo-jet": "JET",
    "Turbo-prop": "PROP",
    "Turbo-shaft": "SHAFT",
    "4 Cycle": "CYCLE",
}

engine = F.col("engine")
engine_type = None
for raw_label, code in ENGINE_CODES.items():
    label_matches = engine == raw_label
    if engine_type is None:
        engine_type = F.when(label_matches, code)
    else:
        engine_type = engine_type.when(label_matches, code)

df = df.withColumn("engine_type", engine_type.otherwise(None))

df.groupBy("engine_type").count().show(6)


+-----------+-----+
|engine_type|count|
+-----------+-----+
|       PROP|   37|
|       null|   10|
|      CYCLE|    1|
|        FAN| 2127|
|        JET|  450|
|      SHAFT|    3|
+-----------+-----+



Salvando

In [26]:
"""
df.repartition(1).write.format("parquet").mode("overwrite").option(
    "header", "true"
).save("../Data/planes_proc.parquet")
"""
# Collect the processed planes to the driver as a pandas DataFrame and write it
# to parquet. `df` is a pandas object (not a Spark DataFrame) after this line.
df = df.toPandas()
# File name fixed from "lanes_proc.parquet" to follow the <dataset>_proc.parquet
# pattern used by the airports and flights outputs.
df.to_parquet("../Data/planes_proc.parquet")

# Flights Dataset

In [27]:
# Explicit schema for flights.csv as (column name, Spark type) pairs; every
# column is declared nullable. dep_time, arr_time and flight are read as strings.
flight_fields = [
    ("year", IntegerType()),
    ("month", IntegerType()),
    ("day", IntegerType()),
    ("dep_time", StringType()),
    ("dep_delay", IntegerType()),
    ("arr_time", StringType()),
    ("arr_delay", IntegerType()),
    ("carrier", StringType()),
    ("tailnum", StringType()),
    ("flight", StringType()),
    ("origin", StringType()),
    ("dest", StringType()),
    ("air_time", IntegerType()),
    ("distance", IntegerType()),
    ("hour", IntegerType()),
    ("minute", IntegerType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in flight_fields]
)


In [28]:
# Read the flights CSV with the explicit schema (no type inference) and preview it.
flights_path = "../Datasets/flights.csv"
df = spark.read.csv(flights_path, schema=schema, header=True)
df.show(n=5, truncate=False)


+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |
|2014|4    |9  |1705    |45       |1839    |34       |WN     |N360SW |344   |PDX   |SJC |83      |569     |17  |5     |
|2014|3    |9  |754     |-1       |1015    |1        |AS     |N612AS |522   |SEA   |BUR |127     |937     |7   |54    |
+----+-----+---+--------+---------+-----

Pergunta 1

In [29]:
# Null hour / minute values become 0; non-null values are kept.
for time_part in ("hour", "minute"):
    part = F.col(time_part)
    df = df.withColumn(time_part, F.when(part.isNull(), 0).otherwise(part))

df.groupBy("hour", "minute").count().show(6)

+----+------+-----+
|hour|minute|count|
+----+------+-----+
|  15|    26|    8|
|  22|    53|    5|
|   7|    55|   12|
|   8|    52|    6|
|  15|    14|    5|
|  17|    33|    7|
+----+------+-----+
only showing top 6 rows



Pergunta 2

In [30]:
# An hour of 24 is rewritten as 0; every other value is kept.
hour_is_24 = F.col("hour") == 24
df = df.withColumn("hour", F.when(hour_is_24, 0).otherwise(F.col("hour")))


Pergunta 3

In [31]:
# Assemble the departure timestamp from the date/time part columns, with the
# seconds fixed at 0.
dep_datetime = F.expr("make_timestamp(year, month, day, hour, minute, 00)")
df = df.withColumn("dep_datetime", dep_datetime)


Pergunta 4

In [32]:
# Print the dep_time of the first 15 distinct (dep_time, hour) pairs, ordered by hour.
dep_time_by_hour = (
    df.select("dep_time", "hour").distinct().orderBy("hour", ascending=True)
)
print([row[0] for row in dep_time_by_hour.toLocalIterator()][:15])

df.select("dep_time", "hour", "minute").show(10)

['19', '8', '30', '13', '42', '7', '29', '12', '48', '6', '41', '3', '14', '45', '27']
+--------+----+------+
|dep_time|hour|minute|
+--------+----+------+
|     658|   6|    58|
|    1040|  10|    40|
|    1443|  14|    43|
|    1705|  17|     5|
|     754|   7|    54|
|    1037|  10|    37|
|     847|   8|    47|
|    1655|  16|    55|
|    1236|  12|    36|
|    1812|  18|    12|
+--------+----+------+
only showing top 10 rows



In [33]:

# Where dep_time is the literal "NA", rebuild it as hour followed by the minute
# left-padded to two digits; other values are kept.
rebuilt_dep_time = F.concat(F.col("hour"), F.lpad(F.col("minute"), 2, "0"))
dep_time_is_na = F.col("dep_time") == "NA"
df = df.withColumn(
    "dep_time",
    F.when(dep_time_is_na, rebuilt_dep_time).otherwise(F.col("dep_time")),
)
df.select("dep_time", "hour", "minute").show(10)

+--------+----+------+
|dep_time|hour|minute|
+--------+----+------+
|     658|   6|    58|
|    1040|  10|    40|
|    1443|  14|    43|
|    1705|  17|     5|
|     754|   7|    54|
|    1037|  10|    37|
|     847|   8|    47|
|    1655|  16|    55|
|    1236|  12|    36|
|    1812|  18|    12|
+--------+----+------+
only showing top 10 rows



Pergunta 5

In [34]:
# Null departure delays become 0.
dep_delay = F.col("dep_delay")
df = df.withColumn("dep_delay", F.when(dep_delay.isNull(), 0).otherwise(dep_delay))


Pergunta 6

In [35]:
# Null arrival delays become 0.
arr_delay = F.col("arr_delay")
df = df.withColumn("arr_delay", F.when(arr_delay.isNull(), 0).otherwise(arr_delay))


Pergunta 7

In [36]:
# The date/time part columns are no longer needed once dep_datetime exists.
date_part_columns = ["year", "month", "day", "hour", "minute"]
df = df.drop(*date_part_columns)


Pergunta 8

In [37]:
# Projected air time = distance * 0.1 + 20, cast to integer.
projected_air_time = (F.col("distance") * 0.1) + 20
df = df.withColumn("air_time_projected", projected_air_time.cast(IntegerType()))

# Frequency of each projected value (first 10 rows shown).
projected_counts = df.groupBy("air_time_projected").count().distinct()
projected_counts.show(10)

+------------------+-----+
|air_time_projected|count|
+------------------+-----+
|                31|   54|
|               255|   27|
|               193|  215|
|               115|  521|
|               101|   71|
|               126|   26|
|                76|  164|
|               192|  282|
|                44|   10|
|               159|  166|
+------------------+-----+
only showing top 10 rows



Pergunta 9

In [38]:
# Average air_time per (origin, dest) route, cast to integer and named
# air_time_expected; rows are ordered by origin.
mean_air_time = F.avg("air_time").cast(IntegerType()).alias("air_time_expected")
df_1 = df.groupBy("origin", "dest").agg(mean_air_time).orderBy("origin")
df_1.show(5)

+------+----+-----------------+
|origin|dest|air_time_expected|
+------+----+-----------------+
|   PDX| IAH|              213|
|   PDX| ONT|              111|
|   PDX| PHX|              130|
|   PDX| SFO|               85|
|   PDX| MCI|              174|
+------+----+-----------------+
only showing top 5 rows



In [39]:
print("Actual columns:", df_1.columns)

# Rename the route keys so they do not clash with `origin` / `dest` of the
# flights frame when the two are joined.
for old_name, new_name in (("origin", "O1"), ("dest", "D1")):
    df_1 = df_1.withColumnRenamed(old_name, new_name)
print("modified columns:", df_1.columns)

df_1.show(5)


Actual columns: ['origin', 'dest', 'air_time_expected']
modified columns: ['O1', 'D1', 'air_time_expected']
+---+---+-----------------+
| O1| D1|air_time_expected|
+---+---+-----------------+
|PDX|IAH|              213|
|PDX|ONT|              111|
|PDX|PHX|              130|
|PDX|SFO|               85|
|PDX|MCI|              174|
+---+---+-----------------+
only showing top 5 rows



In [40]:
# Attach the per-route expected air time to every flight. The left join keeps
# flights even when their route has no row in df_1.
same_route = (df.origin == df_1.O1) & (df.dest == df_1.D1)
df = df.join(df_1, on=same_route, how="left")


In [41]:
# Preview the joined frame.
df.show(n=10)

+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---+---+-----------------+
|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|       dep_datetime|air_time_projected| O1| D1|air_time_expected|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---+---+-----------------+
|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|2014-12-08 06:58:00|               115|SEA|LAX|              126|
|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|2014-01-22 10:40:00|               287|SEA|HNL|              343|
|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|2014-03-09 14:43:00|                87|SEA|SFO|              101|
|    1705|       45|    1839|       34|     WN| N360

Pergunta 10

In [42]:
# Where air_time is null, use the larger of the projected and the route-expected
# air time; otherwise keep the recorded value.
fallback_air_time = F.greatest(F.col("air_time_projected"), F.col("air_time_expected"))
air_time_missing = F.col("air_time").isNull()
df = df.withColumn(
    "air_time",
    F.when(air_time_missing, fallback_air_time).otherwise(F.col("air_time")),
)
df.show(n=10)

+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---+---+-----------------+
|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|       dep_datetime|air_time_projected| O1| D1|air_time_expected|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---+---+-----------------+
|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|2014-12-08 06:58:00|               115|SEA|LAX|              126|
|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|2014-01-22 10:40:00|               287|SEA|HNL|              343|
|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|2014-03-09 14:43:00|                87|SEA|SFO|              101|
|    1705|       45|    1839|       34|     WN| N360

Pergunta 11

In [43]:

# Estimated arrival instant: the departure timestamp in epoch seconds plus
# air_time converted from minutes to seconds, cast back to a timestamp.
df = df.withColumn(
    "sum",
    (
        F.unix_timestamp("dep_datetime") + (df.air_time * 60).cast("int")
    ).cast("timestamp"),
)

# Where arr_time is the literal "NA", rebuild it from the estimated arrival.
# Minutes are zero-padded ("%02d") so that 9:05 is written "905" and not "95",
# matching the two-digit padding used when dep_time is rebuilt.
df = df.withColumn(
    "arr_time",
    F.when(
        df.arr_time == "NA",
        F.format_string("%d%02d", F.hour(df.sum), F.minute(df.sum)),
    ).otherwise(df.arr_time),
)

df.show(2)


+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---+---+-----------------+-------------------+
|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|       dep_datetime|air_time_projected| O1| D1|air_time_expected|                sum|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+-------------------+------------------+---+---+-----------------+-------------------+
|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|2014-12-08 06:58:00|               115|SEA|LAX|              126|2014-12-08 09:10:00|
|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|2014-01-22 10:40:00|               287|SEA|HNL|              343|2014-01-22 16:40:00|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+------------------

Pergunta 12

In [44]:
# Bucket flights by air_time (minutes):
#   20..180  -> SHORT-HAUL
#   181..360 -> MEDIUM-HAUL
#   > 360    -> LONG-HAUL
# LONG-HAUL is an explicit condition rather than a catch-all. A catch-all also
# labels air times under 20 minutes and null air times as "LONG-HAUL"; with the
# explicit bound those rows are left null.
# NOTE(review): confirm against the exercise statement how air_time < 20 should
# be labelled.
air_time = F.col("air_time")
df = df.withColumn(
    "haul_duration",
    (
        F.when(air_time.between(20, 180), "SHORT-HAUL")
        .when(air_time.between(181, 360), "MEDIUM-HAUL")
        .when(air_time > 360, "LONG-HAUL")
    ),
)


Pergunta 13

In [45]:
# NOTE(review): the original author tagged this cell "errado" (wrong) without
# stating why; confirm the season boundaries below.
# Each entry is (season, first timestamp, last timestamp), both ends inclusive.
# Departures outside every window get a null season.
SEASON_WINDOWS = [
    ("WINTER", "2013-12-21 21:48:00", "2014-03-20 15:32:59"),
    ("SPRING", "2014-03-20 15:33:00", "2014-06-21 10:13:59"),
    ("SUMMER", "2014-06-21 10:14:00", "2014-09-23 02:03:59"),
    ("FALL", "2014-09-23 02:04:00", "2014-12-21 21:47:59"),
    ("WINTER", "2014-12-21 21:48:00", "2015-03-20 15:32:59"),
]

dep_season = None
for season, window_start, window_end in SEASON_WINDOWS:
    in_window = df.dep_datetime.between(window_start, window_end)
    if dep_season is None:
        dep_season = F.when(in_window, season)
    else:
        dep_season = dep_season.when(in_window, season)

df = df.withColumn("dep_season", dep_season)

# Earliest and latest departure observed in each season.
season_span = df.groupBy("dep_season").agg(
    F.min("dep_datetime").alias("StartDate"),
    F.max("dep_datetime").alias("EndDate"),
)
season_span.show()


+----------+-------------------+-------------------+
|dep_season|          StartDate|            EndDate|
+----------+-------------------+-------------------+
|    WINTER|2014-01-01 00:00:00|2014-12-31 22:57:00|
|    SPRING|2014-03-20 15:38:00|2014-06-21 10:05:00|
|      FALL|2014-09-23 05:13:00|2014-12-21 18:34:00|
|    SUMMER|2014-06-21 11:02:00|2014-09-22 23:26:00|
+----------+-------------------+-------------------+



Pergunta 14

In [46]:
# Categorise the departure delay (first matching rule wins):
#   < 0 -> ANTECIPATED, == 0 -> INTIME, >= 60 -> MAJOR, anything else -> MINOR.
dep_delay = F.col("dep_delay")
delay_category = (
    F.when(dep_delay < 0, "ANTECIPATED")
    .when(dep_delay == 0, "INTIME")
    .when(dep_delay >= 60, "MAJOR")
    .otherwise("MINOR")
)
df = df.withColumn("dep_delay_category", delay_category)

category_counts = df.groupBy("dep_delay_category").count().distinct()
category_counts.show()

+------------------+-----+
|dep_delay_category|count|
+------------------+-----+
|       ANTECIPATED| 5894|
|             MAJOR|  395|
|             MINOR| 3065|
|            INTIME|  646|
+------------------+-----+



Salvando

In [47]:
"""
df.repartition(1).write.format("parquet").mode("overwrite").option(
    "header", "true"
).save("../Data/flights_proc.parquet")
"""
# Collect the processed flights to the driver as a pandas DataFrame and write
# it to parquet. `df` is a pandas object (not a Spark DataFrame) after this line.
df = df.toPandas()
df.to_parquet(path="../Data/flights_proc.parquet")


  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
