In [1]:
sc

In [2]:
trafficpath = "C:\\Users\\Anshul\\Documents\\Pluralsight\\Spark\\Dodgers.data"
gamespath = "C:\\Users\\Anshul\\Documents\\Pluralsight\\Spark\\Dodgers.events"

In [3]:
traffic = sc.textFile(trafficpath)

In [6]:
traffic.take(5)

[u'4/10/2005 0:00,-1',
 u'4/10/2005 0:05,-1',
 u'4/10/2005 0:10,-1',
 u'4/10/2005 0:15,-1',
 u'4/10/2005 0:20,-1']

In [7]:
games = sc.textFile(gamespath)
games.take(5)

[u'04/12/05,13:10:00,16:23:00,55892,San Francisco,W 9-8\ufffd',
 u'04/13/05,19:10:00,21:48:00,46514,San Francisco,W 4-1\ufffd',
 u'04/15/05,19:40:00,21:48:00,51816,San Diego,W 4-0\ufffd',
 u'04/16/05,19:10:00,21:52:00,54704,San Diego,W 8-3\ufffd',
 u'04/17/05,13:10:00,15:31:00,53402,San Diego,W 6-0\ufffd']

In [8]:
from datetime import datetime
import csv
from StringIO import StringIO

def parseTraffic(row):
    DATE_FMT = "%m/%d/%Y %H:%M"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT)
    row[1] = int(row[1])
    
    return (row[0], row[1])

In [10]:
#creating paired RDD
trafficParsed = traffic.map(parseTraffic)
trafficParsed.take(5)

[(datetime.datetime(2005, 4, 10, 0, 0), -1),
 (datetime.datetime(2005, 4, 10, 0, 5), -1),
 (datetime.datetime(2005, 4, 10, 0, 10), -1),
 (datetime.datetime(2005, 4, 10, 0, 15), -1),
 (datetime.datetime(2005, 4, 10, 0, 20), -1)]

In [11]:
# Summarizing daily trends
dailyTrend = trafficParsed.map(lambda x : (x[0].date(), x[1]))\
.reduceByKey(lambda x,y : x + y)

In [12]:
dailyTrend.take(5)

[(datetime.date(2005, 8, 9), 5958),
 (datetime.date(2005, 6, 29), 5437),
 (datetime.date(2005, 8, 17), 6673),
 (datetime.date(2005, 9, 6), 6402),
 (datetime.date(2005, 5, 22), 4977)]

In [13]:
dailyTrend.sortBy(lambda x : -x[1]).take(5)

[(datetime.date(2005, 7, 28), 7661),
 (datetime.date(2005, 7, 29), 7499),
 (datetime.date(2005, 8, 12), 7287),
 (datetime.date(2005, 7, 27), 7238),
 (datetime.date(2005, 9, 23), 7175)]

In [14]:
# Joining with games

def parseGames(row):
    
    DATE_FMT = "%m/%d/%y"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    
    return (row[0], row[4])

In [15]:
gamesParsed = games.map(parseGames)
gamesParsed.take(5)

[(datetime.date(2005, 4, 12), u'San Francisco'),
 (datetime.date(2005, 4, 13), u'San Francisco'),
 (datetime.date(2005, 4, 15), u'San Diego'),
 (datetime.date(2005, 4, 16), u'San Diego'),
 (datetime.date(2005, 4, 17), u'San Diego')]

In [17]:
dailyTrendCombined = dailyTrend.leftOuterJoin(gamesParsed)
dailyTrendCombined.take(5)

[(datetime.date(2005, 9, 24), (5848, u'Pittsburgh')),
 (datetime.date(2005, 8, 11), (7110, u'Philadelphia')),
 (datetime.date(2005, 6, 21), (5759, None)),
 (datetime.date(2005, 5, 24), (4138, None)),
 (datetime.date(2005, 6, 13), (5974, None))]

In [19]:
def checkGameDay(row):
    if row[1][1] == None:
        return (row[0], row[1][1], "Regular Day", row[1][0])
    else:
        return (row[0], row[1][1], "Game Day", row[1][0])

dailyTrendByGames = dailyTrendCombined.map(checkGameDay)
dailyTrendByGames.take(5)

[(datetime.date(2005, 9, 24), u'Pittsburgh', 'Game Day', 5848),
 (datetime.date(2005, 8, 11), u'Philadelphia', 'Game Day', 7110),
 (datetime.date(2005, 6, 21), None, 'Regular Day', 5759),
 (datetime.date(2005, 5, 24), None, 'Regular Day', 4138),
 (datetime.date(2005, 6, 13), None, 'Regular Day', 5974)]

In [21]:
dailyTrendByGames.sortBy(lambda x : -x[3]).take(10)

[(datetime.date(2005, 7, 28), u'Cincinnati', 'Game Day', 7661),
 (datetime.date(2005, 7, 29), u'St. Louis', 'Game Day', 7499),
 (datetime.date(2005, 8, 12), u'NY Mets', 'Game Day', 7287),
 (datetime.date(2005, 7, 27), u'Cincinnati', 'Game Day', 7238),
 (datetime.date(2005, 9, 23), u'Pittsburgh', 'Game Day', 7175),
 (datetime.date(2005, 7, 26), u'Cincinnati', 'Game Day', 7163),
 (datetime.date(2005, 5, 20), u'LA Angels', 'Game Day', 7119),
 (datetime.date(2005, 8, 11), u'Philadelphia', 'Game Day', 7110),
 (datetime.date(2005, 9, 8), None, 'Regular Day', 7107),
 (datetime.date(2005, 9, 7), u'San Francisco', 'Game Day', 7082)]

In [22]:
# Average on Game Day vs Non Game Day

dailyTrendByGames.map(lambda x : (x[2], x[3]))\
.combineByKey(lambda value : (value,1), \
             lambda acc, value : (value + acc[0], acc[1] + 1),\
             lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1]))\
.mapValues(lambda x : x[0] / x[1])\
.collect()

[('Game Day', 5948), ('Regular Day', 5411)]