In [5]:
from init_SparkContext import init_SparkContext

In [6]:
sc, spark = init_SparkContext(appName="DodgersSummary")

Spark found in your system !!
Spark Context and Spark session initialized !!


In [7]:
trafficPath = "D:\\Big Data\\Notebook\\data\\Dodgers.data"
gamesPath = "D:\\Big Data\\Notebook\\data\\Dodgers.events"

In [10]:
traffic = sc.textFile(trafficPath)
traffic.take(10)

['4/10/2005 0:00,-1',
 '4/10/2005 0:05,-1',
 '4/10/2005 0:10,-1',
 '4/10/2005 0:15,-1',
 '4/10/2005 0:20,-1',
 '4/10/2005 0:25,-1',
 '4/10/2005 0:30,-1',
 '4/10/2005 0:35,-1',
 '4/10/2005 0:40,-1',
 '4/10/2005 0:45,-1']

- Each row represents a five-minute slice of time,
- and the second column in the dataset represents the number of cars that passed by in that five-minute slice of time.

In [11]:
games = sc.textFile(gamesPath)
games.take(10)

['04/12/05,13:10:00,16:23:00,55892,San Francisco,W 9-8�',
 '04/13/05,19:10:00,21:48:00,46514,San Francisco,W 4-1�',
 '04/15/05,19:40:00,21:48:00,51816,San Diego,W 4-0�',
 '04/16/05,19:10:00,21:52:00,54704,San Diego,W 8-3�',
 '04/17/05,13:10:00,15:31:00,53402,San Diego,W 6-0�',
 '04/25/05,19:10:00,21:33:00,36876,Arizona,L 4-2�',
 '04/26/05,19:10:00,22:00:00,44486,Arizona,L 3-2�',
 '04/27/05,19:10:00,22:17:00,54387,Arizona,L 6-3�',
 '04/29/05,19:40:00,22:01:00,40150,Colorado,W 6-3�',
 '04/30/05,19:10:00,21:45:00,54123,Colorado,W 6-2�']

- It has the start and end time for a game,
- the number of folks who actually attended that game,
- the opponent that the dodgers played on that day,
- and the win/loss infomation along with the score.

- Now we will start by taking the traffic RDD and pssing it to create a pair RDD.
- For a pair RDD we need to have each row or each record represented as a tuple with two elements.

- So we will set up a function that can take in a string and extract out two elements from it: the date or the five-minute time slice, and a number representing the traffic that passed by in that five-minute time slice.

In [17]:
from datetime import datetime
import csv
from io import StringIO

def parseTraffic(row):
    DATE_FMT = "%m/%d/%Y %H:%M"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT)
    row[1] = int(row[1])
    return (row[0], row[1])

In [18]:
trafficParsed = traffic.map(parseTraffic)

In [19]:
trafficParsed.take(10)

[(datetime.datetime(2005, 4, 10, 0, 0), -1),
 (datetime.datetime(2005, 4, 10, 0, 5), -1),
 (datetime.datetime(2005, 4, 10, 0, 10), -1),
 (datetime.datetime(2005, 4, 10, 0, 15), -1),
 (datetime.datetime(2005, 4, 10, 0, 20), -1),
 (datetime.datetime(2005, 4, 10, 0, 25), -1),
 (datetime.datetime(2005, 4, 10, 0, 30), -1),
 (datetime.datetime(2005, 4, 10, 0, 35), -1),
 (datetime.datetime(2005, 4, 10, 0, 40), -1),
 (datetime.datetime(2005, 4, 10, 0, 45), -1)]

- The result is a pair RDD in which each record is tuple

### Summarizing Pair RDDs

#### reduceByKey

- reduceByKey is similar to the reduce operation that we have seen earlier,
- it is used to combine records with the same key in a specified way: Sum, Maximum, Minimum
- it need function with 2 arguments
- unlike reduce, reduceByKey will not combine the complete record
- only combine values with the same key
- The operation is a transformation

### Computing a daily trend

In [21]:
trafficParsed.take(10)

[(datetime.datetime(2005, 4, 10, 0, 0), -1),
 (datetime.datetime(2005, 4, 10, 0, 5), -1),
 (datetime.datetime(2005, 4, 10, 0, 10), -1),
 (datetime.datetime(2005, 4, 10, 0, 15), -1),
 (datetime.datetime(2005, 4, 10, 0, 20), -1),
 (datetime.datetime(2005, 4, 10, 0, 25), -1),
 (datetime.datetime(2005, 4, 10, 0, 30), -1),
 (datetime.datetime(2005, 4, 10, 0, 35), -1),
 (datetime.datetime(2005, 4, 10, 0, 40), -1),
 (datetime.datetime(2005, 4, 10, 0, 45), -1)]

