In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1515736974360_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7fd0985a87d0>

## Dodgers loop sensor example

Download the Dodgers traffic dataset from https://archive.ics.uci.edu/ml/datasets/dodgers+loop+sensor

That page links to the data folder at https://archive.ics.uci.edu/ml/machine-learning-databases/event-detection/
from which we download two files: "Dodgers.data" and "Dodgers.events".

### Dataset information 

The description below is copied from the dataset page linked above.

This loop sensor data was collected for the Glendale on ramp for the 101 North freeway in Los Angeles. It is close enough to the stadium to see unusual traffic after a Dodgers game, but not so close and heavily used by game traffic so that the signal for the extra traffic is overly obvious. 

NOTE: This is an on ramp near the stadium so event traffic BEGINS at or near the END of the event time. 

The observations were taken over 25 weeks, 288 time slices per day (5 minute count aggregates). 

The goal is to predict the presence of a baseball game at Dodgers stadium.

Attribute Information:

For the .data file:
1. Date: MM/DD/YY 
2. Time: (H)H:MM (military time) 
3. Count: Number of cars measured for the previous five minutes 

Rows: Each five-minute time slice is represented by one row.

For the .events file: 
1. Date: MM/DD/YY 
2. Begin event time: HH:MM:SS (military) 
3. End event time: HH:MM:SS (military) 
4. Game attendance 
5. Away team 
6. W/L score
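
The figures quoted in the description (25 weeks, 288 slices per day) imply a row count that is easy to check. A minimal arithmetic sketch; the variable names are illustrative and do not come from the dataset documentation:

```python
# 5-minute aggregates over a full day
minutes_per_day = 24 * 60
slices_per_day = minutes_per_day // 5   # should match the 288 quoted above

# 25 weeks of observations, one row per slice
days = 25 * 7
expected_rows = days * slices_per_day

print(slices_per_day, days, expected_rows)
```

Comparing `expected_rows` with `traffic.count()` once the file is loaded is a quick way to confirm the file is complete.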



In [2]:
# Specify where the files live in Azure Blob Storage (the default storage account mapped to the cluster)
trafficPath = 'wasb:///example/data/Dodgers.data'
gamesPath = 'wasb:///example/data/Dodgers.events'

In [3]:
# Load the traffic dataset and look at the first rows
traffic = sc.textFile(trafficPath)
traffic.take(5) # each row is one 5-minute slice: a timestamp and the number of cars counted in that slice

[u'4/10/2005 0:00,-1', u'4/10/2005 0:05,-1', u'4/10/2005 0:10,-1', u'4/10/2005 0:15,-1', u'4/10/2005 0:20,-1']

In [4]:
# Load the events file and take a look at it
games = sc.textFile(gamesPath)
games.take(10) # fields: date, begin time, end time, attendance, away team, W/L score

[u'04/12/05,13:10:00,16:23:00,55892,San Francisco,W 9-8\ufffd', u'04/13/05,19:10:00,21:48:00,46514,San Francisco,W 4-1\ufffd', u'04/15/05,19:40:00,21:48:00,51816,San Diego,W 4-0\ufffd', u'04/16/05,19:10:00,21:52:00,54704,San Diego,W 8-3\ufffd', u'04/17/05,13:10:00,15:31:00,53402,San Diego,W 6-0\ufffd', u'04/25/05,19:10:00,21:33:00,36876,Arizona,L 4-2\ufffd', u'04/26/05,19:10:00,22:00:00,44486,Arizona,L 3-2\ufffd', u'04/27/05,19:10:00,22:17:00,54387,Arizona,L 6-3\ufffd', u'04/29/05,19:40:00,22:01:00,40150,Colorado,W 6-3\ufffd', u'04/30/05,19:10:00,21:45:00,54123,Colorado,W 6-2\ufffd']
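
Every score in the output above ends in `\ufffd`, the Unicode replacement character, which suggests that a trailing byte in each line could not be decoded. The sketch below parses one events line into named fields and strips that character. `parse_event` and its dictionary keys are hypothetical names introduced for illustration; it runs locally without Spark.

```python
from datetime import datetime

# Sample line copied from the games.take(10) output
line = u"04/12/05,13:10:00,16:23:00,55892,San Francisco,W 9-8\ufffd"

def parse_event(row):
    """Split one Dodgers.events line into named fields (hypothetical helper)."""
    date_s, begin_s, end_s, attendance, away, score = row.split(",")
    return {
        "date": datetime.strptime(date_s, "%m/%d/%y").date(),
        "begin": datetime.strptime(begin_s, "%H:%M:%S").time(),
        "end": datetime.strptime(end_s, "%H:%M:%S").time(),
        "attendance": int(attendance),
        "away_team": away,
        # drop the U+FFFD replacement character left over from decoding
        "score": score.replace(u"\ufffd", u"").strip(),
    }

event = parse_event(line)
print(event["date"], event["attendance"], event["away_team"], event["score"])
```

A function like this could be passed to `games.map(...)` if the extra fields are needed later; the notebook itself only uses the date and the away team.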

In [6]:
from datetime import datetime
import csv
from StringIO import StringIO

# Parse one line of the traffic file into a (timestamp, car count) tuple
def parseTraffic(row):
    DATE_FMT = "%m/%d/%Y %H:%M"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT) # convert the timestamp string to a datetime object
    row[1] = int(row[1]) # the count arrives as text
    return (row[0], row[1])

In [7]:
# Create a pair RDD of Python tuples: (timestamp, #cars)
trafficParsed = traffic.map(parseTraffic)

In [8]:
trafficParsed.first()

(datetime.datetime(2005, 4, 10, 0, 0), -1)
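
The attribute list describes the date as MM/DD/YY, but the lines in the file carry a four-digit year, which is why the format string uses `%Y`. A small local check of that format string, run without Spark on the first line of the `traffic.take(5)` output:

```python
from datetime import datetime

sample = "4/10/2005 0:00,-1"   # first line of the traffic.take(5) output
stamp, count = sample.split(",")

# Four-digit year: this is the format used in parseTraffic
parsed = datetime.strptime(stamp, "%m/%d/%Y %H:%M")
print(parsed, int(count))

# The two-digit-year directive %y does not fit this file
try:
    datetime.strptime(stamp, "%m/%d/%y %H:%M")
    two_digit_ok = True
except ValueError:
    two_digit_ok = False
print(two_digit_ok)
```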

In [18]:
trafficParsed.values().take(500) # the first counts are all -1, which looks like a placeholder for missing readings

[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, ...]
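
A count of -1 cannot be a real number of cars. It most likely marks a slice for which the sensor reported no reading, though that is an assumption: the description copied above does not say so. Left in place, every such row subtracts 1 from any sum. The sketch below shows one way to drop those rows, using a small made-up list in place of `trafficParsed`; `is_valid` is a hypothetical helper.

```python
from datetime import datetime

# Tiny stand-in for trafficParsed: (timestamp, count) pairs; the positive counts are made up
sample = [
    (datetime(2005, 4, 10, 0, 0), -1),
    (datetime(2005, 4, 10, 0, 5), -1),
    (datetime(2005, 4, 11, 8, 0), 23),
    (datetime(2005, 4, 11, 8, 5), 42),
]

def is_valid(pair):
    """Keep only slices with a non-negative count."""
    return pair[1] >= 0

valid = [p for p in sample if is_valid(p)]
missing = len(sample) - len(valid)
print(len(valid), missing)

# On the RDD the same predicate could be applied as:
#   trafficValid = trafficParsed.filter(is_valid)
```

The daily totals computed in the rest of this notebook keep the -1 rows, so days with many missing slices are slightly understated.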

## Computing a daily trend

In [10]:
# How many cars pass the sensor each day?
# Summarize the pair RDD: re-key each record by calendar date, then sum the counts per date
dailyTrend = trafficParsed.map(lambda x: (x[0].date(), x[1])).reduceByKey(lambda x,y: x+y)

In [11]:
dailyTrend.take(5)

[(datetime.date(2005, 8, 9), 5958), (datetime.date(2005, 6, 29), 5437), (datetime.date(2005, 8, 17), 6673), (datetime.date(2005, 9, 6), 6402), (datetime.date(2005, 5, 22), 4977)]
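
The dates above come back in no particular order because `reduceByKey` groups records by key without sorting them. What the `map` plus `reduceByKey` pair computes can be sketched in plain Python with a dictionary. The records below are made up and stand in for `trafficParsed`.

```python
from collections import defaultdict
from datetime import datetime

# Made-up (timestamp, count) pairs standing in for trafficParsed
records = [
    (datetime(2005, 4, 11, 8, 0), 20),
    (datetime(2005, 4, 11, 8, 5), 25),
    (datetime(2005, 4, 12, 8, 0), 30),
]

# map step: re-key every record by its calendar date
keyed = [(ts.date(), count) for ts, count in records]

# reduceByKey step: combine all values that share a key using x + y
totals = defaultdict(int)
for day, count in keyed:
    totals[day] += count

for day in sorted(totals):
    print(day, totals[day])
```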

In [14]:
# Sort by value (the daily car count)
dailyTrend.sortBy(lambda x: -x[1]).take(10) # negating the count gives descending order
# The output lists the ten dates with the most traffic

[(datetime.date(2005, 7, 28), 7661), (datetime.date(2005, 7, 29), 7499), (datetime.date(2005, 8, 12), 7287), (datetime.date(2005, 7, 27), 7238), (datetime.date(2005, 9, 23), 7175), (datetime.date(2005, 7, 26), 7163), (datetime.date(2005, 5, 20), 7119), (datetime.date(2005, 8, 11), 7110), (datetime.date(2005, 9, 8), 7107), (datetime.date(2005, 9, 7), 7082)]
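
Negating the key works because the counts are numeric. The same ordering can be requested explicitly, which also works for keys that cannot be negated. The plain-Python sketch below compares the two approaches on three (date, total) pairs taken from the outputs above, with the dates written as strings for brevity.

```python
# (date, daily total) pairs taken from the dailyTrend outputs
daily = [("2005-08-09", 5958), ("2005-07-28", 7661), ("2005-06-29", 5437)]

by_negated_key = sorted(daily, key=lambda x: -x[1])
by_reverse_flag = sorted(daily, key=lambda x: x[1], reverse=True)

print(by_negated_key == by_reverse_flag)
print(by_negated_key[0])

# RDD equivalents of the explicit form:
#   dailyTrend.sortBy(lambda x: x[1], ascending=False).take(10)
#   dailyTrend.takeOrdered(10, key=lambda x: -x[1])
```

`takeOrdered` returns only the requested number of elements, so it can be a reasonable choice when just the top few rows are needed.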

**It would be interesting to compare the dates with the heaviest traffic against the days on which there was a Dodgers game.**

 To do that we need to merge this dataset with the games dataset.
 
 Pair RDDs are merged on their keys, much like a join in SQL.

 Three of the joins Spark offers on pair RDDs are join, leftOuterJoin, and rightOuterJoin:
- join - returns a new pair RDD in which the values of matching keys are grouped into a tuple. A key is dropped if it is not present in both RDDs.
- leftOuterJoin - every key from the left RDD is returned. Where the right RDD has a matching key its value is paired in; where it does not, None is used.
- rightOuterJoin - the mirror image of leftOuterJoin: every key from the right RDD is returned.
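
The difference between the three joins is easiest to see on a tiny example. The functions below are a plain-Python sketch of the semantics on lists of (key, value) pairs, not Spark's implementation. The first two traffic totals come from this notebook; the "2005-10-01" game entry is made up so that the right side has a key the left side lacks.

```python
def inner_join(left, right):
    """Keep only keys present on both sides, pairing their values."""
    return [(k, (v, w)) for k, v in left for k2, w in right if k == k2]

def left_outer_join(left, right):
    """Keep every left key; None stands in when the right side has no match."""
    right_keys = set(k for k, _ in right)
    unmatched = [(k, (v, None)) for k, v in left if k not in right_keys]
    return inner_join(left, right) + unmatched

def right_outer_join(left, right):
    """Keep every right key; None stands in when the left side has no match."""
    left_keys = set(k for k, _ in left)
    unmatched = [(k, (None, w)) for k, w in right if k not in left_keys]
    return inner_join(left, right) + unmatched

traffic_pairs = [("2005-07-28", 7661), ("2005-09-08", 7107)]
game_pairs = [("2005-07-28", "Cincinnati"), ("2005-10-01", "Hypothetical")]

print(sorted(inner_join(traffic_pairs, game_pairs)))
print(sorted(left_outer_join(traffic_pairs, game_pairs)))
print(sorted(right_outer_join(traffic_pairs, game_pairs)))
```

With the traffic totals on the left, only the left outer join keeps a date such as 2005-09-08 that has traffic but no game.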


In [19]:
# Merge with the games data
# First parse each games line into a (date, away team) tuple
def parseGames(row):
    DATE_FMT = "%m/%d/%y"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    return (row[0], row[4]) # (date, away team) tuple

gamesParsed = games.map(parseGames)
gamesParsed.take(10)

[(datetime.date(2005, 4, 12), u'San Francisco'), (datetime.date(2005, 4, 13), u'San Francisco'), (datetime.date(2005, 4, 15), u'San Diego'), (datetime.date(2005, 4, 16), u'San Diego'), (datetime.date(2005, 4, 17), u'San Diego'), (datetime.date(2005, 4, 25), u'Arizona'), (datetime.date(2005, 4, 26), u'Arizona'), (datetime.date(2005, 4, 27), u'Arizona'), (datetime.date(2005, 4, 29), u'Colorado'), (datetime.date(2005, 4, 30), u'Colorado')]

In [20]:
# Join the daily totals with the games using leftOuterJoin, because we want to keep every date in the traffic data
# whether or not there was a game that day
dailyTrendCombined = dailyTrend.leftOuterJoin(gamesParsed)
dailyTrendCombined.take(10) # the value is now a tuple holding the values from both RDDs

[(datetime.date(2005, 9, 24), (5848, u'Pittsburgh')), (datetime.date(2005, 8, 11), (7110, u'Philadelphia')), (datetime.date(2005, 6, 21), (5759, None)), (datetime.date(2005, 5, 24), (4138, None)), (datetime.date(2005, 6, 13), (5974, None)), (datetime.date(2005, 7, 18), (5994, None)), (datetime.date(2005, 4, 23), (5366, None)), (datetime.date(2005, 6, 29), (5437, u'San Diego')), (datetime.date(2005, 8, 15), (5329, None)), (datetime.date(2005, 6, 1), (6520, u'Chicago Cubs'))]

In [21]:
# Helper that labels each date as a game day or a regular day
def checkGameDay(row):
    if row[1][1] is None: # no away team means there was no game on that date
        return (row[0], row[1][1], "Regular Day", row[1][0])
    else:
        return (row[0], row[1][1], "Game Day", row[1][0])

# Apply the helper to the combined RDD
dailyTrendbyGames = dailyTrendCombined.map(checkGameDay)

In [22]:
dailyTrendbyGames.take(5)

[(datetime.date(2005, 9, 24), u'Pittsburgh', 'Game Day', 5848), (datetime.date(2005, 8, 11), u'Philadelphia', 'Game Day', 7110), (datetime.date(2005, 6, 21), None, 'Regular Day', 5759), (datetime.date(2005, 5, 24), None, 'Regular Day', 4138), (datetime.date(2005, 6, 13), None, 'Regular Day', 5974)]

In [23]:
dailyTrendbyGames.sortBy(lambda x:-x[3]).take(10) # sort by decreasing car count and take the top 10 records

[(datetime.date(2005, 7, 28), u'Cincinnati', 'Game Day', 7661), (datetime.date(2005, 7, 29), u'St. Louis', 'Game Day', 7499), (datetime.date(2005, 8, 12), u'NY Mets', 'Game Day', 7287), (datetime.date(2005, 7, 27), u'Cincinnati', 'Game Day', 7238), (datetime.date(2005, 9, 23), u'Pittsburgh', 'Game Day', 7175), (datetime.date(2005, 7, 26), u'Cincinnati', 'Game Day', 7163), (datetime.date(2005, 5, 20), u'LA Angels', 'Game Day', 7119), (datetime.date(2005, 8, 11), u'Philadelphia', 'Game Day', 7110), (datetime.date(2005, 9, 8), None, 'Regular Day', 7107), (datetime.date(2005, 9, 7), u'San Francisco', 'Game Day', 7082)]
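
A quick tally of the labels in this top-10 list shows how strongly the busiest days line up with games. The rows below are copied from the output above, with the dates written as strings. This is a local sketch without Spark, and the variable names are illustrative.

```python
from collections import Counter

# (date, away team, label, cars) rows copied from the top-10 output
top10 = [
    ("2005-07-28", "Cincinnati", "Game Day", 7661),
    ("2005-07-29", "St. Louis", "Game Day", 7499),
    ("2005-08-12", "NY Mets", "Game Day", 7287),
    ("2005-07-27", "Cincinnati", "Game Day", 7238),
    ("2005-09-23", "Pittsburgh", "Game Day", 7175),
    ("2005-07-26", "Cincinnati", "Game Day", 7163),
    ("2005-05-20", "LA Angels", "Game Day", 7119),
    ("2005-08-11", "Philadelphia", "Game Day", 7110),
    ("2005-09-08", None, "Regular Day", 7107),
    ("2005-09-07", "San Francisco", "Game Day", 7082),
]

# Count how many of the busiest days fall under each label
label_counts = Counter(label for _, _, label, _ in top10)
print(label_counts["Game Day"], label_counts["Regular Day"])
```

Nine of the ten busiest days are game days. Ten rows are too few to draw a firm conclusion, so comparing the average daily total per label over the whole RDD would be a natural next step.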