In [1]:
sc

<pyspark.context.SparkContext at 0x1094d5fd0>

In [1]:
# set up the file paths for your data
# all three CSV files live in the same directory, so the directory is written once;
# point DATA_DIR at your own copy of the data to run the notebook on another machine
DATA_DIR = 'file:///Users/yiyingwang/desktop/ApacheSpark/udemy/data'
airlinesPath = DATA_DIR + '/airlines.csv'
airportsPath = DATA_DIR + '/airports.csv'
flightsPath = DATA_DIR + '/flights.csv'

In [None]:
airlines=sc.textFile(airlinesPath)

In [None]:
# airlines is an RDD
print airlines

In [None]:
# use the collect operation
airlines.collect()

In [None]:
airlines.first()

In [None]:
airlines.take(10)

In [None]:
airlines.count()

In [None]:
# filter out the header 
airlinesWoHeader = airlines.filter(lambda x: "Description" not in x)

In [None]:
print airlinesWoHeader

In [None]:
airlinesWoHeader.take(10)

In [None]:
airlinesPared=airlinesWoHeader.map(lambda x: x.split(",")).take(10)

In [None]:
airlinesPared

In [None]:
airlines.map(len).take(10)

In [16]:
def notHeader(row):
    """Return True when ``row`` does not contain the text "Description".

    Used as a filter predicate to drop the CSV header line.  The check is a
    plain containment test, so any other row that happens to contain
    "Description" is rejected as well.
    """
    looksLikeHeader = "Description" in row
    return not looksLikeHeader

In [None]:
airlines.filter(notHeader).take(10)

In [None]:
# chain transformation together
airlines.filter(notHeader) \
    .map(lambda x: x.split(",")) \
    .take(10)


In [None]:
# use python libraries
import csv
from StringIO import StringIO

def split(line):
    """Parse one line of CSV text into a list of field strings.

    The csv module is used rather than ``line.split(",")`` so that a comma
    inside a quoted field does not break the field apart.

    line -- a single CSV record (one line of text, no embedded newline).
    Returns the list of fields; an empty line gives an empty list.
    """
    # csv.reader accepts any iterable of lines, so a one-element list is enough
    # and no StringIO wrapper is needed.  The builtin next() works on Python 2
    # and 3 (reader.next() is Python 2 only); the [] default keeps a blank line
    # from raising StopIteration inside a Spark map.
    return next(csv.reader([line]), [])

airlines.filter(notHeader).map(split).take(10)

In [2]:
flights=sc.textFile(flightsPath)

In [None]:
flights.count()

In [None]:
flights.take(10)
# flight date, airline code, flight num, source airport, destination airport, departure time, departure delay,
# arrival time, arrival delay, airtime, distance

In [None]:
# parse each row into a list
flightsParsed = flights.map(lambda x: x.split(','))

In [3]:
# set things up to reference these columns by name
# convert these fields to relevant data types from string
# set up a class to represent 1 record
# convert each list in flightsParsed to this class

from datetime import datetime
from collections import namedtuple

fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay',
         'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields, verbose=True)  # use namedtuple to manufacture a class, factory functions 
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

def parse(row):
    """Convert one split flight record into a typed ``Flight``.

    row -- sequence of at least 11 strings in the order given by ``fields``:
           date, airline, flightnum, origin, dest, dep, dep_delay,
           arv, arv_delay, airtime, distance.  Extra trailing items are ignored.

    Returns a ``Flight`` whose ``date`` is a ``datetime.date`` (DATE_FMT),
    ``dep`` and ``arv`` are ``datetime.time`` (TIME_FMT), and ``dep_delay``,
    ``arv_delay``, ``airtime`` and ``distance`` are floats; the remaining
    fields are passed through unchanged.

    Raises ValueError when a date, time or number cannot be parsed, and
    IndexError when the row has fewer than 11 items.
    """
    # Build the record from converted values instead of assigning back into
    # ``row``: the caller's list is left untouched, so calling parse() twice on
    # the same list works (previously the second call handed a date object to
    # strptime and failed).
    return Flight(
        datetime.strptime(row[0], DATE_FMT).date(),
        row[1],
        row[2],
        row[3],
        row[4],
        datetime.strptime(row[5], TIME_FMT).time(),
        float(row[6]),
        datetime.strptime(row[7], TIME_FMT).time(),
        float(row[8]),
        float(row[9]),
        float(row[10]),
    )

