# Lab 3

In this lab, we will practice Python's higher-order functions, in particular map(), filter(), and reduce().
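Since the lab relies on all three functions, here is a minimal illustration on a hypothetical list of integers. In Python 3, map() and filter() return lazy iterators rather than lists, which is why the cells below wrap them in list().

```python
from functools import reduce

nums = [1, 2, 3, 4, 5]  # hypothetical sample data

# map() applies a function to every element and returns a lazy iterator
squares_iter = map(lambda n: n * n, nums)
squares = list(squares_iter)   # materialize the iterator
print(squares)                 # [1, 4, 9, 16, 25]
print(list(squares_iter))      # [] : an iterator can only be consumed once

# filter() keeps the elements for which the predicate is true (also lazy)
evens = list(filter(lambda n: n % 2 == 0, nums))
print(evens)                   # [2, 4]

# reduce() folds the sequence into one value, left to right, starting from 0
total = reduce(lambda acc, n: acc + n, nums, 0)
print(total)                   # 15
```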


## Task 1

You are given a list of service status updates scraped from an MTA information website. Each update indicates <i>Good Service</i>, <i>Planned Work</i>, or <i>Delays</i> for one or more subway lines. Our first objective is to list all the lines that are running with <i>Delays</i>. To guide you through the process, we split the problem into smaller tasks.

In [1]:
from functools import reduce

In [2]:
# This is your input data: a list of subway line statuses.
# Each entry is a string of the form 'LINES : STATUS', where LINES is comma-separated.

status = [
    '1,2,3 : Good Service',
    '4,5,6 : Delays',
    '7 : Good Service',
    'A,C : Good Service',
    'E : Planned Work',
    'G : Delays',
    'B,D,F,M : Good Service',
    'J,Z : Delays',
    'L : Good Service',
    'N,Q,R : Planned Work',
    'S : Good Service',
]

### Sub-Task 1

Please complete the lambda expression to filter only the status updates for the lines that run with <i>Delays</i>.

In [7]:
delayUpdates = list(filter(lambda x: 'Delays' in x, status))
delayUpdates

['4,5,6 : Delays', 'G : Delays', 'J,Z : Delays']

### Sub-Task 2

Please complete the lambda expression below to convert each status update into a list of subway lines, e.g. <b><i>'4,5,6 : Delays'</i></b> would become <b><i>['4','5','6']</i></b>.

In [9]:
delayLineList = list(map(lambda x: x.split(' :')[0].split(','), delayUpdates))
delayLineList

[['4', '5', '6'], ['G'], ['J', 'Z']]

### Sub-Task 3

Please complete the reduce command below to flatten the list of subway-line lists in <i>delayLineList</i> into a single list of subway lines running with delays.

In [19]:
delayLines = reduce(lambda x,y: x+y, delayLineList)
#delayLines = reduce(lambda x,y: x+y, delayLineList, ['X'])  # the optional third argument is the initial value, so the result would start with 'X'
delayLines

['4', '5', '6', 'G', 'J', 'Z']
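The third argument to reduce() is the initial accumulator value. The cell above works without one because delayLineList is non-empty, but supplying [] keeps the flattening safe when no line is delayed. A small sketch reusing the sub-lists from the output above:

```python
from functools import reduce

nested = [['4', '5', '6'], ['G'], ['J', 'Z']]

# With an initializer, reduce starts from [] and concatenates each sub-list onto it
flat = reduce(lambda acc, lst: acc + lst, nested, [])
print(flat)  # ['4', '5', '6', 'G', 'J', 'Z']

# If no line were delayed, the input would be empty: with [] as initializer, the result is []
empty_flat = reduce(lambda acc, lst: acc + lst, [], [])
print(empty_flat)  # []

# Without an initializer, reduce() raises TypeError on an empty sequence
try:
    reduce(lambda acc, lst: acc + lst, [])
    raised = False
except TypeError:
    raised = True
print(raised)  # True
```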

### Sub-Task 4

Please complete the reduce command below to count the number of lines in <b>delayLines</b>.

In [22]:
delayLineCnt = reduce(lambda x,y: x+1,
                    delayLines, 0)
delayLineCnt

6
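The initial value 0 is essential here: without it, reduce() would use the first element, the string '4', as the accumulator, and evaluating '4' + 1 raises a TypeError. A minimal check reusing the delayed lines shown above:

```python
from functools import reduce

delay_lines = ['4', '5', '6', 'G', 'J', 'Z']

# acc is the running count; the element itself is ignored
count = reduce(lambda acc, _: acc + 1, delay_lines, 0)
print(count)  # 6

# Without the initial 0, the first call computes '4' + 1, which fails
try:
    reduce(lambda acc, _: acc + 1, delay_lines)
    raised = False
except TypeError:
    raised = True
print(raised)  # True
```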

## Task 2

In this exercise, we would like to expand the combined service updates into separate updates for each subway line. For example, instead of a single update <b>'1,2,3 : Good Service'</b> indicating that lines 1, 2, and 3 are in good service, we would like to convert it into 3 separate updates: <b>'1 : Good Service'</b>, <b>'2 : Good Service'</b>, and <b>'3 : Good Service'</b>.

You are tasked with writing a chain of map(), filter(), and/or reduce() calls that converts the <b>status</b> variable into a list like the one shown below.

Please note that you may only use higher-order functions, without access to global variables. Your expression should contain only map(), filter(), and/or reduce() and your custom function definitions.

In [23]:
status[:2]

['1,2,3 : Good Service', '4,5,6 : Delays']

In [38]:
#method 1: map each update to a list of per-line updates, then concatenate the lists with reduce
def mapper(x):
    # 'LINES : STATUS' -> ['LINE : STATUS', ...]
    s = x.split(' : ')
    lines = s[0].split(',')
    state = s[1]  # named 'state' to avoid shadowing the global 'status'
    return [' : '.join([line, state]) for line in lines]


updates = reduce(lambda x, y: x + y, map(mapper, status), [])

# The expected value of updates is the list shown below
updates

['1 : Good Service',
 '2 : Good Service',
 '3 : Good Service',
 '4 : Delays',
 '5 : Delays',
 '6 : Delays',
 '7 : Good Service',
 'A : Good Service',
 'C : Good Service',
 'E : Planned Work',
 'G : Delays',
 'B : Good Service',
 'D : Good Service',
 'F : Good Service',
 'M : Good Service',
 'J : Delays',
 'Z : Delays',
 'L : Good Service',
 'N : Planned Work',
 'Q : Planned Work',
 'R : Planned Work',
 'S : Good Service']
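For comparison only, since the task restricts solutions to map(), filter(), and reduce(): concatenating lists with reduce() builds a new list at every step, so its cost grows quadratically with the number of updates, whereas itertools.chain.from_iterable() flattens in a single pass. The helper name expand and the two-item sample are illustrative assumptions.

```python
from functools import reduce
from itertools import chain

status_sample = ['1,2,3 : Good Service', '4,5,6 : Delays']  # hypothetical subset

def expand(update):
    # Hypothetical helper: 'LINES : STATUS' -> one 'LINE : STATUS' string per line
    lines, state = update.split(' : ')
    return [f'{line} : {state}' for line in lines.split(',')]

# reduce(+) copies the accumulated list on every step
via_reduce = reduce(lambda acc, part: acc + part, map(expand, status_sample), [])
# chain.from_iterable streams the sub-lists without repeated copying
via_chain = list(chain.from_iterable(map(expand, status_sample)))

print(via_reduce == via_chain)  # True
print(via_chain)
```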

In [51]:
#method 2: map each update to a {line: status} dict, then merge the dicts with reduce
def mapper(x):
    s = x.split(' : ')
    lines = s[0].split(',')
    state = s[1]  # named 'state' to avoid shadowing the global 'status'
    return {line: state for line in lines}


updates = reduce(lambda x, y: {**x, **y}, map(mapper, status))

# Here updates is a dict mapping each line to its status, rather than a list
updates

{'1': 'Good Service',
 '2': 'Good Service',
 '3': 'Good Service',
 '4': 'Delays',
 '5': 'Delays',
 '6': 'Delays',
 '7': 'Good Service',
 'A': 'Good Service',
 'C': 'Good Service',
 'E': 'Planned Work',
 'G': 'Delays',
 'B': 'Good Service',
 'D': 'Good Service',
 'F': 'Good Service',
 'M': 'Good Service',
 'J': 'Delays',
 'Z': 'Delays',
 'L': 'Good Service',
 'N': 'Planned Work',
 'Q': 'Planned Work',
 'R': 'Planned Work',
 'S': 'Good Service'}
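One behavioral difference between the two methods: when merging with {**x, **y}, a key present in both dicts takes its value from y, so a line listed in more than one update would keep only its last status, while method 1 would keep both entries. Each line appears once in status, so both methods agree here. A small sketch with a hypothetical duplicate entry for line A:

```python
from functools import reduce

parts = [{'A': 'Good Service', 'C': 'Good Service'},
         {'E': 'Planned Work'},
         {'A': 'Delays'}]  # hypothetical later update for line A

# {**x, **y} builds a new dict; on duplicate keys, y's value wins
merged = reduce(lambda x, y: {**x, **y}, parts)
print(merged)  # {'A': 'Delays', 'C': 'Good Service', 'E': 'Planned Work'}
```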

## Task 3

We would like to write an HOF expression to count the total number of trip activities involving each station. For example, if a rider starts a trip at station A and ends it at station B, stations A and B each receive +1 count for the trip. The output must be tuples, each consisting of a station name and a total count. A portion of the expected output is included below.

In [48]:
import csv
from mrjob.job import MRJob
from mrjob.step import MRStep
import mapreduce as mr  # local helper module that provides runJob()


class MRTask3(MRJob):
    def mapper1(self, _, data):
        # Each trip counts once for its start station and once for its end station
        yield data['start_station_name'], 1
        yield data['end_station_name'], 1

    def reducer1(self, station_name, ones):
        yield station_name, sum(ones)

    def steps(self):
        return [
            MRStep(mapper=self.mapper1,
                   reducer=self.reducer1)
        ]

with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output1 = list(mr.runJob(enumerate(reader),
                             MRTask3(args=[])))

output1[:10]

[('1 Ave & E 15 St', 795),
 ('1 Ave & E 44 St', 219),
 ('10 Ave & W 28 St', 422),
 ('11 Ave & W 27 St', 354),
 ('11 Ave & W 41 St', 461),
 ('11 Ave & W 59 St', 242),
 ('12 Ave & W 40 St', 217),
 ('2 Ave & E 31 St', 588),
 ('2 Ave & E 58 St', 125),
 ('3 Ave & Schermerhorn St', 34)]
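The same per-station count can be sketched with plain map() and reduce() on an in-memory sample, which is handy for checking the logic before running the MRJob on citibike.csv. The rows below are hypothetical; only the two column names come from the task. As in mapper1, a round trip (starting and ending at the same station) counts twice for that station.

```python
from functools import reduce

# Hypothetical rows standing in for csv.DictReader over citibike.csv
sample_trips = [
    {'start_station_name': 'A', 'end_station_name': 'B'},
    {'start_station_name': 'B', 'end_station_name': 'C'},
    {'start_station_name': 'A', 'end_station_name': 'A'},
]

def add_one(counts, station):
    # Return a new dict with this station's count incremented by 1
    return {**counts, station: counts.get(station, 0) + 1}

# map: each trip -> [start, end]; reduce: concatenate into one list of endpoints
endpoints = reduce(lambda acc, pair: acc + pair,
                   map(lambda t: [t['start_station_name'], t['end_station_name']],
                       sample_trips),
                   [])
# reduce again to tally each endpoint, then sort by station name
station_counts = sorted(reduce(add_one, endpoints, {}).items())
print(station_counts)  # [('A', 3), ('B', 2), ('C', 1)]
```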

## Task 4

We would like to count the number of trips taken between pairs of stations. Trips from station A to station B and trips from station B to station A both count toward the station pair A and B. *Please note that the station pair should be identified by station names, as a tuple, and **in lexical order**, i.e. **(A,B)** instead of ~~(B,A)~~ in this case*. The output must be tuples, each consisting of the station pair identification and a count. A portion of the expected output is included below. Please provide your HOF expression.

In [50]:
class MRTask4(MRJob):
    def mapper1(self, _, data):
        # Order the two station names lexically so that A->B and B->A share the key (A, B)
        if data['start_station_name'] < data['end_station_name']:
            yield (data['start_station_name'], data['end_station_name']), 1
        else:
            yield (data['end_station_name'], data['start_station_name']), 1

    def reducer1(self, pair, ones):
        yield pair, sum(ones)

    def steps(self):
        return [
            MRStep(mapper=self.mapper1,
                   reducer=self.reducer1),
        ]


with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output3 = list(mr.runJob(enumerate(reader),
                            MRTask4(args=[])))

output3[:10]

[(('1 Ave & E 15 St', '1 Ave & E 15 St'), 5),
 (('1 Ave & E 15 St', '1 Ave & E 44 St'), 6),
 (('1 Ave & E 15 St', '11 Ave & W 27 St'), 1),
 (('1 Ave & E 15 St', '2 Ave & E 31 St'), 9),
 (('1 Ave & E 15 St', '5 Ave & E 29 St'), 2),
 (('1 Ave & E 15 St', '6 Ave & Broome St'), 3),
 (('1 Ave & E 15 St', '6 Ave & Canal St'), 1),
 (('1 Ave & E 15 St', '8 Ave & W 31 St'), 5),
 (('1 Ave & E 15 St', '9 Ave & W 14 St'), 3),
 (('1 Ave & E 15 St', '9 Ave & W 16 St'), 3)]
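A plain-Python sketch of the same pair count on hypothetical rows. Here tuple(sorted(...)) produces the lexically ordered pair, which is equivalent to the if/else comparison in mapper1 (including the case where start and end are the same station).

```python
from functools import reduce

# Hypothetical rows: B->A and A->B should land in the same pair
sample_trips = [
    {'start_station_name': 'B', 'end_station_name': 'A'},
    {'start_station_name': 'A', 'end_station_name': 'B'},
    {'start_station_name': 'A', 'end_station_name': 'C'},
]

def pair_key(trip):
    # sorted() puts the two names in lexical order, so (B, A) becomes (A, B)
    return tuple(sorted((trip['start_station_name'], trip['end_station_name'])))

pair_counts = sorted(
    reduce(lambda acc, key: {**acc, key: acc.get(key, 0) + 1},
           map(pair_key, sample_trips),
           {}).items()
)
print(pair_counts)  # [(('A', 'B'), 2), (('A', 'C'), 1)]
```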

## Task 5

In this task, you are asked to find, for each gender of *'Subscriber'* users, the station from which the most trips started. In other words, find the station with the highest number of bike pickups by female riders, by male riders, and by riders of unknown gender.

The output will be a list of tuples, each consisting of a gender label (as indicated below) and another tuple containing a station name and the total number of trips started at that station by riders of that gender. The expected output is included below. Please provide your HOF expression below.

The label mapping for the gender column in citibike.csv is: (0=**Unknown**; 1=**Male**; 2=**Female**)

In [56]:
class MRTask5(MRJob):
    def mapper1(self, _, data):
        # Only Subscriber trips are counted, keyed by (gender code, start station)
        if data['usertype'] == 'Subscriber':
            yield (data['gender'], data['start_station_name']), 1

    def reducer1(self, gender_station, ones):
        yield gender_station, sum(ones)

    def mapper2(self, gender_station, count):
        # Re-key by gender label: '0' = Unknown, '1' = Male, anything else = Female
        if gender_station[0] == '0':
            yield 'Unknown', (gender_station[1], count)
        elif gender_station[0] == '1':
            yield 'Male', (gender_station[1], count)
        else:
            yield 'Female', (gender_station[1], count)

    def reducer2(self, gender, values):
        # Keep the (station, count) pair with the largest count for this gender
        yield gender, max(values, key=lambda x: x[1])

    def steps(self):
        return [
            MRStep(mapper=self.mapper1,
                   reducer=self.reducer1),
            MRStep(mapper=self.mapper2,
                   reducer=self.reducer2),
        ]


with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output5 = list(mr.runJob(enumerate(reader),
                            MRTask5(args=[])))

output5

[('Female', ('W 21 St & 6 Ave', 107)),
 ('Male', ('8 Ave & W 31 St', 488)),
 ('Unknown', ('Catherine St & Monroe St', 1))]
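The two MRStep stages can also be sketched with filter(), map(), and reduce() on hypothetical rows: the first reduce counts trips per (gender, station), and the second keeps the best station per gender label. The LABELS mapping follows the table above; unlike mapper2, which sends any code other than '0' or '1' to Female, this sketch assumes only the three documented codes occur.

```python
from functools import reduce

LABELS = {'0': 'Unknown', '1': 'Male', '2': 'Female'}

# Hypothetical rows with the three columns MRTask5 reads
sample_trips = [
    {'usertype': 'Subscriber', 'gender': '1', 'start_station_name': 'X'},
    {'usertype': 'Subscriber', 'gender': '1', 'start_station_name': 'X'},
    {'usertype': 'Subscriber', 'gender': '1', 'start_station_name': 'Y'},
    {'usertype': 'Subscriber', 'gender': '2', 'start_station_name': 'Y'},
    {'usertype': 'Customer',   'gender': '2', 'start_station_name': 'X'},
]

def bump(acc, key):
    # Return a new dict with this key's count incremented by 1
    return {**acc, key: acc.get(key, 0) + 1}

# Step 1: count trips per (gender code, start station), Subscribers only
per_pair = reduce(bump,
                  map(lambda t: (t['gender'], t['start_station_name']),
                      filter(lambda t: t['usertype'] == 'Subscriber', sample_trips)),
                  {})

def keep_max(best, item):
    # Step 2: per gender label, keep the (station, count) with the largest count
    (gender, station), n = item
    label = LABELS[gender]
    if label not in best or n > best[label][1]:
        return {**best, label: (station, n)}
    return best

top_by_gender = sorted(reduce(keep_max, per_pair.items(), {}).items())
print(top_by_gender)  # [('Female', ('Y', 1)), ('Male', ('X', 2))]
```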

In [7]:
# Reload mr_word_count.py so that edits to the file take effect without restarting the kernel
import importlib
import mr_word_count
importlib.reload(mr_word_count)

# Import the job class defined in mr_word_count.py
from mr_word_count import MRWordFrequencyCount
job = MRWordFrequencyCount(args=['book.txt'])

In [8]:
with job.make_runner() as runner:
    runner.run()
    for line in runner.cat_output():
        print(line)

No configs specified for inline runner


b'"words"\t38538\n'
b''
b'"lines"\t5877\n'
b''
b'"chars"\t246432\n'


In [9]:
!python mr_word_count.py book.txt 2>/dev/null

"words"	38538
"lines"	5877
"chars"	246432


In [10]:
from mrjob.job import MRJob
import mapreduce as mr


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        # Emit (word, 1) for every whitespace-separated token, lowercased
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, key, values):
        yield key, sum(values)


# No input file on the command line: records are fed in through mr.runJob instead
job = MRWordFrequencyCount(args=[])

# enumerate() turns each line into a (line_number, line) key-value pair
with open('book.txt', 'r', encoding='utf8') as f:
    counts = list(mr.runJob(enumerate(f), job))
counts[:10]

[('"', 145),
 ('"defects,"', 1),
 ('"information', 1),
 ('"plain', 2),
 ('"project', 5),
 ('"right', 1),
 ('#51302]', 1),
 ('$5,000)', 1),
 ('&', 3),
 ("'as-is',", 1)]
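The mapreduce module imported as mr is a local helper whose source is not shown here, so the following is only a hypothetical sketch of what an in-memory, single-step runner of this shape might do: apply the mapper to every (key, value) record, group the emitted values by key, and pass each group to the reducer. The names run_job, count_mapper, top_mapper, and the sample lines are illustrative; the real mr.runJob may differ, for example in output ordering (its output above appears sorted by key).

```python
from collections import defaultdict

def run_job(records, mapper, reducer):
    """Hypothetical single-step, in-memory MapReduce runner (not the actual mr.runJob)."""
    # Map + shuffle: collect every emitted value under its key, in first-seen key order
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)
    # Reduce: call the reducer once per key with an iterator over that key's values
    output = []
    for key, values in groups.items():
        output.extend(reducer(key, iter(values)))
    return output

def count_mapper(_, line):
    for word in line.split():
        yield word.lower(), 1

def count_reducer(word, ones):
    yield word, sum(ones)

def top_mapper(word, count):
    yield None, (word, count)

def top_reducer(key, pairs):
    yield key, max(pairs, key=lambda p: p[1])

lines = ['the cat', 'The dog', 'a cat']  # hypothetical input
sample_counts = run_job(enumerate(lines), count_mapper, count_reducer)
print(sample_counts)  # [('the', 2), ('cat', 2), ('dog', 1), ('a', 1)]
# Feeding one job's output into another chains the steps, like a multi-step MRJob
sample_top = run_job(sample_counts, top_mapper, top_reducer)
print(sample_top)     # [(None, ('the', 2))]
```

Grouping with a dict rather than sorting keeps the sketch working for the single None key used in the cells below.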

In [6]:
# Example: enumerate() pairs each item with its index
print(list(range(10,20)))
print(list(enumerate(range(10,20))))

[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[(0, 10), (1, 11), (2, 12), (3, 13), (4, 14), (5, 15), (6, 16), (7, 17), (8, 18), (9, 19)]


In [27]:
from mrjob.job import MRJob
import mapreduce as mr


class MRWordMostCount(MRJob):

    def mapper(self, word, count):
        # Send every (word, count) pair to the same key (None) so a single reducer sees all of them
        yield None, (word, count)
        
    def reducer(self, key, values):
        yield (key, max(values, key=lambda x: x[1]))
        

MostCount = list(mr.runJob(counts, MRWordMostCount(args=[])))
MostCount

[(None, ('the', 2427))]

In [24]:
from mrjob.job import MRJob
import mapreduce as mr


class MRWordMostCount(MRJob):

    def mapper(self, word, count):
        # Send every (word, count) pair to the same key (None) so a single reducer sees all of them
        yield None, (word, count)
        
    def combiner(self, key, values):
        yield (key, max(values, key=lambda x: x[1]))
        
    def reducer(self, key, values):
        yield (key, max(values, key=lambda x: x[1]))


MostCount = list(mr.runJob(counts, MRWordMostCount(args=[])))
MostCount

[(None, ('the', 2427))]
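A combiner is safe here because taking a maximum is associative and commutative: the maximum of per-chunk maxima equals the maximum over all the data. A sketch with hypothetical counts split into two chunks, standing in for the output of two mappers:

```python
sample_counts = [('the', 5), ('cat', 2), ('a', 7), ('dog', 1), ('of', 4)]  # hypothetical

def top(pairs):
    # Same selection the combiner and reducer perform
    return max(pairs, key=lambda p: p[1])

# Pretend the mapper output was split between two machines
chunk1, chunk2 = sample_counts[:2], sample_counts[2:]
# Combiner: each machine keeps only its local maximum
local_maxima = [top(chunk1), top(chunk2)]
# Reducer: the maximum of the local maxima
combined = top(local_maxima)
direct = top(sample_counts)
print(combined, direct)  # ('a', 7) ('a', 7)
```

The same shortcut would not work for, say, an average, where averaging partial averages generally gives the wrong result.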

In [30]:
# Multi-step MapReduce
from mrjob.job import MRJob, MRStep
import mapreduce as mr


class MRWordFrequencyCount(MRJob):

    def mapper1(self, _, line):
        for word in line.split():
            yield word.lower(), 1
        
    def reducer1(self, key, values):
        yield (key, sum(values))
    
    def mapper2(self, word, count):
        yield None, (word, count)
    
    # The combiner keeps only each mapper's local maximum, so the single reducer receives far fewer values
    def combiner2(self, key, values):
        yield (key, max(values, key=lambda x: x[1]))
        
    def reducer2(self, key, values):
        yield (key, max(values, key=lambda x: x[1]))
    
    def steps(self):
        return [
            MRStep(mapper=self.mapper1,
                   reducer=self.reducer1),
            MRStep(mapper=self.mapper2,
                   combiner=self.combiner2,
                   reducer=self.reducer2),
        ]


with open('book.txt', 'r', encoding='utf8') as f:
    count1 = list(mr.runJob(enumerate(f), MRWordFrequencyCount(args=[])))
count1

[(None, ('the', 2427))]
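For a small input, the two-step job can be cross-checked against collections.Counter, which performs the count (step 1) and the top-1 selection (step 2) on a single machine. The sample text is a hypothetical stand-in for book.txt.

```python
from collections import Counter

text = ['The cat and the hat', 'the end']  # hypothetical stand-in for book.txt

# Step 1 (mapper1 + reducer1): count lowercase words
word_counts = Counter(word.lower() for line in text for word in line.split())
# Step 2 (mapper2 + combiner2 + reducer2): keep the single most frequent word
most_common = word_counts.most_common(1)
print(most_common)  # [('the', 3)]
```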