INSPER - Big Data e Computação em Nuvem

# Big Data e Computação em Nuvem - Checkpoint de aprendizagem

**Instruções**:
- Faça individualmente os exercícios abaixo para praticar conceitos visto nesta disciplina. 
- Quando pertinente, use os dados apontados no enunciado, já disponíveis na AWS. 
- A programação deve ser resolvida por meio de Spark.
- Para perguntas teóricas/conceituais, responda utilizando células markdown.
- Em caso de dúvidas, contate o Prof. Afonso. 

In [1]:
# Criar a sessao do Spark
from pyspark.sql import SparkSession
spark = SparkSession \
            .builder \
            .master("local[4]") \
            .appName("nyc_<mudar-nome>") \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.4,com.microsoft.azure:azure-storage:8.6.6") \
            .getOrCreate()

In [2]:
STORAGE_ACCOUNT = 'dlspadseastusprod'
CONTAINER = 'big-data-comp-nuvem'
FOLDER = 'airline-delay'
TOKEN = 'lSuH4ZI9BhOFEhCF/7ZQbrpPBIhgtLcPDfXjJ8lMxQZjaADW4p6tcmiZGDX9u05o7FqSE2t9d2RD+ASt0YFG8g=='

spark.conf.set("fs.azure.account.key." + STORAGE_ACCOUNT + ".blob.core.windows.net", TOKEN)

# Questões

<font color=red>IMPORTANTE! Para efeitos de demonstração dos resultados, exiba as primeiras 10 linhas das suas RDDs ou Dataframes!</font>


## Spark RDDs

1. Responda com suas palavras: a) o que é uma RDD, b) qual sua importância em programação distribuída com Spark, e c) qual é a diferença entre uma _transformação_ e uma _ação_ sobre uma RDD?

2. A partir da leitura do arquivo `2009.csv`, utilize RDDs para compor uma variável com a seguinte frase: "OP_CARRIER_FL_NUM" on "FL_DATE" was / was not cancelled. Você deverá olhar os dados pertinentes para ajustar a frase dependendo do dado. 

3. Ainda utilizando o arquivo `2009.csv`, crie uma RDD que filtre apenas os voos cancelados, gerando uma string do tipo "Fight _NUMBER_ cancelled due to _CODE_ ". Lembre do dicionário: Reason for Cancellation of flight: A - Airline/Carrier; B - Weather; C - National Air System; D - Security

In [3]:
config = spark.sparkContext._jsc.hadoopConfiguration()
config.set("fs.azure.account.key." + STORAGE_ACCOUNT + ".blob.core.windows.net", TOKEN)
sc = spark.sparkContext

rdd1 = sc.textFile("wasbs://{}@{}.blob.core.windows.net/{}/2009.csv".format(CONTAINER, STORAGE_ACCOUNT, FOLDER))
rdd1.take(2)

['FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,Unnamed: 27',
 '2009-01-01,XE,1204,DCA,EWR,1100,1058.0,-2.0,18.0,1116.0,1158.0,8.0,1202,1206.0,4.0,0.0,,0.0,62.0,68.0,42.0,199.0,,,,,,']

In [4]:
# 1.a) RDD significa "Resilient Distributed Dataset". É uma estrutura de dados fundamental do Spark que permite 
# armazenar e manipular dados distribuídos de forma eficiente. Os RDDs ajudam a garantir a tolerância a falhas. já 
# que se uma parte falhar, as outras partições replicadas ainda estão disponíveis para serem usadas.
# Além disso, as RDDs suportam operações de transformação e ação, que permitem aos usuários realizar tarefas de 
# processamento de dados de forma distribuída e paralela. 


# b) A importância da RDD na programação distribuída com o Spark é que ela permite que o Spark execute operações em
# grandes conjuntos de dados distribuídos em um cluster de máquinas de forma paralela e eficiente, reduzindo 
# significativamente o tempo de processamento. Além disso, as RDDs são resilientes, o que significa que elas podem 
# ser recriadas automaticamente em caso de falhas de uma ou mais máquinas do cluster, tornando o processamento 
# distribuído mais confiável.