class Flight(tuple):
    'Flight(date, airline, flightnum, origin, dest, dep, dep_delay, arv, arv_delay, airtime, distance)'

    __slots__ = ()

    _fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')

    def __new__(_cls, date, airline, flightnum, origin, dest, dep, dep_delay, arv, arv_delay, airtime, distance):
        'Create new instance of Flight(date, airline, flightnum, origin, dest, dep, dep_delay, arv, arv_delay, airtime, distance)'
        return _tuple.__new__(_cls, (date, airline, flightnum, origin, dest, dep, dep_delay, arv, arv_delay, airtime, distance))

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new Flight object from a sequence or iterable'
        result = new(cls, iterable)
        if len(result) != 11:
            raise TypeError('Expected 11 arguments, got %d' % len(result))
        return result

    def __repr__(self):
        'Return a nicely

In [4]:
# process each row in the dataset
flightsParsed = flights.map(lambda x: x.split(",")).map(parse) # a copy of the function is sent to each node
# such functions are called closure functions. Spark is built on Scala, which supports the use of closure functions

In [None]:
flightsParsed.first()

In [None]:
# we can access the values in the Flight object using the field name
flightsParsed.map(lambda x: x.distance).first()

# Compute the average distance travelled by a flight

In [None]:
# add up the distance of every flight, then divide by the number of flights
# reduce takes a function of two elements that returns an object of the same type
totalDistance = flightsParsed.map(lambda flight: flight.distance) \
                             .reduce(lambda runningSum, distance: runningSum + distance)
avgDistance = totalDistance / flightsParsed.count()
print(avgDistance)

# Compute the % of flights which had delays

In [7]:
# flights with a positive departure delay, as a fraction of all flights
# (float() keeps the division from being an integer division; multiply by 100 for a percentage)
flightsParsed.filter(lambda flight: flight.dep_delay > 0).count() \
    / float(flightsParsed.count())

0.3753871510922012

In [6]:
# most of computations need flightsParsed RDD
flightsParsed.persist()
# flightsParsed.unpersist()

PythonRDD[2] at RDD at PythonRDD.scala:43

# Compute the average delay

In [9]:
# aggregate builds the sum and the count of the departure delays with a single action
sumCount = (
    flightsParsed
    .map(lambda flight: flight.dep_delay)
    .aggregate(
        (0, 0),  # zero value: (sum of delays, number of delays)
        lambda acc, delay: (acc[0] + delay, acc[1] + 1),  # on each node: fold one delay into (sum, count)
        lambda left, right: (left[0] + right[0], left[1] + right[1]),  # merge the (sum, count) results of the nodes
    )
)

In [11]:
sumCount[0]/sumCount[1]

8.313877046894298

# Compute frequency distribution of delays

In [12]:
# number of flights per whole hour of departure delay: 0-1 hrs, 1-2 hrs, 2-3 hrs, ...
# NOTE(review): int() truncates toward zero, so bucket 0 holds every delay strictly
# between -60 and 60 minutes, i.e. early departures are counted together with the
# 0-1 hrs delays -- confirm this is intended
flightsParsed.map(lambda flight: int(flight.dep_delay / 60)).countByValue()

defaultdict(int,
            {0: 452963,
             1: 16016,
             2: 4893,
             3: 1729,
             4: 701,
             5: 249,
             6: 113,
             7: 66,
             8: 43,
             9: 26,
             10: 15,
             11: 12,
             12: 9,
             13: 15,
             14: 13,
             15: 4,
             17: 2,
             20: 4,
             21: 3,
             24: 3,
             25: 1,
             28: 1})

# Compute average delay per airport

In [5]:
# create a pair RDD with origin airport and delay for each flight
airportDelays = flightsParsed.map(lambda x: (x.origin, x.dep_delay))

In [6]:
airportDelays.keys().take(3)

[u'JFK', u'LAX', u'JFK']

In [8]:
airportTotalDelay = airportDelays.reduceByKey(lambda x,y:x+y)

In [9]:
airportCount = airportDelays.mapValues(lambda x:1).reduceByKey(lambda x, y:x+y)

In [10]:
airportSumCount = airportTotalDelay.join(airportCount)

