# Lab 3

In this lab, we will practice Python's higher order functions, in particular, map(), filter() and reduce().


## Task 1

You are provided a list of service status updates scraped from an MTA information website. Each update may indicate <i>Good Service</i>, <i>Planned Work</i>, or <i>Delays</i> for one or more subway lines. Our first objective is to list all the lines that are running with <i>Delays</i>. To guide you through the process, we split the problem into smaller tasks.

In [1]:
from functools import reduce
import csv

# MapReduce
from mrjob.job import MRJob

In [2]:
# This is your input data, a list of subway line status.
# It is a list of string in a specific format

status = [
    '1,2,3 : Good Service',
    '4,5,6 : Delays',
    '7 : Good Service',
    'A,C : Good Service',
    'E : Planned Work',
    'G : Delays',
    'B,D,F,M : Good Service',
    'J,Z : Delays',
    'L : Good Service',
    'N,Q,R : Planned Work',
    'S : Good Service',
]

### Sub-Task 1

Please complete the lambda expression to filter only the status updates for the lines that run with <i>Delays</i>.

In [3]:
delayUpdates = list(filter(lambda x: 'Delays' in x, status))
delayUpdates
# After this, your delayUpdates should be
# ['4,5,6 : Delays', 'G : Delays', 'J,Z : Delays']

['4,5,6 : Delays', 'G : Delays', 'J,Z : Delays']

### Sub-Task 2

Please complete the lambda expression below to convert each status line into a list of subway lines, i.e. <b><i>'4,5,6 : Delays'</i></b> would become <b><i>['4','5','6']</i></b>

In [4]:
delayLineList = list(map(lambda x: x.split(':')[0].split(','), delayUpdates))
delayLineList
# After this, your delayLineList should be
# [['4', '5', '6'], ['G'], ['J', 'Z']]

[['4', '5', '6 '], ['G '], ['J', 'Z ']]

### Sub-Task 3

Please complete the reduce command below to convert each the list of subway lists given in <i>delayLineList</i> into a single list of subway lines running with delay.

In [5]:
delayLines = reduce(lambda x, y: x+y, delayLineList)
delayLines
# After this, your delayLines should be
# ['4', '5', '6', 'G', 'J', 'Z']

['4', '5', '6 ', 'G ', 'J', 'Z ']

### Sub-Task 4

Please complete the reduce command below to count the number of lines in <b>delayLines</b>.

In [6]:
delayLineCount = reduce(lambda x,y: x+1,
                        delayLines, 0)
delayLineCount
# After this, your delayLineCount should be
# 6

6

## Task 2

In this excercise, we would like to expand the combined service updatse into separate updates for each subway line. For example, instead of having a single line <b>'1,2,3 : Good Service'</b> to indicate that line 1, 2, and 3 are in good service, we would like to convert that into 3 separate updates: <b>'1 : Good Service'</b>, <b>'2 : Good Service'</b>, and <b>'3 : Good Service'</b>.

You are tasked to write a chain of map(), filter(), and/or reduce() to convert the <b>status</b> variable into a list like below:

Please note that you may only use higher order functions without access to global variables. Your expression should contain only map(), filter() and/or reduce() and your custom function definitions.

In [7]:
status[:2]

['1,2,3 : Good Service', '4,5,6 : Delays']

In [8]:
# <ANY FUNCTION TO BE USED IN YOUR HOF>
def list_values(x):
    tup_split = x.split(' : ')
    lines_list = tup_split[0].split(',')
    status = tup_split[1]
    values_list = [key + ' : ' + status for key in lines_list]
    return values_list

updates = reduce(lambda x, y: x+y, map(list_values, status), [])

# The expected value of updates is the list shown above
updates