- From this RDD we want to go to another RDD which has the date as the key and the total number of cars which passed by on that day as the value.

- To do this we first use a map operation that takes the traffic parsed RDD and takes the timestamp which is present in the key and converts it to just the date of that timestamp.

In [23]:
# summarizing a pair RDD
dailyTrend = trafficParsed.map(lambda x: (x[0].date(), x[1]))\
                          .reduceByKey(lambda x,y:x+y)

In [25]:
dailyTrend.take(10)

[(datetime.date(2005, 4, 10), -288),
 (datetime.date(2005, 4, 11), 5062),
 (datetime.date(2005, 4, 12), 6822),
 (datetime.date(2005, 4, 13), 6742),
 (datetime.date(2005, 4, 14), 6423),
 (datetime.date(2005, 4, 15), 6459),
 (datetime.date(2005, 4, 16), 6002),
 (datetime.date(2005, 4, 17), 5322),
 (datetime.date(2005, 4, 18), 5600),
 (datetime.date(2005, 4, 19), 6049)]

- and this reduceByKey operation will return an RDD in which each record is the date and the value is the total amount of traffic that is passed by the dodgers Stadium on that date.

In [26]:
dailyTrend.sortBy(lambda x:-x[1]).take(10)   #-x[1] to specify descending order

[(datetime.date(2005, 7, 28), 7661),
 (datetime.date(2005, 7, 29), 7499),
 (datetime.date(2005, 8, 12), 7287),
 (datetime.date(2005, 7, 27), 7238),
 (datetime.date(2005, 9, 23), 7175),
 (datetime.date(2005, 7, 26), 7163),
 (datetime.date(2005, 5, 20), 7119),
 (datetime.date(2005, 8, 11), 7110),
 (datetime.date(2005, 9, 8), 7107),
 (datetime.date(2005, 9, 7), 7082)]

### Merging Pair RDDs

- very often you will come across situations where you have data present across two or more RDDs and you need to merge these two RDDs before you can summarize your data.
- this is not all that different from a join in SQL
- Spark allows you to merge two pair RDDs based on the keys of those RDDs.
- Let say you have two RDDs like this which have same keys but different values.
- you can merge these two pair RDDs by merging the records which have the same key and then collecting the values in those records into one list.
- such operations are called joins.

#### Types of joins

- join
- leftOuterJoin
- rightOuterJoin


##### Each of these operations is similar to its counterpart in SQL

- join: A join merges two pair RDDs and returns a new pair RDD. The new RDD will look at keys from both the RDDs and wherever the keys match it will collect the values. If there is no match for a key, which means that a key exists only in one of the input RDDs then the key is completely dropped.


- leftOuterJoin: All the records from the left RDD are returned which means that all the keys from the left RDD are returned and only those keys in the right RDD which have a match to the left RDD are returned.


- rightOuterJoin: The rightOuterJoin is just the reverse of the leftOuterJoin, All the keys from the right RDD are returned and only those keys in the left RDD that match the right RDD are returned.


- Above we were able to find the top 10 days which had the most amount of traffic passing by the Dodgers Stadium.


- It would be interesting to see--- if the Dodgers were playing a game in the stadium on any of these days


- To do that we have to join this dataset with another dataset

In [27]:
# joining with games

def parsedGames(row):
    DATE_FMT = "%m/%d/%y"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    return (row[0], row[4])

In [28]:
gamesParsed = games.map(parsedGames)
gamesParsed.take(10)

[(datetime.date(2005, 4, 12), 'San Francisco'),
 (datetime.date(2005, 4, 13), 'San Francisco'),
 (datetime.date(2005, 4, 15), 'San Diego'),
 (datetime.date(2005, 4, 16), 'San Diego'),
 (datetime.date(2005, 4, 17), 'San Diego'),
 (datetime.date(2005, 4, 25), 'Arizona'),
 (datetime.date(2005, 4, 26), 'Arizona'),
 (datetime.date(2005, 4, 27), 'Arizona'),
 (datetime.date(2005, 4, 29), 'Colorado'),
 (datetime.date(2005, 4, 30), 'Colorado')]

