# **Configuração Inicial**

## Importação das bibliotecas

In [99]:
# Importação das bibliotecas e funções a serem utilizadas no códigos.
import random
import datetime as dt
import db_connector as db
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import explode, udf, arrays_zip, col
from pyspark.sql.types import IntegerType, ArrayType, StringType, TimestampType, DateType

# **Testes de Conexão**

## Testando Operação de Leitura

In [2]:
s = db.spark_conn()
df = s.read()
df.show()
s.stop_session()

+--------------------+--------+------------------+-----------+-------------+-------------------+--------------+---------+------------+--------------------+-------------------+--------------+-------------------+--------------------+--------------------------+-------------------------------+----------------------+-----------------+----------------------------+---------------------------------+------------------------+----------------+-------------------------+----------------------------+---------------+---------+-------------------+--------------+
|                 _id|baseFare|destinationAirport|elapsedDays|fareBasisCode|         flightDate|isBasicEconomy|isNonStop|isRefundable|               legId|         searchDate|seatsRemaining|segmentsAirlineCode| segmentsAirlineName|segmentsArrivalAirportCode|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsCabinCode|segmentsDepartureAirportCode|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsDistance|segmentsDur

## Testando Operação de Escrita

In [71]:
s = db.spark_conn()
df = s.read()

# Definindo o schema do novo registro.
schema = df.schema

# Definindo os dados para o novo registro.
data = [
    {
        "_id": "a1",
        "baseFare": 260.47,
        "destinationAirport": "BOS",
        "elapsedDays": 0,
        "fareBasisCode": "L0AJZNN1",
        "flightDate": datetime.fromisoformat("2022-04-17T00:00:00"),
        "isBasicEconomy": False,
        "isNonStop": False,
        "isRefundable": False,
        "legId": "721d9a2f66fe479e7c17b13e7ae0bb15",
        "searchDate": datetime.fromisoformat("2022-04-16T00:00:00"),
        "seatsRemaining": 1,
        "segmentsAirlineCode": "AA||AA",
        "segmentsAirlineName": "American Airlines||American Airlines",
        "segmentsArrivalAirportCode": "CLT||BOS",
        "segmentsArrivalTimeEpochSeconds": "1650212880||1650224280",
        "segmentsArrivalTimeRaw": "2022-04-17 12:28:00||2022-04-17 15:38:00",
        "segmentsCabinCode": None,
        "segmentsDepartureAirportCode": "ATL||CLT",
        "segmentsDepartureTimeEpochSeconds": "1650207720||1650216420",
        "segmentsDepartureTimeRaw": "2022-04-17 11:02:00||2022-04-17 13:27:00",
        "segmentsDistance": None,
        "segmentsDurationInSeconds": None,
        "segmentsEquipmentDescription": "Canadair Regional Jet 900||Airbus A321",
        "startingAirport": "ATL",
        "totalFare": 302.11,
        "totalTravelDistance": 956,
        "travelDuration": "PT4H36M"
    }
]

new_df = s.session.createDataFrame(data, schema=schema)

s.write(new_df)

s.stop_session()

Para remoção do registro adicionado anteriormente, porém, o uso deve ser no terminal mongosh do próprio mongoDB.

- `%use ProjetoPMD`
- `%db.itineraries.deleteOne({ _id: "a1" })`

# **Consulta 01**

## Enunciado

Partindo de um determinado aeroporto, com base nas informações coletadas nos 3 meses anteriores e num raio Y de milhas do aeroporto, listar a quantidade de voos para cada um dos aeroportos atendidos por ele, o preço médio da passagem e a duração média desses voos, além de mostrar quais foram as empresas aérea em operação.

## Execução

In [106]:
s = db.spark_conn()

df = s.read(columns=[
    "flightDate",
    "totalTravelDistance",
    "startingAirport",
    "destinationAirport",
    "totalFare",
    "travelDuration",
    "segmentsAirlineName"
    ])

df = df.limit(5)

df.write.parquet("tmp/q1.parquet",mode="overwrite")

s.stop_session()

In [128]:
s = db.spark_conn()

df = s.session.read.parquet("tmp/consulta1.parquet").dropna(how="any")

display(df.count())

93755

In [129]:
airport_list = df.select(df.startingAirport.alias("code")).distinct().collect()
start_airport = random.choice(airport_list).code
mile_range = 1500

today = dt.datetime.strptime("2022-10-05",'%Y-%m-%d').date()
margin = dt.timedelta(days=180)