['1 : Good Service',
 '2 : Good Service',
 '3 : Good Service',
 '4 : Delays',
 '5 : Delays',
 '6 : Delays',
 '7 : Good Service',
 'A : Good Service',
 'C : Good Service',
 'E : Planned Work',
 'G : Delays',
 'B : Good Service',
 'D : Good Service',
 'F : Good Service',
 'M : Good Service',
 'J : Delays',
 'Z : Delays',
 'L : Good Service',
 'N : Planned Work',
 'Q : Planned Work',
 'R : Planned Work',
 'S : Good Service']

## Task 3

We would like to write an HOF expression to count the total number of trip activities involved each station. For example, if a rider starts a trip at station A and ends at station B, each station A and B will receive +1 count for  the trip. The output must be tuples, each consisting of a station name and a total count. A portion of the expected output are included below.

In [13]:
file = '../Files/citibike.csv'

with open(file, 'r') as fi:
    reader = csv.DictReader(fi)
    output1 = reduce(lambda x, y: x+y, map(lambda x: [x['start_station_name'], x['end_station_name']], reader), [])
    output1 = [(station, output1.count(station)) for station in set(output1)]

output1[:10]

[('Elizabeth St & Hester St', 425),
 ('DeKalb Ave & Vanderbilt Ave', 93),
 ('Avenue D & E 12 St', 44),
 ('Bank St & Hudson St', 308),
 ('St James Pl & Oliver St', 131),
 ('Sullivan St & Washington Sq', 334),
 ('Willoughby St & Fleet St', 114),
 ('Fulton St & Waverly Ave', 113),
 ('E 55 St & 2 Ave', 156),
 ('9 Ave & W 18 St', 239)]

In [14]:
import mrjob as mr

class MRTask3(MRJob):
    
    def mapper(self, _,  v1):
        yield v1['start_station_name'], 1
        yield v1['end_station_name'], 1
    
    def reducer(self, k2, v2s):
        yield k2, sum(v2s)

with open(file, 'r') as fi:
    reader = csv.DictReader(fi)
    output1 = list(mr.runJob(enumerate(reader), MRTask3(args=[])))

AttributeError: module 'mrjob' has no attribute 'runJob'

In [11]:
output1[:10]

NameError: name 'output1' is not defined

## Task 4

We would like to count the number of trips taken between pairs of stations. Trips taken from station A to station B or  from station B to station A are both counted towards the station pair A and B. *Please note that the station pair should be identified by station names, as a tuple, and **in lexical order**, i.e. **(A,B)** instead of ~~(B,A)~~ in this case*. The output must be tuples, each consisting of the station pair identification and a count. A portion of the expected output are included below. Please provide your HOF expression.

In [10]:
# <YOUR HOF FUNCTIONS (if any)>
import csv
class MRTask3(MRJob):
    
    def mapper(self, k1,  v1):
        if v1['start_station_name'] < v1['end_station_name']:
            yield ((v1['start_station_name'] ,v1['end_station_name']),1)
        elseL
            yield ((v1['end_station_name'],v1['start_station_name']),1)
    
    def reducer(self, k2, v2s):
        yield k2, sum(v2s)
        
with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output3 = reduce(lambda x, y: x+y, map(lambda x: [(x['start_station_name'], x['end_station_name'])], reader), [])
    output3 = [(station, output3.count(station)) for station in set(output3)]

output3[:10]

[(('Broadway & E 14 St', 'W 20 St & 7 Ave'), 2),
 (('Perry St & Bleecker St', 'W Broadway & Spring St'), 1),
 (('Carmine St & 6 Ave', 'W 15 St & 7 Ave'), 1),
 (('Bialystoker Pl & Delancey St', 'William St & Pine St'), 1),
 (('Mott St & Prince St', 'Greenwich St & N Moore St'), 1),
 (('Broadway & W 29 St', '8 Ave & W 31 St'), 6),
 (('W 22 St & 10 Ave', 'Broadway & Battery Pl'), 1),
 (('Sullivan St & Washington Sq', 'E 9 St & Avenue C'), 1),
 (('W 22 St & 8 Ave', 'E 25 St & 1 Ave'), 1),
 (('E 45 St & 3 Ave', 'W 41 St & 8 Ave'), 8)]

