### Exploring Airport data

#### Loading the Airport codes data

In [20]:
import pandas as pd
import numpy as np

pd.set_option("display.max_columns", 500)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from unidecode import unidecode
import re
import datetime as dt

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
df_apc =  spark.read.options(delimiter=",", header=True) \
                    .csv("Cleaned Data/I94_ports.csv")

In [34]:
df_apc = df_apc.withColumn("province", F.col("state"))
df_apc = df_apc.drop("state")
df_apc = df_apc.withColumn("territory", F.col("country"))
df_apc = df_apc.drop("country")

In [36]:
df_apc.limit(5).toPandas()

Unnamed: 0,code,port,locality,province,territory
0,CLG,,Calgary,Alberta,Canada
1,EDA,,Edmonton,Alberta,Canada
2,YHC,,Hakai pass,British Columbia,Canada
3,HAL,,Halifax,Nova Scotia,Canada
4,MON,,Montreal,Quebec,Canada


#### Loading the Airport data

In [14]:
df_apd =  spark.read.options(delimiter="|", header=True, encoding="ISO-8859-1") \
                    .csv("Cleaned Data/Airports_Data/*.csv")

In [35]:
df_apd.limit(5).toPandas()

Unnamed: 0,ident,type,elevation_ft,continent,iso_country,iso_region,gps_code,iata_code,local_code,coordinates,nameL,municipalityL,nameE,municipalityE,country,state
0,00A,heliport,11,,US,US-PA,00A,,00A,"-74.93360137939453, 40.07080078125",Total Rf Heliport,Bensalem,Total Rf Heliport,Bensalem,United States,Pennsylvania
1,00AA,small_airport,3435,,US,US-KS,00AA,,00AA,"-101.473911, 38.704022",Aero B Ranch Airport,Leoti,Aero B Ranch Airport,Leoti,United States,Kansas
2,00AK,small_airport,450,,US,US-AK,00AK,,00AK,"-151.695999146, 59.94919968",Lowell Field,Anchor Point,Lowell Field,Anchor Point,United States,Alaska
3,00AL,small_airport,820,,US,US-AL,00AL,,00AL,"-86.77030181884766, 34.86479949951172",Epps Airpark,Harvest,Epps Airpark,Harvest,United States,Alabama
4,00AR,closed,237,,US,US-AR,,,,"-91.254898, 35.6087",Newport Hospital & Clinic Heliport,Newport,Newport Hospital & Clinic Heliport,Newport,United States,Arkansas


In [16]:
df_apd.filter(F.col("iso_country")=="MX").limit(5).toPandas()

Unnamed: 0,ident,type,elevation_ft,continent,iso_country,iso_region,gps_code,iata_code,local_code,coordinates,nameL,municipalityL,nameE,municipalityE,country,state
0,AMC,small_airport,71,,MX,MX-SON,MMPE,PPE,83550,"-113.305177, 31.351987",Mar de Cortés International Airport,Puerto Peñasco,Mar de Cortes International Airport,Puerto Penasco,Mexico,Sonora
1,BHL,small_airport,34,,MX,MX-BCN,,BHL,BAX,"-113.560997, 28.9786",Bahía de los Ángeles Airport,Bahía de los Ángeles,Bahia de los Angeles Airport,Bahia de los Angeles,Mexico,Baja California
2,BLM,small_airport,33,,MX,MX-BCN,,,BLM,"-113.528723717, 28.891952244799995",Bahia De Los Angelos South,,Bahia De Los Angelos South,,Mexico,Baja California
3,CYD,small_airport,575,,MX,MX-BCS,,,,"-112.8851, 27.2906",San Ignacio Downtown Airstrip,Mulegé,San Ignacio Downtown Airstrip,Mulege,Mexico,Baja California Sur
4,LOM,small_airport,6227,,MX,MX-JAL,,LOM,LMO,"-101.9441, 21.2581",Francisco Primo de Verdad y Ramos Airport,Lagos de Moreno,Francisco Primo de Verdad y Ramos Airport,Lagos de Moreno,Mexico,Jalisco


    Merging both the dataframes by the city. state, and country