date = udf(lambda x:  dt.datetime.strptime(x,'%Y-%m-%d').date(), DateType())
df = df.withColumn("flightDate", date(df.flightDate))

def time_to_seconds(x):
    try:
        aux = dt.datetime.strptime(x,'PT%HH%MM')
    except:
        aux = dt.datetime.strptime(x,'PT%HH')
    return aux.second + aux.minute*60 + aux.hour*3600
date = udf(lambda x:  time_to_seconds(x), IntegerType())
df = df.withColumn("travelDuration", date(df.travelDuration))

display(start_airport)

'MIA'

In [130]:
data = df.select(
        df.flightDate,
        df.totalTravelDistance,
        df.startingAirport,
        df.destinationAirport,
        df.totalFare,
        df.travelDuration,
        df.segmentsAirlineName
    ) \
    .filter(
        (df.startingAirport == start_airport) & (df.totalTravelDistance < mile_range) \
        & (today - margin <= df.flightDate) & (df.flightDate <= today + margin)
    )

data.show()

+----------+-------------------+---------------+------------------+---------+--------------+--------------------+
|flightDate|totalTravelDistance|startingAirport|destinationAirport|totalFare|travelDuration| segmentsAirlineName|
+----------+-------------------+---------------+------------------+---------+--------------+--------------------+
|2022-05-25|             1260.0|            MIA|               BOS|    188.6|         11940|     JetBlue Airways|
|2022-05-29|              719.0|            MIA|               CLT|    161.6|         20700|American Airlines...|
|2022-04-26|             1115.0|            MIA|               DFW|   168.61|         11400|   American Airlines|
|2022-05-24|             1104.0|            MIA|               EWR|    188.6|         11100|     JetBlue Airways|
|2022-05-08|             1016.0|            MIA|               PHL|   253.98|         10500|   Frontier Airlines|
|2022-04-29|             1192.0|            MIA|               ORD|    188.6|         12

In [131]:
data = data.groupBy("destinationAirport").agg(
    F.count("destinationAirport"),
    F.sum("totalFare"),
    F.sum("travelDuration")
    )

data = data.withColumn("avg_fare",col("sum(totalFare)")/col("count(destinationAirport)")) \
    .withColumn("avg_duration",str(dt.timedelta(seconds=col("sum(travelDuration)")/col("count(destinationAirport)")))) \
    .select(
        data.destinationAirport,
        col("count(destinationAirport)"),
        col("avg_fare"),
        col("avg_duration")
    ).sort("destinationAirport","count(destinationAirport)")

data.show()

+------------------+-------------------------+------------------+------------------+
|destinationAirport|count(destinationAirport)|          avg_fare|      avg_duration|
+------------------+-------------------------+------------------+------------------+
|               ATL|                      225|199.52204444444382|            9816.0|
|               BOS|                      504|300.45428571428727| 20992.02380952381|
|               CLT|                      286|251.68611888111857|17705.244755244756|
|               DEN|                        5|             549.9|           38412.0|
|               DFW|                      485|284.98175257732095|           21492.0|
|               DTW|                      156| 305.3426282051276| 20623.46153846154|
|               EWR|                      281|287.24476868327406|16002.491103202847|
|               IAD|                      252| 338.0209126984128|23740.714285714286|
|               JFK|                      373| 262.5239678284193|

# **Consulta 02**

## Enunciado

Listar o preço médio por distância viajada de cada empresa aérea, agrupando por intervalos de distância a cada 500 milhas e separando as viagens entre econômica e não econômica.

## Execução

In [2]:
s = db.spark_conn()

df = s.read(columns=[
    "segmentsAirlineName",
    "segmentsDistance",
    "isBasicEconomy",
    "totalFare"
    ])

df = df.limit(5)

df.write.parquet("tmp/q2.parquet",mode="overwrite")

s.stop_session()

In [14]:
s = db.spark_conn()

df = s.session.read.parquet("tmp/consulta2.parquet")

display(df.count())

100000

In [15]:
splitUDF = udf(lambda x: x.split("||") if x is not None else None, ArrayType(StringType()))

def categorize(x):
   if x is not None:
      if x == "true": return "Economy"
      else: return "Non-Economy"
   return "Not Informed"

categorizeUDF = udf(lambda x: categorize(x), StringType())


