Skip to content

Commit

Permalink
Making PySpark code files available
Browse files Browse the repository at this point in the history
  • Loading branch information
ojedatony1616 committed Feb 3, 2015
1 parent 358bdbc commit a0d60b7
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 0 deletions.
Empty file.
109 changes: 109 additions & 0 deletions getting-started-with-spark/code/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
## Spark Application - execute with spark-submit

## Imports
import csv
import matplotlib.pyplot as plt

from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)

## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.

    Converts the raw string fields in place (date, departure/arrival
    times, and the numeric delay/airtime/distance columns), then wraps
    the first eleven columns in a Flight named tuple.
    """
    # (index, converter) pairs for every column that is not left as text.
    converters = (
        (0, lambda v: datetime.strptime(v, DATE_FMT).date()),
        (5, lambda v: datetime.strptime(v, TIME_FMT).time()),
        (6, float),
        (7, lambda v: datetime.strptime(v, TIME_FMT).time()),
        (8, float),
        (9, float),
        (10, float),
    )
    # Mutate the row in place, in the same column order as before.
    for idx, convert in converters:
        row[idx] = convert(row[idx])
    return Flight(*row[:11])

def split(line):
    """
    Operator function for splitting a line with csv module.

    Uses csv.reader so quoted fields and embedded commas are handled
    correctly (a plain str.split(',') would not be).
    """
    reader = csv.reader(StringIO(line))
    # next() builtin (Python 2.6+ and 3) instead of reader.next(),
    # which only exists on Python 2 iterators.
    return next(reader)

def plot(delays):
    """
    Show a bar chart of the total delay per airline.

    Parameters
    ----------
    delays : list of (airline_name, total_minutes) pairs; assumed sorted
        ascending by total_minutes by the caller.
    """
    airlines = [d[0] for d in delays]
    minutes = [d[1] for d in delays]
    # list(range(...)) is identical on Python 2 and 3; xrange is 2-only.
    index = list(range(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right: red bars for net delays,
    # green for airlines that ran ahead of schedule.
    # ('mins' rather than 'min' so the builtin min() is not shadowed.)
    for idx, air, mins in zip(index, airlines, minutes):
        if mins > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % mins, xy=(mins+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % mins, xy=(10, idx+0.5), va='center')

    # Set the ticks: airline names on the y axis, blank out the x axis
    ticks = plt.yticks([idx + 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # Minimize chartjunk
    plt.grid(axis='x', color='white', linestyle='-')

    plt.title('Total Minutes Delayed per Airline')
    plt.show()

## Main functionality
def main(sc):
    """
    Total the departure + arrival delay per airline over the data on
    disk, print the totals from the driver, and plot them.

    Parameters
    ----------
    sc : SparkContext used to build the RDDs.
    """
    # Load the airlines lookup dictionary
    # (presumably carrier code -> airline name; verify against the data)
    airlines = dict(sc.textFile("data/ontime/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster so each executor
    # receives one read-only copy instead of shipping it per task
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV Data into an RDD of Flight named tuples
    flights = sc.textFile("data/ontime/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays = flights.map(lambda f: (airline_lookup.value[f.airline],
                                    add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline,
    # then sort ascending by total minutes delayed
    delays = delays.reduceByKey(add).collect()
    delays = sorted(delays, key=itemgetter(1))

    # Provide output from the driver; the print() call form works on
    # both Python 2 and 3 (the statement form is Python-2-only).
    for d in delays:
        print("%0.0f minutes delayed\t%s" % (d[1], d[0]))

    # Show a bar chart of the delays
    plot(delays)

if __name__ == "__main__":
    # Configure Spark to run locally, using every available core.
    conf = SparkConf().setMaster("local[*]").setAppName(APP_NAME)
    sc = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)
87 changes: 87 additions & 0 deletions getting-started-with-spark/code/munge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@

import json
import unicodecsv as csv

INPATH = "data/delicious-rss-1250k.json"
OUTPATH = "data/feeds.csv"

def transform(inpath=INPATH, outpath=OUTPATH):
    """
    Munges the data set and cleans it up for ease of use.

    Reads the CSV at inpath, skips its header row, and writes the
    remaining rows unchanged to outpath.
    """
    # 'r' instead of 'rU': the 'U' flag is a no-op on Python 3 and was
    # removed entirely in 3.11.
    with open(inpath, 'r') as f:
        reader = csv.reader(f)
        # Skip the header; next() works on Python 2.6+ and 3, unlike
        # the Python-2-only reader.next().
        next(reader)

        # Keep the output file open while still inside the input's
        # 'with' block: the reader streams rows from the open file.
        with open(outpath, 'w') as out:
            writer = csv.writer(out)
            for row in reader:
                writer.writerow(row)

def json_transform(inpath=INPATH, outpath=OUTPATH):
    """
    Munges JSON data.

    Reads one JSON object per line from inpath and writes a CSV row of
    (rowid, user, title, url, date) per record to outpath, where rowid
    is the 1-based line number.
    """
    with open(inpath, 'r') as f, open(outpath, 'w') as out:
        writer = csv.writer(out)

        for lineno, raw in enumerate(f):
            record = json.loads(raw)
            # Extract fields in the same order as before, so a missing
            # key raises at the same point.
            url = record['links'][0]['href']
            user = record['author']
            title = record['title']
            rowid = lineno + 1
            date = record['updated']

            writer.writerow((rowid, user, title, url, date))

def munge_flight_data(inpath, outpath):
    """
    Copy flight rows from inpath to outpath, keeping only the first 11
    columns and dropping any row that has a blank among those columns.

    Prints a summary of how many rows were dropped.
    """
    with open(inpath, 'r') as f:
        reader = csv.reader(f)
        blanks = 0
        total = 0

        with open(outpath, 'w') as o:
            writer = csv.writer(o)

            for row in reader:
                total += 1
                ## Check for blanks among the columns we keep
                if any(not item for item in row[:11]):
                    blanks += 1
                    continue

                writer.writerow(row[:11])

    # Guard against an empty input file (the original divided by zero),
    # and report a true percentage: the original printed the raw
    # fraction with a '%' sign (e.g. 0.500% instead of 50.000%).
    # print() call form works on both Python 2 and 3.
    pct = 100.0 * blanks / total if total else 0.0
    print("%d blanks from %d total (%0.3f%%)" % (blanks, total, pct))

def time_helper(inpath, outpath):
    """
    Convert time 2400 to 0000.

    Copies every row from inpath to outpath, normalizing the two time
    columns (indices 5 and 7) from '2400' to '0000' on the way.
    """
    with open(inpath, 'r') as f:
        reader = csv.reader(f)

        with open(outpath, 'w') as o:
            writer = csv.writer(o)

            for row in reader:
                # strptime("%H%M") rejects '2400', so map it to midnight
                # before the row is written back out.
                row[5] = '0000' if row[5] == '2400' else row[5]
                row[7] = '0000' if row[7] == '2400' else row[7]
                writer.writerow(row)

if __name__ == "__main__":
    # Normalize the raw on-time flights file (2400 -> 0000 in the time
    # columns) into the cleaned CSV consumed by the Spark application.
    time_helper('data/ontime/flights.csv', 'data/ontime/flights-cleaned.csv')

0 comments on commit a0d60b7

Please sign in to comment.