In [11]:
airportAvgDelay = airportSumCount.mapValues(lambda x: x[0]/float(x[1]))

In [12]:
airportAvgDelay.take(5)

[(u'JFK', 6.997397769516729),
 (u'MIA', 3.820501853435985),
 (u'LIH', -0.9607623318385651),
 (u'LIT', 9.96943972835314),
 (u'CLT', 8.066368381804624)]

In [13]:
# per-airport (sum of delays, number of flights), built with one combineByKey call
airportSumCount2 = airportDelays.combineByKey(
    lambda delay: (delay, 1),                                      # first delay of a key -> (sum, count)
    lambda acc, delay: (acc[0] + delay, acc[1] + 1),               # fold another delay into a key's (sum, count)
    lambda left, right: (left[0] + right[0], left[1] + right[1])   # merge two (sum, count) pairs of the same key
)

# the top 10 airports based on delay

In [15]:
# sort airportAvgDelay in descending order
airportAvgDelay.sortBy(lambda x: -x[1]).take(10)

[(u'PPG', 56.25),
 (u'EGE', 32.0),
 (u'OTH', 24.533333333333335),
 (u'LAR', 18.892857142857142),
 (u'RDD', 18.55294117647059),
 (u'MTJ', 18.363636363636363),
 (u'PUB', 17.54),
 (u'EWR', 16.478549005929544),
 (u'CIC', 15.931034482758621),
 (u'RST', 15.6993006993007)]

In [17]:
import csv
from StringIO import StringIO

def split(line):
    """Split a single CSV line into its fields, honouring quoted commas.

    line -- one line of CSV text (no embedded newline).
    Returns a list of field strings; a blank line yields an empty list.
    """
    # A one-element list is a valid row source for csv.reader, which removes
    # the need for StringIO.  next(reader, []) replaces the Python 2-only
    # reader.next() and returns [] instead of raising StopIteration when there
    # is nothing to read.
    rows = csv.reader([line])
    return next(rows, [])

def notHeader(row):
    """Filter predicate: False for any row containing "Description", True otherwise.

    Intended to remove the header line of the CSV file before parsing.
    """
    if "Description" in row:
        return False
    return True

airports = sc.textFile(airportsPath).filter(notHeader).map(split)

In [18]:
# lookup is an action available on pair RDDs: it returns the values stored for the given key
airports.lookup('PPG')

['Pago Pago, TT: Pago Pago International']

In [19]:
# dictionary
# we can build a map with all airports from the RDD and use that, collectAsMap is an action
airportLookup=airports.collectAsMap()

In [20]:
airportAvgDelay.map(lambda x: (airportLookup[x[0]], x[1])).take(5)

[('New York, NY: John F. Kennedy International', 6.997397769516729),
 ('Miami, FL: Miami International', 3.820501853435985),
 ('Lihue, HI: Lihue Airport', -0.9607623318385651),
 ('Little Rock, AR: Bill and Hillary Clinton Nat Adams Field',
  9.96943972835314),
 ('Charlotte, NC: Charlotte Douglas International', 8.066368381804624)]

In [21]:
airportAvgDelay.sortBy(lambda x: -x[1]).map(lambda x: (airportLookup[x[0]], x[1])).take(10)

[('Pago Pago, TT: Pago Pago International', 56.25),
 ('Eagle, CO: Eagle County Regional', 32.0),
 ('North Bend/Coos Bay, OR: Southwest Oregon Regional', 24.533333333333335),
 ('Laramie, WY: Laramie Regional', 18.892857142857142),
 ('Redding, CA: Redding Municipal', 18.55294117647059),
 ('Montrose/Delta, CO: Montrose Regional', 18.363636363636363),
 ('Pueblo, CO: Pueblo Memorial', 17.54),
 ('Newark, NJ: Newark Liberty International', 16.478549005929544),
 ('Chico, CA: Chico Municipal', 15.931034482758621),
 ('Rochester, MN: Rochester International', 15.6993006993007)]

In [None]:
# a copy of the airportLookup variable is carried over to each node in the cluster
# sometimes we need the same variable for many operations; in that case we can cache it
# once on each node --> broadcast variable
# a broadcast variable is: 1. immutable 2. distributed 3. in-memory
airportBC = sc.broadcast(airportLookup)