In [37]:
cond = [df_apc.locality==df_apd.municipalityE, df_apc.province==df_apd.state, df_apc.territory==df_apd.country]
df_merged = df_apd.join(F.broadcast(df_apc), cond, "inner")

In [38]:
df_merged.limit(5).toPandas()

Unnamed: 0,ident,type,elevation_ft,continent,iso_country,iso_region,gps_code,iata_code,local_code,coordinates,nameL,municipalityL,nameE,municipalityE,country,state,code,port,locality,province,territory
0,CA-0005,closed,,,CA,CA-AB,,,,"-114.13300323486328, 51.0093994140625",RCAF Station Lincoln Park,Calgary,RCAF Station Lincoln Park,Calgary,Canada,Alberta,CLG,,Calgary,Alberta,Canada
1,CA-0088,small_airport,210.0,,CA,CA-ON,,,,"-75.29694366455078, 45.391666412353516",Navan Bearbrook Airport,Ottawa,Navan Bearbrook Airport,Ottawa,Canada,Ontario,OTT,,Ottawa,Ontario,Canada
2,CA-0125,closed,2225.0,,CA,CA-AB,,,FR3,"-113.233002, 53.583302",Bremner Airport,Edmonton,Bremner Airport,Edmonton,Canada,Alberta,EDA,,Edmonton,Alberta,Canada
3,CA-0126,closed,,,CA,CA-AB,,,,"-113.46700286865234, 53.68330001831055",Namao Airport,Edmonton,Namao Airport,Edmonton,Canada,Alberta,EDA,,Edmonton,Alberta,Canada
4,CA-0396,closed,,,CA,CA-BC,,,,"-122.933339775, 49.2691976954",Burnaby (Terminal) Heliport,Vancouver,Burnaby (Terminal) Heliport,Vancouver,Canada,British Columbia,VCV,,Vancouver,British Columbia,Canada


In [39]:
df_merged.count()

194

In [40]:
df_merged.columns

['ident',
 'type',
 'elevation_ft',
 'continent',
 'iso_country',
 'iso_region',
 'gps_code',
 'iata_code',
 'local_code',
 'coordinates',
 'nameL',
 'municipalityL',
 'nameE',
 'municipalityE',
 'country',
 'state',
 'code',
 'port',
 'locality',
 'province',
 'territory']

In [41]:
df_merged.filter(F.col("name")=="").limit(10).toPandas()

AnalysisException: "cannot resolve '`name`' given input columns: [municipalityL, port, municipalityE, nameE, type, state, iso_country, nameL, coordinates, iata_code, locality, country, code, territory, iso_region, ident, local_code, gps_code, continent, elevation_ft, province];;\n'Filter ('name = )\n+- Join Inner, (((locality#12 = municipalityE#148) && (province#609 = state#150)) && (territory#621 = country#149))\n   :- Relation[ident#135,type#136,elevation_ft#137,continent#138,iso_country#139,iso_region#140,gps_code#141,iata_code#142,local_code#143,coordinates#144,nameL#145,municipalityL#146,nameE#147,municipalityE#148,country#149,state#150] csv\n   +- ResolvedHint (broadcast)\n      +- Project [code#10, port#11, locality#12, province#609, territory#621]\n         +- Project [code#10, port#11, locality#12, country#14, province#609, country#14 AS territory#621]\n            +- Project [code#10, port#11, locality#12, country#14, province#609]\n               +- Project [code#10, port#11, locality#12, state#13, country#14, state#13 AS province#609]\n                  +- Relation[code#10,port#11,locality#12,state#13,country#14] csv\n"

In [43]:
df_merged.select("locality","state","country").distinct().count()

21