# c)A principal diferença entre uma transformação e uma ação em uma RDD é que uma transformação é uma operação que 
# cria uma nova RDD a partir de uma RDD existente, sem executar imediatamente a computação. 
# Em outras palavras, uma transformação é uma operação preguiçosa (lazy), que não executa a computação 
# imediatamente, mas sim quando uma ação é chamada. Já uma ação é uma operação que executa a computação em uma RDD 
# e retorna um resultado ou escreve o resultado em um sistema de armazenamento externo. 
# As ações não criam uma nova RDD, elas apenas executam as computações necessárias para obter um resultado final.


In [5]:
# 2. A partir da leitura do arquivo `2009.csv`, utilize RDDs para compor uma variável com a seguinte frase
# : "OP_CARRIER_FL_NUM" on "FL_DATE" was / was not cancelled. 
# Você deverá olhar os dados pertinentes para ajustar a frase dependendo do dado. 

header = rdd1.first()

rdd2 = rdd1.filter(lambda x: x != header)

rdd3 = rdd2.map(lambda line: line.split(","))

rdd4 = rdd3.map(lambda x: x[2] + ' on ' + x[0] + (' was cancelled' if x[15] == '1' else ' was not cancelled'))

rdd4.take(2)


['1204 on 2009-01-01 was not cancelled',
 '1206 on 2009-01-01 was not cancelled']

In [6]:
# 3. Ainda utilizando o arquivo `2009.csv`, crie uma RDD que filtre apenas os voos cancelados, 
# gerando uma string do tipo "Fight _NUMBER_ cancelled due to _CODE_ ". 
# Lembre do dicionário: Reason for Cancellation of flight: A - Airline/Carrier; 
# B - Weather; C - National Air System; D - Security

rdd9 = rdd3.filter(lambda x: x[16] in ['A', 'B', 'C', 'D'])



rdd10 = rdd9.map(lambda x: ' Flight ' + x[2] + ' cancelled due to ' + ('Airline/Carrier' if x[16] == 'A' else
                  'Weather' if x[16] == 'B' else
                  'National Air System' if x[16] == 'C' else
                  'Security' if x[16] == 'D' else ' unknown reason '))


rdd10.take(10)

[' Flight 7104 cancelled due to Airline/Carrier',
 ' Flight 7329 cancelled due to Airline/Carrier',
 ' Flight 7065 cancelled due to Airline/Carrier',
 ' Flight 2984 cancelled due to Weather',
 ' Flight 2823 cancelled due to Weather',
 ' Flight 7344 cancelled due to Airline/Carrier',
 ' Flight 2798 cancelled due to Weather',
 ' Flight 2939 cancelled due to Weather',
 ' Flight 4537 cancelled due to Airline/Carrier',
 ' Flight 4537 cancelled due to Airline/Carrier']

## Spark Dataframe 

Utilizando o arquivo `2011.csv`, leia-o como um **Spark Dataframe** e faça:

4. Estratifique a distância entre aeroportos em 3 níveis: próximos, médio, distantes (faça os cortes nos valores da distância a seu critério, para isso, explore a base!). A partir daí, responda: qual é a proporção em % de voos com atraso em cada faixa?

5. Crie uma visualização com o número médio diário de vôos com origem do aeroporto BOS

In [7]:
df = spark.read.csv("wasbs://{}@{}.blob.core.windows.net/{}/2011.csv".format(CONTAINER, STORAGE_ACCOUNT, FOLDER), header=True, inferSchema=True)

In [8]:
df.show()

+-------------------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|            FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+-------------------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------

In [9]:
# 4. Estratifique a distância entre aeroportos em 3 níveis: próximos, médio, distantes (faça os cortes nos valores da distância a seu critério, 
# para isso, explore a base!). A partir daí, responda: qual é a proporção em % de voos com atraso em cada faixa?

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def classicar_distancia(x):
    if x <= 342.0:
        return "Próximos"
    elif x > 342.0 and x < 991.0:
        return "Médio"
    else:
        return "Distantes"
    
cudf = udf(classicar_distancia, StringType())

dfcat = df.withColumn("DIST_CAT", cudf(df["DISTANCE"]))

#dfcat.select('DEP_DELAY', 'ARR_DELAY', 'DISTANCE', 'DIST_CAT').show()


dfcat.groupBy("DIST_CAT").agg({"ARR_DELAY": "mean", "DEP_DELAY": "mean"}).show()


