# Lab 3 - MapReduce

In this lab, we practice the MapReduce programming paradigm.

We will complete the tasks using the accompanying *mapreduce* package (**mapreduce.py**) and MRJob. Please download **mapreduce.py** from our online class resource page and place it in the same folder as your notebook.

For each invocation of a MapReduce job (with mr.run()), you are expected to supply a mapper and, as needed, a reducer and/or a combiner. Below are some sample usages of the package:

```python
# Run on input1 using your mapper1 and reducer1 functions
output = list(mr.run(input1, mapper1, reducer1))

# Run on input2 using only your mapper2 (no reduce phase);
# enumerate() adds an index key to each record
output = list(mr.run(enumerate(input2), mapper2))

# Run on input3 using 2 nested MapReduce jobs: the output of the first
# job is the input of the second (map-only) job; this is a generator
output = mr.run(mr.run(input3, mapper3, reducer3), mapper4)
```
    
Please note that the input must be an iterable of **key/value pairs**. If your input data does not have a key, you can add an index key with **enumerate(input)**, or a null key with e.g. `((None, x) for x in input)`. The output of mr.run() is always a **generator**, so you have to cast it to a list if you would like to view, index or print it.
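To make the map, shuffle and reduce phases concrete, here is a minimal in-memory sketch of what a runner like mr.run might do. The function name `run_mapreduce` is hypothetical and this is not the code in mapreduce.py; it assumes all mapper output keys are mutually comparable, because it groups them by sorting.

```python
from itertools import groupby
from operator import itemgetter


def run_mapreduce(pairs, mapper, reducer=None):
    """Hypothetical single-process MapReduce runner (not mapreduce.py).

    pairs:   iterable of (key, value) input records
    mapper:  function(key, value) -> iterable of (key2, value2)
    reducer: optional function(key2, values) -> iterable of outputs
    This is a generator function, so wrap the call in list() to view
    or index the results.
    """
    # Map phase: apply the mapper to every input record
    mapped = [kv for key, value in pairs for kv in mapper(key, value)]
    if reducer is None:
        # Map-only job: emit the mapper output as-is
        yield from mapped
        return
    # Shuffle phase: sort by key so equal keys are adjacent, then group them
    mapped.sort(key=itemgetter(0))
    for key, group in groupby(mapped, key=itemgetter(0)):
        # Reduce phase: pass each key and an iterator over its values
        yield from reducer(key, (value for _, value in group))


def wc_mapper(_, line):
    for word in line.split():
        yield (word, 1)


def wc_reducer(word, counts):
    yield (word, sum(counts))


text = ["the cat", "the dog"]
# enumerate() turns plain lines into (index, line) key/value pairs
print(list(run_mapreduce(enumerate(text), wc_mapper, wc_reducer)))
# [('cat', 1), ('dog', 1), ('the', 2)]
print(list(run_mapreduce(enumerate(text), wc_mapper)))
# [('the', 1), ('cat', 1), ('the', 1), ('dog', 1)]
```

Sorting is just one way to implement the shuffle; grouping into a dict of lists would also work without requiring comparable keys, at the cost of holding every group in memory at once.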

We will also need to download **book.txt** and **citibike.csv**.

In [None]:
!pip install mrjob

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[?25l[K     |▊                               | 10 kB 20.6 MB/s eta 0:00:01[K     |█▌                              | 20 kB 8.8 MB/s eta 0:00:01[K     |██▎                             | 30 kB 7.4 MB/s eta 0:00:01[K     |███                             | 40 kB 7.1 MB/s eta 0:00:01[K     |███▊                            | 51 kB 4.2 MB/s eta 0:00:01[K     |████▌                           | 61 kB 4.4 MB/s eta 0:00:01[K     |█████▏                          | 71 kB 4.3 MB/s eta 0:00:01[K     |██████                          | 81 kB 4.9 MB/s eta 0:00:01[K     |██████▊                         | 92 kB 3.8 MB/s eta 0:00:01[K     |███████▌                        | 102 kB 4.1 MB/s eta 0:00:01[K     |████████▏                       | 112 kB 4.1 MB/s eta 0:00:01[K     |█████████                       | 122 kB 4.1 MB/s eta 0:00:01[K     |█████████▊                      | 133 kB 4.1 MB/s eta 0:00:01[K  

In [None]:
!gdown --id 1sq4-zXn2Z82mdLSBBegEgsUsfqtgza-C -O mapreduce.py
!gdown --id 1qCQ6edyhTA1kqFWZf1y65ogidivDbBIT -O book.txt
!gdown --id 1I8eqA1Zy3vFq4mN8z0ZRl7ABXrdzCRYI -O citibike.csv

Downloading...
From: https://drive.google.com/uc?id=1sq4-zXn2Z82mdLSBBegEgsUsfqtgza-C
To: /content/mapreduce.py
100% 2.66k/2.66k [00:00<00:00, 5.10MB/s]
Downloading...
From: https://drive.google.com/uc?id=1qCQ6edyhTA1kqFWZf1y65ogidivDbBIT
To: /content/book.txt
100% 259k/259k [00:00<00:00, 74.5MB/s]
Downloading...
From: https://drive.google.com/uc?id=1I8eqA1Zy3vFq4mN8z0ZRl7ABXrdzCRYI
To: /content/citibike.csv
100% 8.16M/8.16M [00:00<00:00, 290MB/s]


In [None]:
import csv
import mapreduce as mr

## Task 0

Here is another concrete example, "Word Count", using the package. Assume we have a text file named *book.txt*. Our task is to count the frequency of each word in this document and print the top 10. For illustration purposes, we use only the first 1000 lines of the book.

In [None]:
with open('book.txt', 'r') as fi:
    lines = [(i,line.strip()) for i,line in enumerate(fi) if i<1000]
lines[:10]

[(0, '\ufeffThe Project Gutenberg EBook of English Coins and Tokens, by'),
 (1, 'Llewellynn Jewitt and Barclay V. Head'),
 (2, ''),
 (3,
  'This eBook is for the use of anyone anywhere in the United States and most'),
 (4, 'other parts of the world at no cost and with almost no restrictions'),
 (5,
  'whatsoever.  You may copy it, give it away or re-use it under the terms of'),
 (6, 'the Project Gutenberg License included with this eBook or online at'),
 (7,
  "www.gutenberg.org.  If you are not located in the United States, you'll have"),
 (8,
  'to check the laws of the country where you are located before using this ebook.'),
 (9, '')]

In [None]:
# At this point, 'lines' holds (index, text) pairs for the first 1000 lines
def mapper(k1, line):
    for word in line.strip().split(' '):
        if len(word)>0:
            yield (word, 1)
    
def reducer(word, counts):
    yield (word, sum(counts))

wCounts = list(mr.run(lines, mapper, reducer))
sortedCounts = sorted(wCounts, key=lambda x: -x[1])
sortedCounts[:10]

[('the', 360),
 ('of', 326),
 ('and', 246),
 ('a', 169),
 ('or', 161),
 ('to', 101),
 ('with', 100),
 ('in', 88),
 ('on', 67),
 ('as', 56)]
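A quick way to sanity-check a MapReduce word count on a small input is to compute the same counts directly with `collections.Counter` and compare. The sample lines below are made up for illustration; the mapper is the one from this task.

```python
from collections import Counter


def mapper(k1, line):
    for word in line.strip().split(' '):
        if len(word) > 0:  # split(' ') yields '' for runs of spaces
            yield (word, 1)


# Hypothetical sample in the same (index, line) shape as 'lines'
sample = [(0, 'the cat and the hat'), (1, ''), (2, 'the  end')]

# Single-machine reference result: add up the mapper's 1s per word
expected = Counter()
for k, line in sample:
    for word, one in mapper(k, line):
        expected[word] += one

print(expected.most_common(1))  # [('the', 3)]
```

Note that `split(' ')` keeps punctuation and case, so "The" and "the" are counted as different words; `line.lower().split()` would merge them and would also drop empty strings without the length check.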

In [None]:
!head -n 2 citibike.csv

cartodb_id,the_geom,tripduration,starttime,stoptime,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bikeid,usertype,birth_year,gender
1,,801,2015-02-01 00:00:00+00,2015-02-01 00:14:00+00,521,8 Ave & W 31 St,40.75044999,-73.99481051,423,W 54 St & 9 Ave,40.76584941,-73.98690506,17131,Subscriber,1978,2
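The cells below read citibike.csv with `csv.DictReader`, which yields one dict per row keyed by the header names, and `enumerate()` adds the row index as the key. A tiny sketch on an in-memory stand-in for the file (a hypothetical subset of its columns):

```python
import csv
import io

# Hypothetical stand-in for citibike.csv with only a few of its columns
sample_csv = io.StringIO(
    "start_station_name,end_station_name,usertype,gender\n"
    "8 Ave & W 31 St,W 54 St & 9 Ave,Subscriber,2\n"
)

# Each record becomes (row_index, {column_name: value})
records = list(enumerate(csv.DictReader(sample_csv)))
key, row = records[0]
print(key, row['start_station_name'], repr(row['gender']))
# 0 8 Ave & W 31 St '2'
```

Every field comes back as a string, including numeric columns such as `gender` and `tripduration`, so comparisons must use `'1'` rather than `1` (or convert with `int()` first).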


## Task 1

We would like to write a MapReduce job to count the total number of trips involving each station. For example, if a trip starts at station A and ends at station B, the trip counts for both A and B. The output must be tuples, each consisting of a station name and a count.

In [None]:
def mapper1(_, row):
  yield (row['start_station_name'], 1)
  yield (row['end_station_name'], 1)

def reducer1(station, counts):
  yield (station, sum(counts))
    
with open('citibike.csv', 'r') as fi:
    reader = enumerate(csv.DictReader(fi))
    output1 = list(mr.run(reader, mapper1, reducer1))

output1[:10]

[('1 Ave & E 15 St', 795),
 ('1 Ave & E 44 St', 219),
 ('10 Ave & W 28 St', 422),
 ('11 Ave & W 27 St', 354),
 ('11 Ave & W 41 St', 461),
 ('11 Ave & W 59 St', 242),
 ('12 Ave & W 40 St', 217),
 ('2 Ave & E 31 St', 588),
 ('2 Ave & E 58 St', 125),
 ('3 Ave & Schermerhorn St', 34)]
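To check the counting rule on something small enough to verify by hand, here is `mapper1`/`reducer1` on three hypothetical trips, with the shuffle done by a plain dict of lists instead of mr.run.

```python
from collections import defaultdict


def mapper1(_, row):
    yield (row['start_station_name'], 1)
    yield (row['end_station_name'], 1)


def reducer1(station, counts):
    yield (station, sum(counts))


# Hypothetical trips: A->B, B->C, and a round trip A->A
trips = [
    {'start_station_name': 'A', 'end_station_name': 'B'},
    {'start_station_name': 'B', 'end_station_name': 'C'},
    {'start_station_name': 'A', 'end_station_name': 'A'},
]

# Shuffle by hand: collect the mapper's values under each key
groups = defaultdict(list)
for k, row in enumerate(trips):
    for station, one in mapper1(k, row):
        groups[station].append(one)

result = sorted(out for station, ones in groups.items() for out in reducer1(station, ones))
print(result)  # [('A', 3), ('B', 2), ('C', 1)]
```

Since every trip emits exactly two pairs, the counts always sum to twice the number of trips, which is a cheap check on the full `output1`. A round trip (same start and end station, which does occur in the data, as the Task 3 output shows) adds 2 to its station; if it should count once, the mapper could skip the second yield when the two names are equal.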


## Task 2

Below is an example showing how to use nested jobs, and jobs with only a mapper, with the mapreduce package; it is for illustration only, so no points are attached. The task is to filter the output of Task 1 to display only the stations involved in more than 1000 trips, again using the MapReduce paradigm.

In [None]:
def mapper2(station, count):
  if count > 1000:  yield(station, count)

with open('citibike.csv', 'r') as fi:
    reader = enumerate(csv.DictReader(fi))
    output2 = list(mr.run(mr.run(reader, mapper1, reducer1), mapper2))

output2

[('8 Ave & W 31 St', 1065),
 ('E 43 St & Vanderbilt Ave', 1003),
 ('Lafayette St & E 8 St', 1013),
 ('W 21 St & 6 Ave', 1057),
 ('W 41 St & 8 Ave', 1095)]
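The nested job makes a second pass over the first job's output. Because the reducer already sees each station's complete count, the same filter can also be folded into the reducer, giving a single job. A sketch comparing the two on hypothetical per-station data (`reducer1_filtered` is a hypothetical name):

```python
def mapper2(station, count):
    # Map-only filter: keep stations with more than 1000 trips
    if count > 1000:
        yield (station, count)


def reducer1_filtered(station, counts, threshold=1000):
    # Sum and filter in one step
    total = sum(counts)
    if total > threshold:
        yield (station, total)


# Hypothetical grouped mapper output: station -> list of 1s
groups = {'A': [1] * 1065, 'B': [1] * 795, 'C': [1] * 1003}

# Two jobs: sum per station, then run the map-only filter on the result
job1 = ((s, sum(ones)) for s, ones in groups.items())
two_jobs = [out for s, c in job1 for out in mapper2(s, c)]

# One job: filter inside the reducer
one_job = [out for s, ones in groups.items() for out in reducer1_filtered(s, ones)]

print(two_jobs)  # [('A', 1065), ('C', 1003)]
print(two_jobs == one_job)  # True
```

The nested version keeps `reducer1` reusable for Task 1, while the single-job version avoids a second pass over the data. The second job has no reducer, so there is nothing to group.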


## Task 3

We would like to count the number of trips taken between each pair of stations. Trips from station A to station B and trips from station B to station A both count towards the station pair A and B. Please note that the station pair should be identified by station names, as a tuple in lexical order, i.e. (A,B) instead of (B,A) in this case. The output must be tuples, each consisting of the station pair and a count.

In [None]:
def mapper3(_, row):
  # Put the two names in lexical order so A->B and B->A share one key
  pair = tuple(sorted((row['start_station_name'], row['end_station_name'])))
  yield (pair, 1)

def reducer3(station_pair, counts):
  yield (station_pair, sum(counts))

with open('citibike.csv', 'r') as fi:
    reader = enumerate(csv.DictReader(fi))
    output3 = list(mr.run(reader, mapper3, reducer3))

output3[:10]

[(('1 Ave & E 15 St', '1 Ave & E 15 St'), 5),
 (('1 Ave & E 15 St', '1 Ave & E 44 St'), 6),
 (('1 Ave & E 15 St', '11 Ave & W 27 St'), 1),
 (('1 Ave & E 15 St', '2 Ave & E 31 St'), 9),
 (('1 Ave & E 15 St', '5 Ave & E 29 St'), 2),
 (('1 Ave & E 15 St', '6 Ave & Broome St'), 3),
 (('1 Ave & E 15 St', '6 Ave & Canal St'), 1),
 (('1 Ave & E 15 St', '8 Ave & W 31 St'), 5),
 (('1 Ave & E 15 St', '9 Ave & W 14 St'), 3),
 (('1 Ave & E 15 St', '9 Ave & W 16 St'), 3)]
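The key idea in this task is a canonical key: both directions of a trip must map to the same tuple. A small sketch on hypothetical trips, using `Counter` in place of the reduce phase (`pair_key` is a hypothetical helper name):

```python
from collections import Counter


def pair_key(a, b):
    """Return the two station names in lexical order, so A->B and B->A share a key."""
    return (a, b) if a <= b else (b, a)


# Hypothetical (start, end) trips
trips = [('B', 'A'), ('A', 'B'), ('A', 'A'), ('C', 'A')]
counts = Counter(pair_key(start, end) for start, end in trips)
print(sorted(counts.items()))
# [(('A', 'A'), 1), (('A', 'B'), 2), (('A', 'C'), 1)]
```

Python compares strings by Unicode code point, so digits sort before uppercase letters, which sort before lowercase ones; that is why "1 Ave & E 15 St" comes first in the output above.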


## Task 4

In this task, you are asked to find, for each gender among *'Subscriber'* users, the station where the most trips started. In other words, which station had the highest number of bike pickups by female riders, by male riders, and by riders of unknown gender?

The output will be a list of tuples, each consisting of a gender label (as indicated below) and another tuple of a station name and the total number of trips started at that station by riders of that gender.


The label mapping for the gender column in citibike.csv is: (0=**Unknown**; 1=**Male**; 2=**Female**)
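Finding the busiest start station per gender is a group-by on gender followed by an argmax over counts. A sketch on hypothetical first-job output, which also shows why `max()` needs `key=` here: without it, Python compares `(station, count)` tuples by station name first.

```python
from collections import defaultdict

GENDER_LABELS = {'0': 'Unknown', '1': 'Male', '2': 'Female'}

# Hypothetical first-job output: ((start_station, gender), pickups)
counts = [
    (('Alpha St', '1'), 50),
    (('Zeta St', '1'), 3),
    (('Alpha St', '2'), 7),
    (('Beta St', '2'), 9),
]

# Re-key by gender, as mapper4 does
by_gender = defaultdict(list)
for (station, gender), n in counts:
    by_gender[gender].append((station, n))

best = {}
for gender, station_counts in by_gender.items():
    # Compare on the count (index 1), not on the whole tuple
    best[GENDER_LABELS[gender]] = max(station_counts, key=lambda sc: sc[1])

print(best)  # {'Male': ('Alpha St', 50), 'Female': ('Beta St', 9)}
print(max(by_gender['1']))  # ('Zeta St', 3): the lexically largest name, not the busiest
```

If two stations tie on count, `max()` returns the first one it encounters.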

In [None]:
def mapper4(station_gender, count):
  # Second job: re-key each ((station, gender), count) by gender
  yield (station_gender[1], (station_gender[0], count))

def reducer4(gender, station_count):
  # Pick the (station, count) pair with the highest count; calling max()
  # on the tuples directly would compare station names instead
  best = max(station_count, key=lambda sc: sc[1])
  if gender == '0':
    yield ('Unknown', best)
  elif gender == '1':
    yield ('Male', best)
  else:
    yield ('Female', best)

def mapper5(_, row):
  # First job: count pickups per (start station, gender), subscribers only
  if row['usertype'] == 'Subscriber':
    yield ((row['start_station_name'], row['gender']), 1)

def reducer5(station_gender_pair, counts):
    yield (station_gender_pair, sum(counts))

with open('citibike.csv', 'r') as fi:
    reader = enumerate(csv.DictReader(fi))
    output5 = list(mr.run(mr.run(reader, mapper5, reducer5), mapper4, reducer4))

output5[:10]

## Task 5

MRJob is a convenient package for writing MapReduce jobs in Python and running them locally or on clusters such as Hadoop. However, MRJob jobs are designed to be run as standalone scripts, so they do not run as-is in a notebook. We are going to convert some of the MRJob examples into our notebook so that we can test our code before deploying it on Hadoop.

The two examples are available at:

https://mrjob.readthedocs.io/en/latest/guides/quickstart.html

https://mrjob.readthedocs.io/en/latest/guides/writing-mrjobs.html
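One way to test the quickstart's MRWordFrequencyCount job in a notebook is to lift its mapper and reducer out of the MRJob class as plain functions (dropping `self`), which is the shape mr.run expects. A sketch, with a small hypothetical local driver `run_local` so it runs without mapreduce.py:

```python
from collections import defaultdict


# mapper and reducer from the MRJob quickstart, minus 'self'
def mapper(_, line):
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1


def reducer(key, values):
    yield key, sum(values)


def run_local(pairs, mapper, reducer):
    """Hypothetical stand-in for mr.run: map, group by key, reduce."""
    groups = defaultdict(list)
    for k, v in pairs:
        for k2, v2 in mapper(k, v):
            groups[k2].append(v2)
    for k2, values in groups.items():
        yield from reducer(k2, values)


lines = ["hello world", "foo"]
print(dict(run_local(enumerate(lines), mapper, reducer)))
# {'chars': 14, 'words': 3, 'lines': 2}
```

In the notebook, the same two functions can be passed straight to the package, e.g. `list(mr.run(lines, mapper, reducer))` with `lines` from Task 0. Once they behave as expected, they can be moved back into the MRJob class as methods taking `self`.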

## Task 6

Let's try to run the above MRJob examples as stand-alone applications. Please refer again to:
https://mrjob.readthedocs.io/en/latest/guides/writing-mrjobs.html#defining-steps
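The "defining steps" example, MRMostUsedWord, chains two steps: its `steps()` method returns `[MRStep(mapper=..., combiner=..., reducer=...), MRStep(reducer=...)]`. To run it stand-alone, the class goes in its own .py file ending with `if __name__ == '__main__': MRMostUsedWord.run()`, and is launched with, e.g., `python mr_most_used_word.py book.txt` (the file name is hypothetical). Before doing that, the step logic can be checked in the notebook. The sketch below adapts the example's four methods as plain functions and emulates each MRStep with a hypothetical `run_step` helper; unlike Hadoop, it applies the combiner once over all mapper output rather than per mapper.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")


# Step 1 functions (adapted from MRMostUsedWord, minus 'self')
def mapper_get_words(_, line):
    for word in WORD_RE.findall(line):
        yield (word.lower(), 1)


def combiner_count_words(word, counts):
    yield (word, sum(counts))


def reducer_count_words(word, counts):
    # Send every (count, word) pair to one key so a single reducer sees them all
    yield (None, (sum(counts), word))


# Step 2 function
def reducer_find_max_word(_, word_count_pairs):
    # Tuples compare by count first, so max() picks the most frequent word
    yield max(word_count_pairs)


def group_and_apply(pairs, fn):
    """Group (key, value) pairs by key and apply fn(key, values) to each group."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return [out for k, values in groups.items() for out in fn(k, values)]


def run_step(pairs, mapper=None, combiner=None, reducer=None):
    """Hypothetical single-process stand-in for one MRStep."""
    if mapper is not None:
        pairs = [kv for k, v in pairs for kv in mapper(k, v)]
    if combiner is not None:
        pairs = group_and_apply(pairs, combiner)
    if reducer is not None:
        pairs = group_and_apply(pairs, reducer)
    return list(pairs)


lines = ["The cat saw the dog", "the end"]
step1 = run_step(enumerate(lines), mapper_get_words, combiner_count_words, reducer_count_words)
step2 = run_step(step1, reducer=reducer_find_max_word)
print(step2)  # [(3, 'the')]
```

Here comparing whole tuples is intentional, because the count is placed first; among words with equal counts, the lexically larger word wins.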