# Homework 4 - MapReduce

In this homework, we practice the MapReduce programming paradigm.

You are required to turn in this notebook as BDM\_HW4\_<b>CCNY_ID</b>.ipynb. You will be asked to complete each task with one or more MapReduce "jobs" using the accompanying <i>mapreduce</i> package (provided as <b>mapreduce.py</b>). For each such job (a call to mr.run()), you are expected to supply a mapper and, if needed, a reducer. Below are some sample usages of the package:

```python
# Run on input1 using your mapper1 and reducer1 functions
output = list(mr.run(input1, mapper1, reducer1))

# Run on input2 using only your mapper2, with no reduce phase
output = list(mr.run(input2, mapper2))

# Run on input3 using 2 nested MapReduce jobs
output = list(mr.run(mr.run(input3, mapper3, reducer3), mapper4, reducer4))
```

Please note that the output of mr.run() is always a <b>generator</b>. You have to convert it to a list if you'd like to view, index, or print it. A generator can also be iterated over only once.
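The source of `mapreduce.py` is not shown in this notebook. The sketch below is only a rough mental model. It assumes `mr.run` maps every record, sorts and groups the emitted `(key, value)` pairs by key, and passes each `(key, [values])` group to the reducer. The name `run_local` is hypothetical, and the real package may behave differently, for example in how it orders its output.

```python
from itertools import groupby
from operator import itemgetter

def run_local(records, mapper, reducer=None):
    """Hypothetical in-memory stand-in for mr.run (not the real package)."""
    # Map phase: each record may emit zero or more (key, value) pairs.
    pairs = (kv for record in records for kv in mapper(record))
    if reducer is None:
        # Map-only job: pass the emitted pairs straight through.
        yield from pairs
        return
    # Shuffle phase: sort by key so equal keys are adjacent, then group them.
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        # Reduce phase: the reducer receives (key, list_of_values).
        yield reducer((key, [value for _, value in group]))

def word_mapper(line):
    for word in line.split():
        yield (word, 1)

def word_reducer(kv):
    word, counts = kv
    return (word, sum(counts))

result = run_local(["a b a", "b c"], word_mapper, word_reducer)
print(type(result).__name__)  # generator
print(list(result))           # [('a', 2), ('b', 2), ('c', 1)]
```

Because `run_local` is a generator function, nothing runs until the result is consumed. That matches the note above about wrapping `mr.run()` in `list()`.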

We will be using only the citibike data (<b>citibike.csv</b>) for this homework.

```python
import csv
import mapreduce as mr
```



## Task 1 (2 points)

We would like to write a MapReduce job that counts the total number of trips involving each station. For example, if a trip starts at station A and ends at station B, the trip counts for both A and B. You are asked to fill in the <b>mapper1</b> and <b>reducer1</b> code blocks. The output must be tuples, each consisting of a station name and a count. A portion of the expected output is included below.
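The sketch below shows why each trip contributes to two stations, using three made-up trips between invented stations `A`, `B`, and `C`. It redeclares a hypothetical `run_local` helper so the block runs on its own. That helper reflects an assumption about how `mr.run` groups pairs and is not the real package.

```python
from itertools import groupby
from operator import itemgetter

def run_local(records, mapper, reducer=None):
    """Hypothetical stand-in for mr.run: map, sort/group by key, reduce."""
    pairs = (kv for record in records for kv in mapper(record))
    if reducer is None:
        yield from pairs
        return
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield reducer((key, [value for _, value in group]))

def station_mapper(row):
    # One trip touches two stations, so emit a count for each end.
    yield (row['start_station_name'], 1)
    yield (row['end_station_name'], 1)

def station_reducer(kv):
    station, counts = kv
    return (station, sum(counts))

trips = [
    {'start_station_name': 'A', 'end_station_name': 'B'},
    {'start_station_name': 'B', 'end_station_name': 'A'},
    {'start_station_name': 'A', 'end_station_name': 'C'},
]
print(list(run_local(trips, station_mapper, station_reducer)))
# [('A', 3), ('B', 2), ('C', 1)]
```

Every row emits exactly two pairs, so the counts always sum to twice the number of trips. You can use this as a quick sanity check on the real output.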

```python
def mapper1(row):
    # Each trip involves two stations, so emit one count for each end.
    yield (row['start_station_name'], 1)
    yield (row['end_station_name'], 1)

def reducer1(k2v2):
    # k2v2 is (station, [1, 1, ...]); summing stays correct even if values are not all 1.
    (station, counts) = k2v2
    return (station, sum(counts))

with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output1 = list(mr.run(reader, mapper1, reducer1))

output1[:10]
```

```
[('1 Ave & E 15 St', 795),
 ('1 Ave & E 44 St', 219),
 ('10 Ave & W 28 St', 422),
 ('11 Ave & W 27 St', 354),
 ('11 Ave & W 41 St', 461),
 ('11 Ave & W 59 St', 242),
 ('12 Ave & W 40 St', 217),
 ('2 Ave & E 31 St', 588),
 ('2 Ave & E 58 St', 125),
 ('3 Ave & Schermerhorn St', 34)]
```


## Task 2 (0 points)

Below is an example showing how to use nested jobs and map-only jobs (a mapper with no reducer) with the mapreduce package. It is worth no points. Here we filter the output of Task 1 so that only stations involved in more than 1000 trips are displayed, again using the MapReduce paradigm.
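Python 3 no longer accepts tuple parameters such as `def f((a, b)):` (PEP 3113). A map-only filter therefore receives each `(station, count)` pair as one argument and unpacks it in the body. The sketch below chains a counting job into such a filter on made-up trips, with a toy threshold standing in for 1000. `run_local` is a hypothetical stand-in for `mr.run` and reflects an assumption about its grouping behavior.

```python
from itertools import groupby
from operator import itemgetter

def run_local(records, mapper, reducer=None):
    """Hypothetical stand-in for mr.run: map, sort/group by key, reduce."""
    pairs = (kv for record in records for kv in mapper(record))
    if reducer is None:
        yield from pairs  # map-only job
        return
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield reducer((key, [value for _, value in group]))

def count_mapper(row):
    yield (row['start_station_name'], 1)
    yield (row['end_station_name'], 1)

def count_reducer(kv):
    station, counts = kv
    return (station, sum(counts))

THRESHOLD = 2  # toy stand-in for the 1000-trip cutoff

def busy_mapper(kv):
    # Unpack inside the body instead of in the parameter list.
    station, count = kv
    if count > THRESHOLD:
        yield (station, count)

trips = [
    {'start_station_name': 'A', 'end_station_name': 'B'},
    {'start_station_name': 'B', 'end_station_name': 'A'},
    {'start_station_name': 'A', 'end_station_name': 'C'},
]
busy = list(run_local(run_local(trips, count_mapper, count_reducer), busy_mapper))
print(busy)  # [('A', 3)]
```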

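```python
def mapper2(k2v2):
    # Map-only filter: keep a (station, count) pair only if the count exceeds 1000.
    (station, count) = k2v2
    if count > 1000:
        yield (station, count)

with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    # The inner job (Task 1) feeds its (station, count) tuples into the map-only outer job.
    output2 = list(mr.run(mr.run(reader, mapper1, reducer1), mapper2))

output2
```

```
[('8 Ave & W 31 St', 1065),
 ('E 43 St & Vanderbilt Ave', 1003),
 ('Lafayette St & E 8 St', 1013),
 ('W 21 St & 6 Ave', 1057),
 ('W 41 St & 8 Ave', 1095)]
```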


## Task 3 (2 points)

We would like to count the number of trips taken between pairs of stations. Trips from station A to station B and trips from station B to station A both count toward the station pair A and B. Please note that the station pair should be identified by the station names, as a tuple, in lexical order, i.e. (A,B) instead of (B,A) in this case. The output must be tuples, each consisting of the station pair and a count. A portion of the expected output is included below. You are asked to fill in the <b>mapper3</b> and <b>reducer3</b> code blocks.
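The key detail is building a canonical pair key. `sorted()` puts the two names in lexical order, but it returns a list. Lists are unhashable, so a list key would fail in any grouping step built on a dict, and it would not match the tuple format the task asks for. The sketch below wraps the result in `tuple()` and runs it on made-up trips through a hypothetical `run_local` stand-in for `mr.run`. That helper reflects an assumption about the package, not its actual code.

```python
from itertools import groupby
from operator import itemgetter

def run_local(records, mapper, reducer=None):
    """Hypothetical stand-in for mr.run: map, sort/group by key, reduce."""
    pairs = (kv for record in records for kv in mapper(record))
    if reducer is None:
        yield from pairs
        return
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield reducer((key, [value for _, value in group]))

def pair_mapper(row):
    # Lexical order makes A->B and B->A the same key; tuple() makes it hashable.
    yield (tuple(sorted((row['start_station_name'], row['end_station_name']))), 1)

def pair_reducer(kv):
    pair, counts = kv
    return (pair, sum(counts))

trips = [
    {'start_station_name': 'A', 'end_station_name': 'B'},
    {'start_station_name': 'B', 'end_station_name': 'A'},
    {'start_station_name': 'A', 'end_station_name': 'A'},
    {'start_station_name': 'C', 'end_station_name': 'A'},
]
print(list(run_local(trips, pair_mapper, pair_reducer)))
# [(('A', 'A'), 1), (('A', 'B'), 2), (('A', 'C'), 1)]

# sorted() alone returns a list, which cannot be used as a dict key.
try:
    {sorted(('B', 'A')): 1}
except TypeError as err:
    print('TypeError:', err)
```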

```python
def mapper3(row):
    start_station = row['start_station_name']
    end_station = row['end_station_name']
    # sorted() puts the names in lexical order so A->B and B->A share one key;
    # tuple() turns the resulting list into the required (hashable) tuple.
    yield (tuple(sorted((start_station, end_station))), 1)

def reducer3(k2v2):
    (station_pair, counts) = k2v2
    return (station_pair, sum(counts))

with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output3 = list(mr.run(reader, mapper3, reducer3))

output3[:10]
```

```
[(('1 Ave & E 15 St', '1 Ave & E 15 St'), 5),
 (('1 Ave & E 15 St', '1 Ave & E 44 St'), 6),
 (('1 Ave & E 15 St', '11 Ave & W 27 St'), 1),
 (('1 Ave & E 15 St', '2 Ave & E 31 St'), 9),
 (('1 Ave & E 15 St', '5 Ave & E 29 St'), 2),
 (('1 Ave & E 15 St', '6 Ave & Broome St'), 3),
 (('1 Ave & E 15 St', '6 Ave & Canal St'), 1),
 (('1 Ave & E 15 St', '8 Ave & W 31 St'), 5),
 (('1 Ave & E 15 St', '9 Ave & W 14 St'), 3),
 (('1 Ave & E 15 St', '9 Ave & W 16 St'), 3)]
```


## Task 4 (2 points)

Next, we would like to further process the output from Task 3 to determine station popularity among all station pairs that have 35 or more trips. The popularity of a station is the number of times it appears in this list of top pairs. In other words, we first filter the station pairs down to those with 35 or more trips. Then, among these pairs, we count how many times each station appears and report these counts. The output will be tuples, each consisting of a station name and a count. The expected output is included below. As illustrated, the <i>W 41 St & 8 Ave</i> station is the most "popular", with 4 appearances. You are asked to fill in the <b>mapper4</b> and <b>reducer4</b> code blocks.
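The filter-then-count step can be sketched on made-up Task 3 style output, where each record is `((station_a, station_b), trip_count)`. The mapper drops pairs below the threshold and emits one appearance per station in each surviving pair. As in the other sketches, `run_local` is a hypothetical stand-in for `mr.run` and reflects an assumption about how it groups values.

```python
from itertools import groupby
from operator import itemgetter

def run_local(records, mapper, reducer=None):
    """Hypothetical stand-in for mr.run: map, sort/group by key, reduce."""
    pairs = (kv for record in records for kv in mapper(record))
    if reducer is None:
        yield from pairs
        return
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield reducer((key, [value for _, value in group]))

MIN_TRIPS = 35

def popularity_mapper(kv):
    pair, count = kv
    if count >= MIN_TRIPS:
        # Each station in a qualifying pair gets one appearance.
        for station in pair:
            yield (station, 1)

def popularity_reducer(kv):
    station, counts = kv
    return (station, sum(counts))

# Made-up Task 3 style output: ((station_a, station_b), trip_count)
pair_counts = [
    (('A', 'B'), 40),
    (('A', 'C'), 12),
    (('B', 'C'), 35),
    (('C', 'D'), 50),
]
print(list(run_local(pair_counts, popularity_mapper, popularity_reducer)))
# [('A', 1), ('B', 2), ('C', 2), ('D', 1)]
```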

```python
def mapper4(k2v2):
    (station_pair, count) = k2v2
    # Keep only the busy pairs, then emit one appearance for each station in the pair.
    if count >= 35:
        yield (station_pair[0], 1)
        yield (station_pair[1], 1)

def reducer4(k2v2):
    (station, counts) = k2v2
    return (station, sum(counts))

# output3 is already a list of (station_pair, count) tuples, so it can be fed back into mr.run().
output4 = list(mr.run(output3, mapper4, reducer4))
output4
```

```
[('10 Ave & W 28 St', 1),
 ('11 Ave & W 27 St', 2),
 ('11 Ave & W 41 St', 1),
 ('8 Ave & W 31 St', 3),
 ('8 Ave & W 33 St', 1),
 ('9 Ave & W 22 St', 1),
 ('Adelphi St & Myrtle Ave', 1),
 ('DeKalb Ave & Hudson Ave', 1),
 ('E 10 St & Avenue A', 1),
 ('E 24 St & Park Ave S', 2),
 ('E 27 St & 1 Ave', 1),
 ('E 32 St & Park Ave', 1),
 ('E 33 St & 2 Ave', 2),
 ('E 43 St & Vanderbilt Ave', 2),
 ('E 47 St & Park Ave', 1),
 ('E 6 St & Avenue B', 1),
 ('E 7 St & Avenue A', 1),
 ('Lafayette St & E 8 St', 3),
 ('Pershing Square North', 1),
 ('Pershing Square South', 2),
 ('Vesey Pl & River Terrace', 1),
 ('W 17 St & 8 Ave', 1),
 ('W 20 St & 11 Ave', 2),
 ('W 21 St & 6 Ave', 1),
 ('W 26 St & 8 Ave', 1),
 ('W 31 St & 7 Ave', 2),
 ('W 33 St & 7 Ave', 2),
 ('W 41 St & 8 Ave', 4),
 ('West Thames St', 1)]
```


## Task 5 (0 points)

This is another example, worth no points, showing how nested jobs put everything together: Task 3 and Task 4 can be run combined in a single call.
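Each job's output is itself an iterable of `(key, value)` pairs, so nesting calls extends to a pipeline of any length. The sketch below uses a hypothetical `chain` helper built on a hypothetical `run_local` stand-in for `mr.run`. Neither is part of the provided package. On made-up trips, it checks that chaining gives the same result as nesting, with a toy threshold in place of 35.

```python
from itertools import groupby
from operator import itemgetter

def run_local(records, mapper, reducer=None):
    """Hypothetical stand-in for mr.run: map, sort/group by key, reduce."""
    pairs = (kv for record in records for kv in mapper(record))
    if reducer is None:
        yield from pairs
        return
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield reducer((key, [value for _, value in group]))

def chain(records, *jobs):
    """Hypothetical helper: feed each (mapper, reducer) job's output into the next."""
    for mapper, reducer in jobs:
        records = run_local(records, mapper, reducer)
    return records

def pair_mapper(row):
    yield (tuple(sorted((row['start_station_name'], row['end_station_name']))), 1)

def sum_reducer(kv):
    key, counts = kv
    return (key, sum(counts))

def popular_mapper(kv):
    pair, count = kv
    if count >= 2:  # toy threshold in place of 35
        for station in pair:
            yield (station, 1)

trips = [{'start_station_name': s, 'end_station_name': e}
         for s, e in [('A', 'B'), ('B', 'A'), ('B', 'C'), ('C', 'B'), ('C', 'B'), ('A', 'C')]]

nested = list(run_local(run_local(trips, pair_mapper, sum_reducer), popular_mapper, sum_reducer))
chained = list(chain(trips, (pair_mapper, sum_reducer), (popular_mapper, sum_reducer)))
print(nested)             # [('A', 1), ('B', 2), ('C', 1)]
print(nested == chained)  # True
```

The inner job stays lazy in both forms. Its pairs are produced only as the outer job consumes them, so the intermediate result never has to be stored as a separate list.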

```python
with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output5 = list(mr.run(mr.run(reader, mapper3, reducer3), mapper4, reducer4))
output5
```

```
[('10 Ave & W 28 St', 1),
 ('11 Ave & W 27 St', 2),
 ('11 Ave & W 41 St', 1),
 ('8 Ave & W 31 St', 3),
 ('8 Ave & W 33 St', 1),
 ('9 Ave & W 22 St', 1),
 ('Adelphi St & Myrtle Ave', 1),
 ('DeKalb Ave & Hudson Ave', 1),
 ('E 10 St & Avenue A', 1),
 ('E 24 St & Park Ave S', 2),
 ('E 27 St & 1 Ave', 1),
 ('E 32 St & Park Ave', 1),
 ('E 33 St & 2 Ave', 2),
 ('E 43 St & Vanderbilt Ave', 2),
 ('E 47 St & Park Ave', 1),
 ('E 6 St & Avenue B', 1),
 ('E 7 St & Avenue A', 1),
 ('Lafayette St & E 8 St', 3),
 ('Pershing Square North', 1),
 ('Pershing Square South', 2),
 ('Vesey Pl & River Terrace', 1),
 ('W 17 St & 8 Ave', 1),
 ('W 20 St & 11 Ave', 2),
 ('W 21 St & 6 Ave', 1),
 ('W 26 St & 8 Ave', 1),
 ('W 31 St & 7 Ave', 2),
 ('W 33 St & 7 Ave', 2),
 ('W 41 St & 8 Ave', 4),
 ('West Thames St', 1)]
```


## Task 6 (4 points)

In this task, you are asked to find, for each gender, the station where riders started the most trips. In particular, which station had the highest number of bike pickups for female riders, for male riders, and for riders of unknown gender?

The output will be a list of tuples, each consisting of a gender label (as indicated below) and another tuple containing a station name and the total number of trips started at that station by riders of that gender. The expected output is included below. You are asked to fill in the code block with a series of MapReduce jobs, using your own mapper and reducer functions.

The label mapping for the gender column in citibike.csv is: (0=<b>Unknown</b>; 1=<b>Male</b>; 2=<b>Female</b>)
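The two-job structure can be sketched on a handful of made-up rows. The first job counts trips per `(gender label, start station)`. The second job moves the station into the value so grouping happens by gender alone, and then keeps the station with the highest count. `run_local` is a hypothetical stand-in for `mr.run`. The sketch also assumes the `gender` column contains only the codes `'0'`, `'1'`, and `'2'`; any other code would raise a `KeyError` here.

```python
from itertools import groupby
from operator import itemgetter

def run_local(records, mapper, reducer=None):
    """Hypothetical stand-in for mr.run: map, sort/group by key, reduce."""
    pairs = (kv for record in records for kv in mapper(record))
    if reducer is None:
        yield from pairs
        return
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield reducer((key, [value for _, value in group]))

GENDER_LABELS = {'0': 'Unknown', '1': 'Male', '2': 'Female'}

def gender_station_mapper(row):
    # Job 1 key: (gender label, start station).
    yield ((GENDER_LABELS[row['gender']], row['start_station_name']), 1)

def count_reducer(kv):
    key, counts = kv
    return (key, sum(counts))

def regroup_mapper(kv):
    # Job 2: move the station into the value so grouping is by gender alone.
    (gender, station), count = kv
    yield (gender, (station, count))

def top_station_reducer(kv):
    gender, station_counts = kv
    # max() keeps the first maximal pair it sees, so ties depend on input order.
    return (gender, max(station_counts, key=lambda sc: sc[1]))

rows = [{'gender': g, 'start_station_name': s}
        for g, s in [('1', 'A'), ('1', 'A'), ('1', 'B'),
                     ('2', 'B'), ('2', 'B'), ('2', 'A'), ('0', 'C')]]
job1 = run_local(rows, gender_station_mapper, count_reducer)
result = list(run_local(job1, regroup_mapper, top_station_reducer))
print(result)
# [('Female', ('B', 2)), ('Male', ('A', 2)), ('Unknown', ('C', 1))]
```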

```python
def mapper5(row):
    # Job 1: count trips per (gender label, start station).
    if row['gender'] == '0':
        yield (('Unknown', row['start_station_name']), 1)
    elif row['gender'] == '1':
        yield (('Male', row['start_station_name']), 1)
    else:
        yield (('Female', row['start_station_name']), 1)

def reducer5(k2v2):
    (key, counts) = k2v2
    return (key, sum(counts))

def mapper6(k3v3):
    # Job 2: re-key by gender so each reducer call sees every station for one gender.
    ((gender, station), count) = k3v3
    yield (gender, (station, count))

def reducer6(k2v2):
    # Pick the (station, count) pair with the highest count.
    (gender, stations) = k2v2
    return (gender, max(stations, key=lambda x: x[1]))

with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output6 = list(mr.run(mr.run(reader, mapper5, reducer5), mapper6, reducer6))
output6
```

```
[('Female', ('W 21 St & 6 Ave', 107)),
 ('Male', ('8 Ave & W 31 St', 488)),
 ('Unknown', ('Central Park S & 6 Ave', 32))]
```