In [150]:
# For SQL-type queries (Spark)
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark import SparkContext

# For regression and other possible ML tools (Spark)
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics

# Important for managing features  (Spark)
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler

# For displaying and other related IPython tools...
from IPython.display import display
from IPython.html.widgets import interact

# Typycal Python tools
import sys
import numpy as np
import pandas as pd
import time
import datetime
import matplotlib.pyplot as plt
import os.path

# To show plots inline
%matplotlib inline

In [151]:
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
textFile = sc.textFile('2008.csv')



In [152]:
textFileRDD = textFile.map(lambda x: x.split(','))
header = textFileRDD.first()

textRDD = textFileRDD.filter(lambda r: r != header)

In [153]:
num_records = textFileRDD.count()
print ('Number of records ' , num_records)



Number of records  7009729


                                                                                

In [154]:
aux_ = textFileRDD.take(2)
feature_names = aux_[0]
feature_example = aux_[1]

In [155]:
print("Features names")
print(feature_names)

Features names
['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']


In [156]:
print("Feature example")
print(feature_example)

Feature example
['2008', '1', '3', '4', '2003', '1955', '2211', '2225', 'WN', '335', 'N712SW', '128', '150', '116', '-14', '8', 'IAD', 'TPA', '810', '4', '8', '0', '', '0', 'NA', 'NA', 'NA', 'NA', 'NA']


In [157]:
print("Number of features = " , len(feature_example))

Number of features =  29


In [158]:
def parse(x):
    try:
        y=Row(Year=int(x[0]),\
          Month=int(x[1]),\
          DayofMonth=int(x[2]),\
          DayOfWeek=int(x[3]),\
          DepTime=int(float(x[4])), \
          CRSDepTime=int(x[5]),\
          ArrTime=int(float(x[6])),\
          CRSArrTime=int(x[7]), \
          UniqueCarrier=x[8],\
          DepDelay=int(float(x[15])),\
          Origin=x[16],\
          Dest=x[17], \
          Distance=int(float(x[18])))  
    except:
        y=None  
    return y

In [159]:
rowRDD = textRDD.map(lambda x: parse(x)).filter(lambda x: x != None)
df = sqlContext.createDataFrame(rowRDD)

In [160]:
df = df.withColumn('DepDelayed', df['DepDelay']>15)

In [161]:
df.take(5)

[Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=2003, CRSDepTime=1955, ArrTime=2211, CRSArrTime=2225, UniqueCarrier='WN', DepDelay=8, Origin='IAD', Dest='TPA', Distance=810, DepDelayed=False),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=754, CRSDepTime=735, ArrTime=1002, CRSArrTime=1000, UniqueCarrier='WN', DepDelay=19, Origin='IAD', Dest='TPA', Distance=810, DepDelayed=True),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=628, CRSDepTime=620, ArrTime=804, CRSArrTime=750, UniqueCarrier='WN', DepDelay=8, Origin='IND', Dest='BWI', Distance=515, DepDelayed=False),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=926, CRSDepTime=930, ArrTime=1054, CRSArrTime=1100, UniqueCarrier='WN', DepDelay=-4, Origin='IND', Dest='BWI', Distance=515, DepDelayed=False),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=1829, CRSDepTime=1755, ArrTime=1959, CRSArrTime=1925, UniqueCarrier='WN', DepDelay=34, Origin='IND', Dest='BWI', Distance=515, 

In [162]:
# Function to obtain hour of day
def get_hour(x): 
    h = int(str(int(x)).zfill(4)[:2])
    return h

# Register our function as a UDF 
f = udf(get_hour, IntegerType())

In [163]:
#CRSDepTime: scheduled departure time (local, hhmm)
df = df.withColumn('Hour', f(df.CRSDepTime))
df.registerTempTable("airlineDF")



In [164]:
df.take(2)

[Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=2003, CRSDepTime=1955, ArrTime=2211, CRSArrTime=2225, UniqueCarrier='WN', DepDelay=8, Origin='IAD', Dest='TPA', Distance=810, DepDelayed=False, Hour=19),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=754, CRSDepTime=735, ArrTime=1002, CRSArrTime=1000, UniqueCarrier='WN', DepDelay=19, Origin='IAD', Dest='TPA', Distance=810, DepDelayed=True, Hour=7)]

Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/opt/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [165]:
groupedDelay = sqlContext.sql("SELECT Origin, count(*) conFlight,avg(DepDelay) delay \
                                FROM airlineDF \
                                GROUP BY Origin")

# ... and turn it into a Padas data frame
df_origin = groupedDelay.toPandas()

                                                                                

In [166]:
df_origin.shape

(303, 3)

In [167]:
df_origin.head(10)

Unnamed: 0,Origin,conFlight,delay
0,MSY,38517,8.899759
1,GEG,15349,5.911395
2,SNA,46556,6.133237
3,BUR,30761,6.793797
4,GRB,7555,9.994044
5,GTF,2117,-0.491734
6,IDA,3038,2.46445
7,GRR,16117,7.818204
8,EUG,5652,5.474345
9,PVD,22063,10.251734


In [168]:
df_aux = pd.read_csv('airports.dat', index_col=0,\
names = ['name', 'city', 'country','IATA','ICAO','lat','lng','alt','TZone','DST','Tz','airports',"OurAirports"], \
            header=0)

In [169]:
df_aux=df_aux.drop(['airports'], axis=1)
df_aux=df_aux.drop(['OurAirports'], axis=1)
df_aux