In [29]:
dailyTrendCombined = dailyTrend.leftOuterJoin(gamesParsed)

In [30]:
dailyTrendCombined.take(10)

[(datetime.date(2005, 4, 10), (-288, None)),
 (datetime.date(2005, 4, 11), (5062, None)),
 (datetime.date(2005, 4, 14), (6423, None)),
 (datetime.date(2005, 4, 15), (6459, 'San Diego')),
 (datetime.date(2005, 4, 16), (6002, 'San Diego')),
 (datetime.date(2005, 4, 17), (5322, 'San Diego')),
 (datetime.date(2005, 4, 18), (5600, None)),
 (datetime.date(2005, 4, 19), (6049, None)),
 (datetime.date(2005, 4, 21), (5977, None)),
 (datetime.date(2005, 4, 22), (6038, None))]

- You will see that the key is still the date, 


- but the value is now a tuple which contains values from the first RDD and the second RDD, Wherever the second RDD did not have a match for the keys in the first RDD


- The reason we used the leftOuterJoin is we want to keep all the dates in our dataset whether or not there was a game on that day, if we had used a join directly then you would lose out on any dates which did not have a game.

In [31]:
def checkGameDay(row):
    if row[1][1]== None:
        return (row[0], row[1][1], "Regular Day", row[1][0])
    else:
        return (row[0], row[1][1], "Games Day", row[1][0])
    
dailyTrendbyGames = dailyTrendCombined.map(checkGameDay)

In [32]:
dailyTrendbyGames.take(10)

[(datetime.date(2005, 4, 10), None, 'Regular Day', -288),
 (datetime.date(2005, 4, 11), None, 'Regular Day', 5062),
 (datetime.date(2005, 4, 14), None, 'Regular Day', 6423),
 (datetime.date(2005, 4, 15), 'San Diego', 'Games Day', 6459),
 (datetime.date(2005, 4, 16), 'San Diego', 'Games Day', 6002),
 (datetime.date(2005, 4, 17), 'San Diego', 'Games Day', 5322),
 (datetime.date(2005, 4, 18), None, 'Regular Day', 5600),
 (datetime.date(2005, 4, 19), None, 'Regular Day', 6049),
 (datetime.date(2005, 4, 21), None, 'Regular Day', 5977),
 (datetime.date(2005, 4, 22), None, 'Regular Day', 6038)]

In [33]:
dailyTrendbyGames.sortBy(lambda x:-x[3]).take(10)

[(datetime.date(2005, 7, 28), 'Cincinnati', 'Games Day', 7661),
 (datetime.date(2005, 7, 29), 'St. Louis', 'Games Day', 7499),
 (datetime.date(2005, 8, 12), 'NY Mets', 'Games Day', 7287),
 (datetime.date(2005, 7, 27), 'Cincinnati', 'Games Day', 7238),
 (datetime.date(2005, 9, 23), 'Pittsburgh', 'Games Day', 7175),
 (datetime.date(2005, 7, 26), 'Cincinnati', 'Games Day', 7163),
 (datetime.date(2005, 5, 20), 'LA Angels', 'Games Day', 7119),
 (datetime.date(2005, 8, 11), 'Philadelphia', 'Games Day', 7110),
 (datetime.date(2005, 9, 8), None, 'Regular Day', 7107),
 (datetime.date(2005, 9, 7), 'San Francisco', 'Games Day', 7082)]

### CombineByKey

- The combineByKey operation is a more general way to combine records with the same key,
- like reduceByKey it is used to combine records which have the same key within an RDD,
- but you have much more granular control over how these records can be combined


- combineByKey requires 3 functions:
  - createCombiner Function: This initializes a value when a key is seen for the first time within a partition.
  
  - merge Function: The second function is the merge function and this specifies how values with the same key should be combined for records which are present within a single partition.
  
  - mergeCombiners Function: Specifies how the results from each partition should be combined

### Comparing average traffic on game days and non-game days

In [35]:
# Average on Game Day vs Non Game Day

dailyTrendbyGames.map(lambda x:(x[2], x[3]))\
                 .combineByKey(lambda value : (value,1),
                 lambda acc,value:(acc[0]+value,acc[1]+1),\
                 lambda acc1, acc2:(acc1[0]+acc2[0], acc1[1]+acc2[1]))\
                 .mapValues(lambda x:x[0]/x[1])\
                 .collect()

[('Regular Day', 5411.329787234043), ('Games Day', 5948.604938271605)]