+---------+-----------------+-----------------+
| DIST_CAT|   avg(ARR_DELAY)|   avg(DEP_DELAY)|
+---------+-----------------+-----------------+
| Próximos|5.719378524269165|7.840633971146565|
|    Médio|4.922890708605792| 8.46082629586832|
|Distantes|3.524494624299961| 9.08029525198058|
+---------+-----------------+-----------------+



In [10]:
# 5. Crie uma visualização com o número médio diário de vôos com origem do aeroporto BOS


df10 = dfcat.filter(dfcat["ORIGIN"] == "BOS")


df10 = df10.filter(df10["CANCELLED"] == 0).select("ORIGIN", 'FL_DATE')

df10_grouped = df10.groupBy("FL_DATE").count().orderBy("FL_DATE")

df10_grouped.show()

from pyspark.sql.functions import avg

avg_flights = df10_grouped.agg(avg("count")).collect()[0][0]

print("O número médio diário de vôos com origem do aeroporto BOS:", avg_flights)



+-------------------+-----+
|            FL_DATE|count|
+-------------------+-----+
|2011-01-01 00:00:00|  214|
|2011-01-02 00:00:00|  262|
|2011-01-03 00:00:00|  281|
|2011-01-04 00:00:00|  304|
|2011-01-05 00:00:00|  300|
|2011-01-06 00:00:00|  317|
|2011-01-07 00:00:00|  304|
|2011-01-08 00:00:00|  204|
|2011-01-09 00:00:00|  274|
|2011-01-10 00:00:00|  286|
|2011-01-11 00:00:00|  245|
|2011-01-12 00:00:00|   22|
|2011-01-13 00:00:00|  284|
|2011-01-14 00:00:00|  320|
|2011-01-15 00:00:00|  203|
|2011-01-16 00:00:00|  270|
|2011-01-17 00:00:00|  307|
|2011-01-18 00:00:00|  250|
|2011-01-19 00:00:00|  297|
|2011-01-20 00:00:00|  313|
+-------------------+-----+
only showing top 20 rows

O número médio diário de vôos com origem do aeroporto BOS: 317.4834834834835


## Spark SQL 

Utilizando o arquivo `2011.csv`, responda utilizando **consultas SQL**: 

6. Quais são as operadoras mais pontuais, em média? Obs.: Considerar atraso total (saída e chegada)

In [11]:
!pip install duckdb



In [12]:
import duckdb
import pandas as pd
con = duckdb.connect(database=':memory:', read_only=False)

In [13]:
df20 = df.select('OP_CARRIER', 'DEP_DELAY', 'ARR_DELAY')


In [14]:
pandas_df = df20.toPandas()


In [47]:
query = """ 
SELECT OP_CARRIER, AVG(DEP_DELAY + ARR_DELAY) AS AVG_TOTAL_DELAY
FROM pandas_df
GROUP BY OP_CARRIER
ORDER BY AVG_TOTAL_DELAY ASC
LIMIT 10
"""
con.execute(query).df()


Unnamed: 0,OP_CARRIER,AVG_TOTAL_DELAY
0,AS,-1.713705
1,HA,0.047449
2,FL,6.433356
3,YV,6.92862
4,US,8.521306
5,DL,8.687339
6,UA,11.368903
7,OO,11.752063
8,MQ,14.014677
9,F9,14.390642


7. Qual aeroporto mais tem atrasos por questões de clima?

In [19]:
df21 = df.select('ORIGIN', 'DEST', 'WEATHER_DELAY')

pandas_df1 = df21.toPandas()


In [50]:
query = """ 
SELECT ORIGIN, SUM(WEATHER_DELAY) AS total_delay
FROM pandas_df1
WHERE WEATHER_DELAY IS NOT NULL
GROUP BY ORIGIN
ORDER BY total_delay DESC
LIMIT 10
"""
con.execute(query).df()



Unnamed: 0,ORIGIN,total_delay
0,ORD,286487.0
1,DFW,238270.0
2,ATL,197005.0
3,JFK,93227.0
4,DEN,84202.0
5,IAD,77772.0
6,EWR,73408.0
7,MIA,73170.0
8,CLT,69596.0
9,MCO,60946.0


In [53]:
print(" O aeroporto com mais atrasos por questões de clima é o Chicago O'Hare International Airport (ORD)")

 O aeroporto com mais atrasos por questões de clima é o Chicago O'Hare International Airport (ORD)