data = df.withColumn("tmp",arrays_zip(splitUDF(df.segmentsAirlineName),splitUDF(df.segmentsDistance))) \
   .withColumn("tmp", explode("tmp")) \
   .select(
      col("tmp.0").alias("airlines"),
      col("tmp.1").cast(IntegerType()).alias("distance"),
      df.isBasicEconomy,
      df.totalFare
   ).dropna(how="any")

data = data.withColumn("isBasicEconomy", categorizeUDF(df.isBasicEconomy))

data.show()

+-----------------+--------+--------------+---------+
|         airlines|distance|isBasicEconomy|totalFare|
+-----------------+--------+--------------+---------+
|           United|     351|   Non-Economy|    472.1|
|           United|    2566|   Non-Economy|    472.1|
|           United|     185|   Non-Economy|    472.1|
|Frontier Airlines|    1346|   Non-Economy|   123.98|
|           United|     221|   Non-Economy|    237.6|
|           United|     327|   Non-Economy|    237.6|
|  JetBlue Airways|     720|   Non-Economy|    126.6|
|  JetBlue Airways|    1104|   Non-Economy|    126.6|
|American Airlines|     327|   Non-Economy|    451.1|
|American Airlines|     545|   Non-Economy|    451.1|
|  Alaska Airlines|     672|   Non-Economy|    589.6|
|  Alaska Airlines|     956|   Non-Economy|    589.6|
|American Airlines|    2566|   Non-Economy|    181.6|
|           United|    2458|   Non-Economy|    398.6|
|American Airlines|     652|   Non-Economy|    226.1|
|American Airlines|     728|

In [16]:
bucketizer = Bucketizer(
    splits=[0,500,1000,1500,2000,2500,3000],
    inputCol='distance', 
    outputCol='bucket_value'
)

data = bucketizer.transform(data)

split_arr = bucketizer.getSplits()
format_udf = udf(lambda x:f'{int(split_arr[int(x)])}-{int(split_arr[int(x)+1])}',StringType())

data = data.withColumn('distance_bracket',format_udf('bucket_value'))
data.show()

+-----------------+--------+--------------+---------+------------+----------------+
|         airlines|distance|isBasicEconomy|totalFare|bucket_value|distance_bracket|
+-----------------+--------+--------------+---------+------------+----------------+
|           United|     351|   Non-Economy|    472.1|         0.0|           0-500|
|           United|    2566|   Non-Economy|    472.1|         5.0|       2500-3000|
|           United|     185|   Non-Economy|    472.1|         0.0|           0-500|
|Frontier Airlines|    1346|   Non-Economy|   123.98|         2.0|       1000-1500|
|           United|     221|   Non-Economy|    237.6|         0.0|           0-500|
|           United|     327|   Non-Economy|    237.6|         0.0|           0-500|
|  JetBlue Airways|     720|   Non-Economy|    126.6|         1.0|        500-1000|
|  JetBlue Airways|    1104|   Non-Economy|    126.6|         2.0|       1000-1500|
|American Airlines|     327|   Non-Economy|    451.1|         0.0|          

In [17]:
data = data.groupBy("airlines","distance_bracket","isBasicEconomy").sum("distance","totalFare")
data = data.withColumn("avg",col("sum(totalFare)")/col("sum(distance)")).select(
    data.airlines,
    data.distance_bracket,
    data.isBasicEconomy.alias("Class"),
    col("avg")
    ).sort("airlines","distance_bracket")
data.show()

+-----------------+----------------+-----------+-------------------+
|         airlines|distance_bracket|      Class|                avg|
+-----------------+----------------+-----------+-------------------+
|  Alaska Airlines|           0-500|Non-Economy| 1.9231913765436064|
|  Alaska Airlines|       1000-1500|Non-Economy| 0.6914865230818955|
|  Alaska Airlines|       1500-2000|Non-Economy| 0.4674941358192032|
|  Alaska Airlines|       2000-2500|Non-Economy| 0.2993224917426374|
|  Alaska Airlines|       2500-3000|Non-Economy|0.17886947802758577|
|  Alaska Airlines|        500-1000|Non-Economy| 0.8953961729973997|
|American Airlines|           0-500|Non-Economy| 0.8869010449701806|
|American Airlines|       1000-1500|Non-Economy|0.24894315118512392|
|American Airlines|       1500-2000|Non-Economy| 0.2353284502868487|
|American Airlines|       2000-2500|Non-Economy|0.18808732267770625|
|American Airlines|       2500-3000|Non-Economy|0.17167785285376908|
|American Airlines|        500-100

In [18]:
s.stop_session()