Unnamed: 0,name,city,country,IATA,ICAO,lat,lng,alt,TZone,DST,Tz
2,Madang Airport,Madang,Papua New Guinea,MAG,AYMD,-5.207080,145.789001,20,10,U,Pacific/Port_Moresby
3,Mount Hagen Kagamuga Airport,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.826790,144.296005,5388,10,U,Pacific/Port_Moresby
4,Nadzab Airport,Nadzab,Papua New Guinea,LAE,AYNZ,-6.569803,146.725977,239,10,U,Pacific/Port_Moresby
5,Port Moresby Jacksons International Airport,Port Moresby,Papua New Guinea,POM,AYPY,-9.443380,147.220001,146,10,U,Pacific/Port_Moresby
6,Wewak International Airport,Wewak,Papua New Guinea,WWK,AYWK,-3.583830,143.669006,19,10,U,Pacific/Port_Moresby
...,...,...,...,...,...,...,...,...,...,...,...
14106,Rogachyovo Air Base,Belaya,Russia,\N,ULDA,71.616699,52.478298,272,\N,\N,\N
14107,Ulan-Ude East Airport,Ulan Ude,Russia,\N,XIUW,51.849998,107.737999,1670,\N,\N,\N
14108,Krechevitsy Air Base,Novgorod,Russia,\N,ULLK,58.625000,31.385000,85,\N,\N,\N
14109,Desierto de Atacama Airport,Copiapo,Chile,CPO,SCAT,-27.261200,-70.779198,670,\N,\N,\N


In [170]:
df_airports = pd.merge(df_origin, df_aux, left_on = 'Origin', right_on = 'IATA')

In [171]:
df_airports.shape

(303, 14)

In [172]:
df_airports.head()

Unnamed: 0,Origin,conFlight,delay,name,city,country,IATA,ICAO,lat,lng,alt,TZone,DST,Tz
0,MSY,38517,8.899759,Louis Armstrong New Orleans International Airport,New Orleans,United States,MSY,KMSY,29.993401,-90.258003,4,-6,A,America/Chicago
1,GEG,15349,5.911395,Spokane International Airport,Spokane,United States,GEG,KGEG,47.6199,-117.533997,2376,-8,A,America/Los_Angeles
2,SNA,46556,6.133237,John Wayne Airport-Orange County Airport,Santa Ana,United States,SNA,KSNA,33.675701,-117.867996,56,-8,A,America/Los_Angeles
3,BUR,30761,6.793797,Bob Hope Airport,Burbank,United States,BUR,KBUR,34.200699,-118.359001,778,-8,A,America/Los_Angeles
4,GRB,7555,9.994044,Austin Straubel International Airport,Green Bay,United States,GRB,KGRB,44.4851,-88.129601,695,-6,A,America/Chicago


In [173]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def Zscore(x):
    return (x-np.average(x))/np.std(x)

In [177]:
conda install basemap

ValueError: The python kernel does not appear to be a conda environment.  Please use ``%pip install`` instead.

In [175]:
from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as plt
# rcParms allows setting size of the figure
from pylab import rcParams
%matplotlib inline

ModuleNotFoundError: No module named 'mpl_toolkits.basemap'

In [None]:
# For the map itself...


# We set size of the figure
rcParams['figure.figsize'] = (14,10)

# Set parameters for plotting the map
my_map = Basemap(projection='merc',
            resolution = 'l', area_thresh = 1000.0,
            llcrnrlon=-130,
                 llcrnrlat=22, #min longitude (llcrnrlon) and latitude (llcrnrlat)
                urcrnrlon=-60,
                 urcrnrlat=50) #max longitude (urcrnrlon) and latitude (urcrnrlat)

# Add features we want to show in the map...
my_map.drawcoastlines()
my_map.drawcountries()
my_map.drawmapboundary()
my_map.fillcontinents(color = 'white', alpha = 0.3)
my_map.shadedrelief()

# This line is to creat a colored map
colors = plt.get_cmap('hot')(np.linspace(0.0, 1.0, 30))
colors=np.flipud(colors)

# This set of instructions is used to genarate scatter plot in the map
countrange=max(df_airports['conFlight'])-min(df_airports['conFlight'])
# The following array normalizes the values in the 'delay' df_airports dataframe
# (assigns zscore to them) and determins its likelihood to be delayed by means
# of the sigmoid function.
al=np.array([sigmoid(x) for x in zscore(df_airports['delay'])])
xs,ys = my_map(np.asarray(df_airports['lng']), np.asarray(df_airports['lat']))
val=df_airports['conFlight']*4000.0/countrange

my_map.scatter(xs, ys,  marker='o', s= val, alpha = 0.8,
               color=colors[(al*20).astype(int)])

# Set of instructions to add text
df_text=df_airports[(df_airports['conFlight']>60000) &
                    (df_airports['IATA'] != 'HNL')]
xt,yt = my_map(np.asarray(df_text['lng']), np.asarray(df_text['lat']))
txt=np.asarray(df_text['IATA'])
zp=zip(xt,yt,txt)

for row in zp:
    plt.text(row[0],row[1],row[2], fontsize=10, color='blue',)

print("Each marker is an airport.")
print("Size of markers: Airport Traffic (larger means higher number of flights in year)")
print("Color of markers: Average Flight Delay (Redder means longer delays)")

plt.show()