## **Baixando, instalando e configurando o Apache Spark**

In [14]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [15]:
# Configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# Torna o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [16]:
# iniciar uma sessão local 
from pyspark.sql import SparkSession
#sc = SparkSession.builder.master('local[*]').getOrCreate()
spark = SparkSession.builder.appName("Aula_Stack").getOrCreate()

In [24]:
# Verify SparkContext
print(spark)

# Print Spark version
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x7f724b5d8f50>
2.4.4


# **Importando bibliotecas**

In [90]:
from pyspark.sql.functions import col
import pandas as pd
import pyspark.sql.functions as F

# **Importando tabelas e fazendo queries**

In [36]:
from google.colab import files
files.upload()

Saving airports.csv to airports.csv
Saving flights_small.csv to flights_small.csv
Saving planes.csv to planes.csv


{'airports.csv': b'"faa","name","lat","lon","alt","tz","dst"\n"04G","Lansdowne Airport",41.1304722,-80.6195833,1044,-5,"A"\n"06A","Moton Field Municipal Airport",32.4605722,-85.6800278,264,-5,"A"\n"06C","Schaumburg Regional",41.9893408,-88.1012428,801,-6,"A"\n"06N","Randall Airport",41.431912,-74.3915611,523,-5,"A"\n"09J","Jekyll Island Airport",31.0744722,-81.4277778,11,-4,"A"\n"0A9","Elizabethton Municipal Airport",36.3712222,-82.1734167,1593,-4,"A"\n"0G6","Williams County Airport",41.4673056,-84.5067778,730,-5,"A"\n"0G7","Finger Lakes Regional Airport",42.8835647,-76.7812318,492,-5,"A"\n"0P2","Shoestring Aviation Airfield",39.7948244,-76.6471914,1000,-5,"U"\n"0S9","Jefferson County Intl",48.0538086,-122.8106436,108,-8,"A"\n"0W3","Harford County Airport",39.5668378,-76.2024028,409,-5,"A"\n"10C","Galt Field Airport",42.4028889,-88.3751111,875,-6,"U"\n"17G","Port Bucyrus-Crawford County Airport",40.7815556,-82.9748056,1003,-5,"A"\n"19A","Jackson County Airport",34.1758638,-83.5615972,9

In [37]:
#Importando arquivo e colocando dentro de um Dataframe
arquivo = "flights_small.csv"
flights = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [43]:
#Verificando o shape do pyspark dataframe
print((flights.count(), len(flights.columns)))

(10000, 16)


In [45]:
#Visualizando os 10 primeiros registros
flights.limit(10).toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,132,954,6,58
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,360,2677,10,40
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,111,679,14,43
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,83,569,17,5
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,127,937,7,54
5,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,121,991,10,37
6,2014,7,2,847,42,1041,51,WN,N422WN,1520,PDX,OAK,90,543,8,47
7,2014,5,12,1655,-5,1842,-18,VX,N361VA,755,SEA,SFO,98,679,16,55
8,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,135,1050,12,36
9,2014,11,19,1812,-3,2352,-4,AS,N564AS,26,SEA,ORD,198,1721,18,12


In [46]:
#Visualizando o schema do Dataframe
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [48]:
#Retirando as datas e passando colunas para tipos corretos.
flights = flights.\
        withColumn("new_air_time", col("air_time").cast("integer")).drop("air_time")

In [49]:
#renomeando colunas
flights = flights.withColumnRenamed("new_air_time","air_time")

In [120]:
#Registrando o dataframe em uma temp view
flights.createOrReplaceTempView("flights")

query = "SELECT * FROM flights LIMIT 10"

# Selecionando as 10 primeiras linhas do dataset
flights10 = spark.sql(query)

# Print o resultado
flights10.limit(10).toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time,duration_hrs
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132,2.2
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360,6.0
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111,1.85
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83,1.383333
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127,2.116667
5,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,991,10,37,121,2.016667
6,2014,7,2,847,42,1041,51,WN,N422WN,1520,PDX,OAK,543,8,47,90,1.5
7,2014,5,12,1655,-5,1842,-18,VX,N361VA,755,SEA,SFO,679,16,55,98,1.633333
8,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,1050,12,36,135,2.25
9,2014,11,19,1812,-3,2352,-4,AS,N564AS,26,SEA,ORD,1721,18,12,198,3.3


In [53]:
#Incluindo a tempview criada dentro de um dataframe
sqlDF = spark.sql("SELECT * FROM flights LIMIT 10")
sqlDF.toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127
5,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,991,10,37,121
6,2014,7,2,847,42,1041,51,WN,N422WN,1520,PDX,OAK,543,8,47,90
7,2014,5,12,1655,-5,1842,-18,VX,N361VA,755,SEA,SFO,679,16,55,98
8,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,1050,12,36,135
9,2014,11,19,1812,-3,2352,-4,AS,N564AS,26,SEA,ORD,1721,18,12,198


# **Trabalhando com Global Temp View**



In [56]:
# Registtrando o dataframe como view global
flights.createOrReplaceGlobalTempView("flights")

# A visão temporária global está vinculada a um banco de dados preservado pelo sistema `global_temp`
spark.sql("SELECT * FROM global_temp.flights LIMIT 10").toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127
5,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,991,10,37,121
6,2014,7,2,847,42,1041,51,WN,N422WN,1520,PDX,OAK,543,8,47,90
7,2014,5,12,1655,-5,1842,-18,VX,N361VA,755,SEA,SFO,679,16,55,98
8,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,1050,12,36,135
9,2014,11,19,1812,-3,2352,-4,AS,N564AS,26,SEA,ORD,1721,18,12,198


# **Passando PySpark Dataframe para Pandas Dataframe**


In [57]:
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

# Rodando a query
flight_counts = spark.sql(query)

In [60]:
# Convertendo o resultado para pandas
df = flights.toPandas()

In [61]:
df.head()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132.0
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360.0
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111.0
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83.0
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127.0


In [62]:
# Criando pandas dataframe lendo arquivo Airports
arq = "airports.csv"
pd_temp = pd.read_csv(arq)

In [63]:
pd_temp.head()

Unnamed: 0,faa,name,lat,lon,alt,tz,dst
0,04G,Lansdowne Airport,41.130472,-80.619583,1044,-5,A
1,06A,Moton Field Municipal Airport,32.460572,-85.680028,264,-5,A
2,06C,Schaumburg Regional,41.989341,-88.101243,801,-6,A
3,06N,Randall Airport,41.431912,-74.391561,523,-5,A
4,09J,Jekyll Island Airport,31.074472,-81.427778,11,-4,A


In [64]:
# Cria spark_temp a partir de pd_temp
spark_temp = spark.createDataFrame(pd_temp)

In [65]:
spark_temp.limit(5).toPandas()

Unnamed: 0,faa,name,lat,lon,alt,tz,dst
0,04G,Lansdowne Airport,41.130472,-80.619583,1044,-5,A
1,06A,Moton Field Municipal Airport,32.460572,-85.680028,264,-5,A
2,06C,Schaumburg Regional,41.989341,-88.101243,801,-6,A
3,06N,Randall Airport,41.431912,-74.391561,523,-5,A
4,09J,Jekyll Island Airport,31.074472,-81.427778,11,-4,A


# **Manipulando dados**


### **Adicionando colunas**


In [68]:
flights.select(flights.air_time/60).limit(5).toPandas()

Unnamed: 0,(air_time / 60)
0,2.2
1,6.0
2,1.85
3,1.383333
4,2.116667


In [70]:
flights.toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132.0
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360.0
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111.0
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83.0
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,2014,6,23,1806,-4,2104,-6,OO,N225AG,3458,SEA,SLC,689,18,6,89.0
9996,2014,8,31,2336,11,452,-13,AA,N3LEAA,1230,SEA,DFW,1660,23,36,178.0
9997,2014,8,8,904,-1,1042,-5,AS,N523AS,360,SEA,SMF,605,9,4,81.0
9998,2014,8,29,1441,26,1820,10,WN,N8647A,2857,SEA,ABQ,1180,14,41,133.0


In [73]:
# Adicionando coluna duration_hrs
flights = flights.withColumn("duration_hrs", flights.air_time/60)
flights.limit(10).toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time,duration_hrs
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132,2.2
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360,6.0
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111,1.85
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83,1.383333
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127,2.116667
5,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,991,10,37,121,2.016667
6,2014,7,2,847,42,1041,51,WN,N422WN,1520,PDX,OAK,543,8,47,90,1.5
7,2014,5,12,1655,-5,1842,-18,VX,N361VA,755,SEA,SFO,679,16,55,98,1.633333
8,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,1050,12,36,135,2.25
9,2014,11,19,1812,-3,2352,-4,AS,N564AS,26,SEA,ORD,1721,18,12,198,3.3


## **Selecionando colunas**


In [75]:
# Selecionando um subconjunto do dataset
selected1 = flights.select("tailnum", "origin", "dest")
selected1.limit(5).toPandas()

Unnamed: 0,tailnum,origin,dest
0,N846VA,SEA,LAX
1,N559AS,SEA,HNL
2,N847VA,SEA,SFO
3,N360SW,PDX,SJC
4,N612AS,SEA,BUR


In [76]:
lista = ["tailnum", "origin", "dest"]
selected2 = flights.select(lista)
selected2.limit(5).toPandas()

Unnamed: 0,tailnum,origin,dest
0,N846VA,SEA,LAX
1,N559AS,SEA,HNL
2,N847VA,SEA,SFO
3,N360SW,PDX,SJC
4,N612AS,SEA,BUR


In [77]:
# Selecionando um subconjunto do dataset (outra maneira)
temp = flights.select(flights.origin, flights.dest, flights.carrier)
temp.limit(5).toPandas()

Unnamed: 0,origin,dest,carrier
0,SEA,LAX,VX
1,SEA,HNL,AS
2,SEA,SFO,VX
3,PDX,SJC,WN
4,SEA,BUR,AS


# **Filtrando dados**


In [78]:
flights.filter("air_time > 120").toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time,duration_hrs
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132,2.200000
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360,6.000000
2,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127,2.116667
3,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,991,10,37,121,2.016667
4,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,1050,12,36,135,2.250000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5960,2014,9,15,1052,-7,1921,-1,DL,N723TW,1473,SEA,JFK,2422,10,52,282,4.700000
5961,2014,1,5,836,-9,1201,-12,US,N655AW,544,PDX,PHX,1009,8,36,122,2.033333
5962,2014,8,31,2336,11,452,-13,AA,N3LEAA,1230,SEA,DFW,1660,23,36,178,2.966667
5963,2014,8,29,1441,26,1820,10,WN,N8647A,2857,SEA,ABQ,1180,14,41,133,2.216667


In [79]:
flights.filter(flights.air_time > 120).limit(5).toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time,duration_hrs
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132,2.2
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360,6.0
2,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127,2.116667
3,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,991,10,37,121,2.016667
4,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,1050,12,36,135,2.25


In [80]:
# Definindo o primeiro filtro
filterA = flights.origin == "SEA"

# Definindo o segundo filtro
filterB = flights.dest == "PDX"

# Filtrando os dados, primeiro pelo filterA entao pelo filterB
selected2 = selected2.filter(filterA).filter(filterB)

In [81]:
selected2.limit(10).toPandas()

Unnamed: 0,tailnum,origin,dest
0,N810SK,SEA,PDX
1,N822SK,SEA,PDX
2,N586SW,SEA,PDX
3,N223SW,SEA,PDX
4,N580SW,SEA,PDX
5,N520AS,SEA,PDX
6,N809SK,SEA,PDX
7,N295SW,SEA,PDX
8,N221SW,SEA,PDX
9,N294SW,SEA,PDX


# **Agregando dados**


In [83]:
# Encontre a duração do voo mais longo (em termos de tempo) que saiu do SEA usando o método .max().

# Achar a maior tempo de voo de SEA para outras cidades
flights.filter(flights.origin == "SEA").groupBy().max("duration_hrs").toPandas()

Unnamed: 0,max(duration_hrs)
0,6.816667


In [85]:
# Encontre a menor distância percorrida que saiu do PDX usando o método .min().

# Achar a menor distancia do voo de PDX para outras cidades
flights.filter(flights.origin == "PDX").groupBy().min("distance").toPandas()

Unnamed: 0,min(distance)
0,106


In [86]:
# Duração Média dos Voos da compania delta
flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").toPandas()

Unnamed: 0,avg(air_time)
0,188.206897


In [87]:
# Tempo total em Horas no ar 
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").toPandas()

Unnamed: 0,sum(duration_hrs)
0,25289.6


In [88]:
# GroupBy por Mes e destino
by_month_dest = flights.groupBy("month", "dest")

In [89]:
# Desvio Padrão 
by_month_dest.agg(F.stddev("dep_delay")).toPandas()

Unnamed: 0,month,dest,stddev_samp(dep_delay)
0,4,PHX,15.003380
1,1,RDM,8.830750
2,5,ONT,18.895179
3,7,OMA,2.121320
4,8,MDW,14.467659
...,...,...,...
701,12,JFK,22.451159
702,12,DEN,20.529421
703,11,SMF,18.461672
704,10,STL,


# **Junção de dados (join)**


In [99]:
# Lendo o arquivo airports
arquivo = "airports.csv"
airports = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [92]:
# Visualizando os dados
airports.limit(10).toPandas()

Unnamed: 0,faa,name,lat,lon,alt,tz,dst
0,04G,Lansdowne Airport,41.130472,-80.619583,1044,-5,A
1,06A,Moton Field Municipal Airport,32.460572,-85.680028,264,-5,A
2,06C,Schaumburg Regional,41.989341,-88.101243,801,-6,A
3,06N,Randall Airport,41.431912,-74.391561,523,-5,A
4,09J,Jekyll Island Airport,31.074472,-81.427778,11,-4,A
5,0A9,Elizabethton Municipal Airport,36.371222,-82.173417,1593,-4,A
6,0G6,Williams County Airport,41.467306,-84.506778,730,-5,A
7,0G7,Finger Lakes Regional Airport,42.883565,-76.781232,492,-5,A
8,0P2,Shoestring Aviation Airfield,39.794824,-76.647191,1000,-5,U
9,0S9,Jefferson County Intl,48.053809,-122.810644,108,-8,A


In [116]:
# Contando as linhas do dataframe
airports.count()

1397

In [94]:
flights.limit(10).toPandas()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time,duration_hrs
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132,2.2
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360,6.0
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111,1.85
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83,1.383333
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127,2.116667
5,2014,1,15,1037,7,1352,2,WN,N646SW,48,PDX,DEN,991,10,37,121,2.016667
6,2014,7,2,847,42,1041,51,WN,N422WN,1520,PDX,OAK,543,8,47,90,1.5
7,2014,5,12,1655,-5,1842,-18,VX,N361VA,755,SEA,SFO,679,16,55,98,1.633333
8,2014,4,19,1236,-4,1508,-7,AS,N309AS,490,SEA,SAN,1050,12,36,135,2.25
9,2014,11,19,1812,-3,2352,-4,AS,N564AS,26,SEA,ORD,1721,18,12,198,3.3


In [117]:
# Contando as linhas do dataframe
flights.count()

10000

In [95]:
# Renomeie a coluna faa do dataframe airports
airports = airports.withColumnRenamed("faa", "dest")

In [97]:
# Juntando os DataFrames
flights_with_airports = flights.join(airports, on="dest", how="left")

In [100]:
# Lendo o novo dataframe
flights_with_airports.toPandas()

Unnamed: 0,dest,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,...,hour,minute,air_time,duration_hrs,name,lat,lon,alt,tz,dst
0,LAX,2014,12,8,658,-7,935,-5,VX,N846VA,...,6,58,132.0,2.200000,Los Angeles Intl,33.942536,-118.408075,126,-8,A
1,HNL,2014,1,22,1040,5,1505,5,AS,N559AS,...,10,40,360.0,6.000000,Honolulu Intl,21.318681,-157.922428,13,-10,N
2,SFO,2014,3,9,1443,-2,1652,2,VX,N847VA,...,14,43,111.0,1.850000,San Francisco Intl,37.618972,-122.374889,13,-8,A
3,SJC,2014,4,9,1705,45,1839,34,WN,N360SW,...,17,5,83.0,1.383333,Norman Y Mineta San Jose Intl,37.362600,-121.929022,62,-8,A
4,BUR,2014,3,9,754,-1,1015,1,AS,N612AS,...,7,54,127.0,2.116667,Bob Hope,34.200667,-118.358667,778,-8,A
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,SLC,2014,6,23,1806,-4,2104,-6,OO,N225AG,...,18,6,89.0,1.483333,Salt Lake City Intl,40.788389,-111.977772,4227,-7,A
9996,DFW,2014,8,31,2336,11,452,-13,AA,N3LEAA,...,23,36,178.0,2.966667,Dallas Fort Worth Intl,32.896828,-97.037997,607,-6,A
9997,SMF,2014,8,8,904,-1,1042,-5,AS,N523AS,...,9,4,81.0,1.350000,Sacramento Intl,38.695417,-121.590778,27,-8,A
9998,ABQ,2014,8,29,1441,26,1820,10,WN,N8647A,...,14,41,133.0,2.216667,Albuquerque International Sunport,35.040222,-106.609194,5355,-7,A


In [118]:
# Contando as linhas do dataframe
flights_with_airports.count()

10000

In [108]:
# Verificando o schema do novo dataframe
flights_with_airports.printSchema()

root
 |-- dest: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- duration_hrs: double (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



In [103]:
# Lendo o arquivo planes
arquivo = "planes.csv"
planes = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [105]:
planes.limit(10).toPandas()

Unnamed: 0,tailnum,year,type,manufacturer,model,engines,seats,speed,engine
0,N102UW,1998,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
1,N103US,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
2,N104UW,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
3,N105UW,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
4,N107US,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
5,N108UW,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
6,N109UW,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
7,N110UW,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
8,N111US,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
9,N11206,2000,Fixed wing multi engine,BOEING,737-824,2,149,,Turbo-fan


In [111]:
# Renomeando a coluna year para evitar ambiguidade
planes = planes.withColumnRenamed("year", "planes_year")

In [112]:
planes.limit(5).toPandas()

Unnamed: 0,tailnum,plaanes_year,type,manufacturer,model,engines,seats,speed,engine
0,N102UW,1998,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
1,N103US,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
2,N104UW,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
3,N105UW,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
4,N107US,1999,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan


In [113]:
# Juntando o Dataframe criado com o dataframe planes
flights_with_airports_with_planes = flights_with_airports.join(planes, on="tailnum", how="left")

In [115]:
# Lendo o dataframe criado
flights_with_airports_with_planes.limit(5).toPandas()

Unnamed: 0,tailnum,dest,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,...,tz,dst,plaanes_year,type,manufacturer,model,engines,seats,speed,engine
0,N846VA,LAX,2014,12,8,658,-7,935,-5,VX,...,-8,A,2011,Fixed wing multi engine,AIRBUS,A320-214,2,182,,Turbo-fan
1,N559AS,HNL,2014,1,22,1040,5,1505,5,AS,...,-10,N,2006,Fixed wing multi engine,BOEING,737-890,2,149,,Turbo-fan
2,N847VA,SFO,2014,3,9,1443,-2,1652,2,VX,...,-8,A,2011,Fixed wing multi engine,AIRBUS,A320-214,2,182,,Turbo-fan
3,N360SW,SJC,2014,4,9,1705,45,1839,34,WN,...,-8,A,1992,Fixed wing multi engine,BOEING,737-3H4,2,149,,Turbo-fan
4,N612AS,BUR,2014,3,9,754,-1,1015,1,AS,...,-8,A,1999,Fixed wing multi engine,BOEING,737-790,2,151,,Turbo-jet


In [119]:
# Contando as linhas do dataframe
flights_with_airports_with_planes.count()

10000