Week 01 - Now that you know the main Apache Spark commands, let's solve a problem that simulates an everyday situation for a Data Engineer/Scientist:

Suppose your boss asks you to study some commercial flights and gives you the same dataset we have already used (the 2008 airlines data, remember?). Here is what he asks for:

In [0]:
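# Read the 2008 airlines CSV, using the first line as the header
# and letting Spark infer each column's type
airlines = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferSchema", "true")\
                .load("/databricks-datasets/asa/airlines/2008.csv")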
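
The two options work as follows. `header` takes the column names from the first line of the file. `inferSchema` makes Spark scan the data and choose a type for each column; without it, every column is read as a string. The plain-Python sketch below is only a toy approximation of that idea, not how Spark implements inference. It rests on our own assumptions: two lines copied from the sample shown below and trimmed to a few columns, a simplified int → float → string rule, and empty fields read as `None`. The helper names are hypothetical.

```python
import csv
import io

# Hypothetical excerpt: two rows from the sample shown in this notebook,
# trimmed to a few columns.
SAMPLE = """Year,Month,DepTime,UniqueCarrier,ArrDelay,CancellationCode
2008,1,2003.0,WN,-14.0,
2008,1,754.0,WN,2.0,
"""


def infer_column_type(values):
    """Pick the narrowest of int, float, str that fits every non-empty value."""
    present = [v for v in values if v != ""]
    if not present:
        return str  # an all-empty column falls back to string
    for cast in (int, float):
        try:
            for v in present:
                cast(v)
            return cast
        except ValueError:
            continue  # this type does not fit; try the next, wider one
    return str


def read_csv_with_inference(text):
    # header=true analogue: the first line supplies the column names
    rows = list(csv.DictReader(io.StringIO(text)))
    columns = list(rows[0].keys())
    types = {c: infer_column_type([r[c] for r in rows]) for c in columns}
    # Empty fields become None, mirroring how Spark reads them as null by default
    converted = [
        {c: (None if r[c] == "" else types[c](r[c])) for c in columns}
        for r in rows
    ]
    return converted, types


rows, types = read_csv_with_inference(SAMPLE)
print(types)
print(rows[0])
```

In Spark itself, the types that were actually inferred can be inspected with `airlines.printSchema()`.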

In [0]:
# Display the DataFrame's data
display(airlines)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,2003.0,1955,2211.0,2225,WN,335,N712SW,128.0,150,116.0,-14.0,8.0,IAD,TPA,810,4.0,8.0,0,,0,,,,,
2008,1,3,4,754.0,735,1002.0,1000,WN,3231,N772SW,128.0,145,113.0,2.0,19.0,IAD,TPA,810,5.0,10.0,0,,0,,,,,
2008,1,3,4,628.0,620,804.0,750,WN,448,N428WN,96.0,90,76.0,14.0,8.0,IND,BWI,515,3.0,17.0,0,,0,,,,,
2008,1,3,4,926.0,930,1054.0,1100,WN,1746,N612SW,88.0,90,78.0,-6.0,-4.0,IND,BWI,515,3.0,7.0,0,,0,,,,,
2008,1,3,4,1829.0,1755,1959.0,1925,WN,3920,N464WN,90.0,90,77.0,34.0,34.0,IND,BWI,515,3.0,10.0,0,,0,2.0,0.0,0.0,0.0,32.0
2008,1,3,4,1940.0,1915,2121.0,2110,WN,378,N726SW,101.0,115,87.0,11.0,25.0,IND,JAX,688,4.0,10.0,0,,0,,,,,
2008,1,3,4,1937.0,1830,2037.0,1940,WN,509,N763SW,240.0,250,230.0,57.0,67.0,IND,LAS,1591,3.0,7.0,0,,0,10.0,0.0,0.0,0.0,47.0
2008,1,3,4,1039.0,1040,1132.0,1150,WN,535,N428WN,233.0,250,219.0,-18.0,-1.0,IND,LAS,1591,7.0,7.0,0,,0,,,,,
2008,1,3,4,617.0,615,652.0,650,WN,11,N689SW,95.0,95,70.0,2.0,2.0,IND,MCI,451,6.0,19.0,0,,0,,,,,
2008,1,3,4,1620.0,1620,1639.0,1655,WN,810,N648SW,79.0,95,70.0,-16.0,0.0,IND,MCI,451,3.0,6.0,0,,0,,,,,


1) A table with the average flight time (ActualElapsedTime) and the average departure delay (DepDelay), grouped by month (Month).

In [0]:
from pyspark.sql.functions import avg

# Keep only the columns needed for this question
df = airlines.select('Month', 'ActualElapsedTime', 'DepDelay')

# Average flight time and average departure delay for each month
mean_per_month = df.groupBy('Month') \
                   .agg(avg('ActualElapsedTime').alias('avg_ActualElapsedTime'),
                        avg('DepDelay').alias('avg_DepDelay'))

display(mean_per_month)


Month,avg_ActualElapsedTime,avg_DepDelay
1,128.29696319384124,11.47609595943289
2,129.44371714586094,13.706226305045202
3,129.63611339983922,12.49126948010275
5,126.87376545785284,7.642741440912969
4,127.13476376416128,8.201132754082797
6,128.9144641603606,13.609818079614008
8,127.036379525593,9.61475257451315
7,128.03412449648454,11.807544712497146
9,123.22679133130671,3.961818849518357
10,123.35473154721042,3.803487686795168

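The computation behind `groupBy('Month').agg(avg(...))` can be spelled out in plain Python. The rows in this sketch are hypothetical, and `None` plays the role of a null. Like Spark's `avg`, the helper skips nulls rather than counting them as zero, and a group containing only nulls averages to null. The sketch also sorts its result by month. Spark does not guarantee the order of `groupBy` output (note that month 5 comes before month 4 in the table above), so a sorted table would need `.orderBy('Month')`.

```python
from collections import defaultdict

# Hypothetical (Month, ActualElapsedTime, DepDelay) rows; None stands in for a null.
rows = [
    (1, 128.0, 8.0),
    (1, 96.0, 19.0),
    (1, None, None),
    (2, 90.0, -4.0),
    (2, 110.0, 34.0),
]


def avg_ignoring_nulls(values):
    """Like Spark's avg(): nulls are skipped; an all-null group yields None."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None


def mean_per_month(rows):
    # Collect the (elapsed, delay) pairs of each month
    groups = defaultdict(list)
    for month, elapsed, delay in rows:
        groups[month].append((elapsed, delay))
    # One (avg_ActualElapsedTime, avg_DepDelay) tuple per month, sorted by month
    return {
        month: (
            avg_ignoring_nulls([e for e, _ in pairs]),
            avg_ignoring_nulls([d for _, d in pairs]),
        )
        for month, pairs in sorted(groups.items())
    }


print(mean_per_month(rows))  # {1: (112.0, 13.5), 2: (100.0, 15.0)}
```

The month 1 average is 112.0, not about 74.7, because the `None` row is left out of both the sum and the count.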

2) The number of flights departing from Indianapolis and from Las Vegas.

In [0]:
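# Airport codes: IND = Indianapolis, LAS = Las Vegas
# (.count() below is the grouped-data method, so no extra import is needed)
origins = ['IND', 'LAS']

# Keep only flights departing from those airports and count them per origin
amount_voo = airlines.select('Origin') \
             .filter(airlines.Origin.isin(origins)) \
             .groupBy('Origin') \
             .count()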


In [0]:
amount_voo.show(truncate=False)

+------+------+
|Origin|count |
+------+------+
|LAS   |172876|
|IND   |42750 |
+------+------+



In [0]:
# Just to compare the different ways of displaying results
display(amount_voo)

Origin,count
LAS,172876
IND,42750

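The `filter` + `groupBy` + `count` chain above amounts to counting the origin codes that belong to a given set. The sketch below applies that logic to the `Origin` column of the ten sample rows displayed at the start of the notebook, a tiny excerpt rather than the full dataset. The function name is hypothetical. The sketch also shows one behavior of `groupBy(...).count()`: it only returns rows for keys that actually occur. An airport with no matching flights is therefore missing from the result; it does not appear with a count of 0.

```python
from collections import Counter

# Origin column of the ten sample rows displayed earlier in this notebook
origin_codes = ["IAD", "IAD", "IND", "IND", "IND", "IND", "IND", "IND", "IND", "IND"]


def count_by_origin(codes, wanted):
    """Plain-Python analogue of filter(Origin.isin(wanted)).groupBy('Origin').count()."""
    wanted = set(wanted)
    return Counter(code for code in codes if code in wanted)


result = count_by_origin(origin_codes, ["IND", "LAS"])
print(result)  # Counter({'IND': 8})
```

LAS is absent from this excerpt, so it does not show up in the result at all.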

3) The number of flights that depart from Indianapolis and arrive in Las Vegas.

In [0]:
# Count the flights whose origin is IND and whose destination is LAS
amount_voo_IND_LAS = airlines.select('Origin', 'Dest') \
                             .filter("Origin == 'IND' AND Dest == 'LAS'") \
                             .count()

In [0]:
display(f"{amount_voo_IND_LAS} flights departed from Indianapolis bound for Las Vegas")

'1675 flights departed from Indianapolis bound for Las Vegas'
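
The SQL-style string passed to `filter` above combines two conditions with `AND`. The same predicate can be written with column expressions as `(airlines.Origin == 'IND') & (airlines.Dest == 'LAS')`. The parentheses are required there because `&` binds more tightly than `==` in Python. As a check of the logic only, the plain-Python sketch below applies the predicate to the (Origin, Dest) pairs of the ten sample rows displayed at the start of the notebook. The function name is hypothetical.

```python
# (Origin, Dest) pairs of the ten sample rows displayed earlier in this notebook
routes = [
    ("IAD", "TPA"), ("IAD", "TPA"),
    ("IND", "BWI"), ("IND", "BWI"), ("IND", "BWI"),
    ("IND", "JAX"), ("IND", "LAS"), ("IND", "LAS"),
    ("IND", "MCI"), ("IND", "MCI"),
]


def count_route(pairs, origin, dest):
    """Count flights matching Origin == origin AND Dest == dest."""
    return sum(1 for o, d in pairs if o == origin and d == dest)


print(count_route(routes, "IND", "LAS"))  # 2
```

Both conditions must hold for a row to be counted. A reversed route (LAS to IND) is therefore not included.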