## Task 5

In this task, you are asked to compute the station with the most riders started from, per each gender of the *'Subscriber'* user. Meaning, what was the station name with the highest number of bike pickups for female riders, for male riders and for unknown riders.

The output will be a list of tuples, each includes a gender label (as indicated below) and another tuple consisting of a station name, and the total number of trips started at that station for that gender. The expected output are included below. Please provide your HOF expression below.

The label mapping for the gender column in citibike.csv is: (Zero=**Unknown**; 1=**Male**; 2=**Female**)

In [21]:
!python mr_word_count.py book.txt 2>/dev/null

"words"	38538


In [22]:
from mr_word_count import MRWordFrequencyCount

In [23]:
job = MRWordFrequencyCount(args=['book.txt'])

In [24]:
with job.make_runner() as runner:
    runner.run()
    for line in runner.cat_output():
        print(line)

No configs specified for inline runner


b'"words"\t38538\n'
b''
b'"lines"\t5877\n'
b''
b'"chars"\t246432\n'


In [40]:
import mapreduce as mr
from mrjob.job import MRJob, MRStep

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, key, values):
        yield key, sum(values)
        
        
word_count = list(mr.runJob(
    enumerate(open('book.txt', 'r', encoding='utf8')),
    MRWordFrequencyCount(args=[])
))
word_count[:10]

[('"', 145),
 ('"defects,"', 1),
 ('"information', 1),
 ('"plain', 2),
 ('"project', 5),
 ('"right', 1),
 ('#51302]', 1),
 ('$5,000)', 1),
 ('&', 3),
 ("'as-is',", 1)]

In [39]:
class MRWordMostCount(MRJob):

    def mapper(self, word, count):
        yield 'findMax', (word, count)

    def reducer(self, _, wordCounts):
        yield max(wordCounts, key=lambda x:x[1])
        
        
word_c = list(mr.runJob(
    word_count,
    MRWordMostCount(args=[])
))
word_c[:10]

[('the', 2427)]

In [41]:
class MRFindMaxCount(MRJob):
    
    def mapper1(self, _, line):
        for word in line.split():
            yield word.lower(), 1

    def reducer1(self, key, values):
        yield key, sum(values)

    def mapper2(self, word, count):
        yield 'findMax', (word, count)

    def reducer2(self, _, wordCounts):
        yield max(wordCounts, key=lambda x:x[1])
        
    def steps(self):
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]
    
    
word_max = list(mr.runJob(
    enumerate(open('book.txt', 'r', encoding='utf8')),
    MRFindMaxCount(args=[])
))
word_max

[('the', 2427)]

In [168]:
with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    for line in reader:
        print(line)
        break

OrderedDict([('cartodb_id', '1'), ('the_geom', ''), ('tripduration', '801'), ('starttime', '2015-02-01 00:00:00+00'), ('stoptime', '2015-02-01 00:14:00+00'), ('start_station_id', '521'), ('start_station_name', '8 Ave & W 31 St'), ('start_station_latitude', '40.75044999'), ('start_station_longitude', '-73.99481051'), ('end_station_id', '423'), ('end_station_name', 'W 54 St & 9 Ave'), ('end_station_latitude', '40.76584941'), ('end_station_longitude', '-73.98690506'), ('bikeid', '17131'), ('usertype', 'Subscriber'), ('birth_year', '1978'), ('gender', '2')])


In [None]:
<YOUR HOF FUNCTIONS (if any)>

with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output5 = <YOUR HOF EXPRESSION on reader>

output5

In [24]:
<YOUR HOF FUNCTIONS (if any)>

with open('citibike.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output5 = <YOUR HOF EXPRESSION on reader>

output5

[('Female', ('W 21 St & 6 Ave', 107)),
 ('Male', ('8 Ave & W 31 St', 488)),
 ('Unknown', ('Pearl St & Hanover Square', 1))]