## HW 2.0

#### How do you merge two sorted lists/arrays of records of the form [key, value]?

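Because both lists are already sorted by key, they can be merged in a single linear pass (the merge step of merge sort): keep a pointer into each list, compare the keys at the two pointers, append the record with the smaller key to the output, and advance that pointer. When one list runs out, append the remainder of the other. The result is sorted by key, and records with the same key end up next to each other, so their values can then be combined under that key and less data has to be passed on to the Reducer.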
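A minimal sketch of that linear merge, assuming both inputs are Python lists of `[key, value]` pairs already sorted by key. The function name `merge_sorted_records` and the sample records are illustrative only; the standard library's `heapq.merge` is shown as a cross-check.

```python
import heapq


def merge_sorted_records(left, right):
    """Merge two lists of [key, value] records, each sorted by key, in O(n + m)."""
    merged = []
    i, j = 0, 0
    while i < len(left) and j < len(right):
        # Emit the record with the smaller key; on equal keys take `left` first,
        # which keeps the merge stable.
        if left[i][0] <= right[j][0]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # One list is exhausted; whatever remains in the other is already sorted.
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged


a = [["bar", 1], ["foo", 2], ["quux", 1]]
b = [["foo", 1], ["labs", 1], ["quux", 1]]
merged = merge_sorted_records(a, b)
print(merged)
# [['bar', 1], ['foo', 2], ['foo', 1], ['labs', 1], ['quux', 1], ['quux', 1]]

# The standard library produces the same order lazily (key= needs Python 3.5+).
print(list(heapq.merge(a, b, key=lambda rec: rec[0])) == merged)  # True
```

The two-pointer loop touches each record once, which is why merging sorted runs is cheap compared with re-sorting the concatenation.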

#### Where is this used in Hadoop MapReduce?

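In Hadoop this merge is the core of the shuffle/sort phase. On the map side, each spill of map output is sorted by partition and key, and the spills are merged into a single sorted output per map task (the Combiner, if one is configured, can run on these spills). On the reduce side, each Reducer fetches its partition from every map task and merges those sorted segments into one key-ordered stream. Fetching can begin as soon as individual map tasks finish, but the reduce function itself cannot start until all map outputs for its partition have been fetched and merged.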
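To make the reduce-side step concrete, here is a small plain-Python simulation (not Hadoop code) in which three hypothetical map tasks each deliver a run of records already sorted by key. The reducer's merge is a k-way merge (`heapq.merge`), after which adjacent records with the same key are grouped (`itertools.groupby`) and handed to the reduce logic once per key.

```python
import heapq
from itertools import groupby
from operator import itemgetter

# Hypothetical sorted outputs of three map tasks for one reducer's partition.
map_outputs = [
    [("bar", 1), ("foo", 1), ("foo", 1)],
    [("foo", 1), ("quux", 1)],
    [("labs", 1), ("quux", 1)],
]

# k-way merge: the result is sorted by key because every input run is sorted.
merged = list(heapq.merge(*map_outputs))

# Group adjacent records with the same key and "reduce" (here: sum) once per key.
reduced = [(key, sum(v for _, v in group))
           for key, group in groupby(merged, key=itemgetter(0))]
print(reduced)  # [('bar', 1), ('foo', 3), ('labs', 1), ('quux', 2)]
```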

#### What is  a combiner function in the context of Hadoop?
#### Give an example where it can be used and justify why it should be used in the context of this problem.

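A combiner is an optional, reducer-like function that runs on the map side, on each mapper's local output, before that output is sent over the network. It consolidates records with the same key so that less data is shuffled to the Reducers. Hadoop treats the combiner as an optimization and may run it zero, one, or several times, so its input and output must have the same (key, value) types as the mapper output, and applying it must not change the final answer.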

A combiner can be used whenever the reduce operation is commutative and associative, such as a sum, count, max, or min; it cannot be used directly for something like a mean. One example is a simple word count. The combiner can pre-sum the occurrences of each word in one mapper's output. For instance, if a Mapper outputs the word "Homework" 5 times, the Combiner can consolidate the 5 individual records of (Homework, 1) into a single (Homework, 5). The Reducer then receives 1 record instead of 5 from that mapper, which cuts the amount of data written, transferred, and sorted during the shuffle.
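Word count is a safe place for a combiner because addition gives the same total however the records are grouped. A short sketch with made-up values contrasts that with averaging, where reusing the reduce function as a combiner silently changes the result, and shows the usual fix of emitting (sum, count) pairs. The two "spills" are an assumption standing in for whatever chunks Hadoop happens to combine.

```python
def mean(xs):
    return float(sum(xs)) / len(xs)


# Values for one key, split across two hypothetical map-side spills.
spill_a = [2, 4, 6]
spill_b = [10]

# Sum: adding the per-spill partial sums gives the same total as adding everything.
combined_sum = sum([sum(spill_a), sum(spill_b)])   # 12 + 10 = 22
direct_sum = sum(spill_a + spill_b)                # 22

# Mean: averaging the per-spill averages gives the wrong answer.
combined_mean = mean([mean(spill_a), mean(spill_b)])  # (4.0 + 10.0) / 2 = 7.0
direct_mean = mean(spill_a + spill_b)                 # 22 / 4 = 5.5

# A mean becomes combinable if the combiner emits (sum, count) pairs instead.
partials = [(sum(s), len(s)) for s in (spill_a, spill_b)]
total, n = [sum(col) for col in zip(*partials)]
fixed_mean = float(total) / n                         # 5.5

print("sum: %d vs %d; mean: %.1f vs %.1f; (sum, count) mean: %.1f"
      % (combined_sum, direct_sum, combined_mean, direct_mean, fixed_mean))
```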

#### What is the Hadoop shuffle?

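The Hadoop shuffle is the step in which map output is transferred from the mappers to the reducers. Each map output record is assigned to a reducer by a partitioner (by default, a hash of the key modulo the number of reduce tasks), so all records with the same key go to the same reducer. The records are sorted by key on the map side, fetched by each reducer, and merged, so the reducer sees its keys in sorted order with all the values for a key grouped together. In effect, the shuffle performs a distributed sort and group-by on the map output.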
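A plain-Python sketch of those three steps (partition, sort, group), for illustration only. Hadoop's default `HashPartitioner` uses the Java `hashCode()` of the key; here `zlib.crc32` stands in for it so that results are deterministic across Python runs, which means the actual reducer assignments will differ from Hadoop's. The `partition` and `shuffle` helpers are hypothetical names.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def partition(key, num_reducers):
    # Stand-in for Hadoop's HashPartitioner: a stable hash of the key, modulo R.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(map_output, num_reducers):
    """Return, for each reducer, its (key, [values]) groups in key order."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in map_output:
        buckets[partition(key, num_reducers)].append((key, value))
    result = []
    for bucket in buckets:
        bucket.sort(key=itemgetter(0))  # sort by key within each reducer
        result.append([(key, [v for _, v in group])
                       for key, group in groupby(bucket, key=itemgetter(0))])
    return result


map_output = [(word, 1) for word in "foo foo quux labs foo bar quux".split()]
for reducer_id, groups in enumerate(shuffle(map_output, 2)):
    print("reducer %d: %s" % (reducer_id, groups))
```

Every occurrence of a key lands in the same bucket, which is what lets a single reducer call see all of that key's values.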

## HW2.1: Counters as a debugging aid

In [3]:
!curl -L https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0 -o Consumer_Complaints.csv

curl: (35) schannel: failed to receive handshake, SSL/TLS connection failed


In [76]:
%%writefile SimWordCount.py
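from mrjob.job import MRJob
import csv

class SimWordCount(MRJob):
    def mapper(self, _, line):
        # Parse with the csv module so quoted commas do not shift the columns.
        row = next(csv.reader([line]))
        # Skip the header and malformed lines: the first column must be an integer ID.
        if len(row) < 2:
            return
        try:
            int(row[0])
        except ValueError:
            return
        product = row[1].strip().lower()
        # Nothing is emitted; the job only counts records per product.
        if product in ["debt collection", "mortgage"]:
            self.increment_counter(product, 'Num_of_records', 1)
        else:
            self.increment_counter("other", 'Num_of_records', 1)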

if __name__ == '__main__':
    SimWordCount().run()


Writing SimWordCount.py


In [79]:
from SimWordCount import SimWordCount
mr_job = SimWordCount(args=['Consumer_Complaints.csv'])
with mr_job.make_runner() as runner: 
    runner.run()
    # counters() returns a list with one dict of counter groups per step
    print runner.counters()

[{'debt collection': {'Num_of_records': 44372}, 'other': {'Num_of_records': 142788}, 'mortgage': {'Num_of_records': 125752}}]


## HW2.2: Analyze the performance of your Mappers, Combiners and Reducers using Counters

#### Perform a word count analysis of this single record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here), using user-defined Counters to count how many times the mapper and reducer are called. What is the value of your user-defined Mapper Counter and Reducer Counter after completing this word count job? The answer should be 1 and 4 respectively. Please explain.

In [4]:
%%writefile oneLine.txt
foo foo quux labs foo bar quux

Writing oneLine.txt


In [152]:
%%writefile mr_wc_counter.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
 
WORD_RE = re.compile(r"[\w']+")
 
class MRWordFreqCount(MRJob):
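    def mapper(self, _, line):
        # Called once per input line; emits (word, 1) for every word on it.
        self.increment_counter('group', 'Num_mapper_calls', 1)
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        # Called once per distinct word, with all of that word's counts.
        self.increment_counter('group', 'Num_reducer_calls', 1)
        yield word, sum(counts)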

if __name__ == '__main__':
    MRWordFreqCount.run()


Overwriting mr_wc_counter.py


In [153]:
!python mr_wc_counter.py oneLine.txt

"bar"	1
"foo"	3
"labs"	1
"quux"	2


No configs found; falling back on auto-configuration
Creating temp directory c:\users\z030757\appdata\local\temp\mr_wc_counter.z030757.20160705.191119.048000
Running step 1 of 1...
Counters: 1
	group
		Num_mapper_calls=1
Counters: 2
	group
		Num_mapper_calls=1
		Num_reducer_calls=4
Streaming final output from c:\users\z030757\appdata\local\temp\mr_wc_counter.z030757.20160705.191119.048000\output...
Removing temp directory c:\users\z030757\appdata\local\temp\mr_wc_counter.z030757.20160705.191119.048000...


In [16]:
from mr_wc_counter import MRWordFreqCount
mr_job = MRWordFreqCount(args=['oneLine.txt'])
with mr_job.make_runner() as runner: 
    runner.run()
    # counters() returns a list with one dict of counter groups per step
    print runner.counters()


[{'group': {'Num_mapper_calls': 1, 'Num_reducer_calls': 4}}]


The mapper and reducer counters count how many times the `mapper()` and `reducer()` functions were invoked, not how many mapper or reducer tasks were started. `mapper()` is called once per input line, and the file has a single line, so the Mapper Counter is 1; that one call splits the line into words and emits a (word, 1) pair for each. After the shuffle, `reducer()` is called once per distinct key. The words are the keys of the mapper output, and there are 4 unique words in the dataset (foo, quux, labs and bar), so the Reducer Counter is 4.
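The same counts can be reproduced without mrjob. In this sketch `run_wordcount` is a hypothetical helper, not an mrjob API: the mapper logic runs once per input line and the reducer logic once per distinct key after sorting and grouping, which is exactly what the two counters measure.

```python
import re
from collections import Counter
from itertools import groupby
from operator import itemgetter

WORD_RE = re.compile(r"[\w']+")


def run_wordcount(lines):
    """Hypothetical in-memory word count that tallies mapper and reducer calls."""
    calls = Counter()
    pairs = []
    for line in lines:
        calls["mapper"] += 1  # one mapper call per input line
        pairs.extend((word.lower(), 1) for word in WORD_RE.findall(line))
    pairs.sort(key=itemgetter(0))  # the shuffle sorts by key
    output = {}
    for word, group in groupby(pairs, key=itemgetter(0)):
        calls["reducer"] += 1  # one reducer call per distinct key
        output[word] = sum(count for _, count in group)
    return calls, output


calls, output = run_wordcount(["foo foo quux labs foo bar quux"])
print("mapper calls: %d, reducer calls: %d" % (calls["mapper"], calls["reducer"]))
# mapper calls: 1, reducer calls: 4
```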

#### Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere), using user-defined Counters to count how many times the mapper and reducer are called. What is the value of your user-defined Mapper Counter and Reducer Counter after completing your word count job?


In [25]:
%%writefile complaint_wc_counter.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import csv

cols = 'Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?'.split(',')
 
WORD_RE = re.compile(r"[\w']+")
 
class ComWordFreqCount(MRJob):
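    def mapper(self, _, line):
        self.increment_counter('group', 'Num_mapper_calls', 1)
        # Parse the CSV line and map column names to stripped values.
        row = dict(zip(cols, [a.strip() for a in next(csv.reader([line]))]))
        # Skip the header and blank or malformed lines: Complaint ID must be an integer.
        try:
            int(row['Complaint ID'])
        except (KeyError, ValueError):
            return
        for word in WORD_RE.findall(row.get('Issue', '')):
            yield word.lower(), 1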

    def reducer(self, word, counts):
        self.increment_counter('group', 'Num_reducer_calls', 1)
        yield word, sum(counts)

if __name__ == '__main__':
    ComWordFreqCount.run()


Overwriting complaint_wc_counter.py


In [26]:
from complaint_wc_counter import ComWordFreqCount
mr_job = ComWordFreqCount(args=['Consumer_Complaints.csv'])
with mr_job.make_runner() as runner: 
    runner.run()
    # counters() returns a list with one dict of counter groups per step
    print runner.counters()

[{'group': {'Num_mapper_calls': 312912, 'Num_reducer_calls': 174}}]


For the word count with no combiner, the mapper is called 312,912 times (once per input line) and the reducer is called 174 times (once per distinct word in the Issue column).

#### Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount, using user-defined Counters to count how many times the mapper, combiner, and reducer are called. What is the value of your user-defined Mapper Counter and Reducer Counter after completing your word count job?

In [67]:
%%writefile complaintcombiner_wc_counter.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import csv

cols = 'Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?'.split(',')
 
WORD_RE = re.compile(r"[\w']+")
 
class ComboWordFreqCount(MRJob):
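    def mapper(self, _, line):
        self.increment_counter('group', 'Num_mapper_calls', 1)
        # Parse the CSV line and map column names to stripped values.
        row = dict(zip(cols, [a.strip() for a in next(csv.reader([line]))]))
        # Skip the header and blank or malformed lines: Complaint ID must be an integer.
        try:
            int(row['Complaint ID'])
        except (KeyError, ValueError):
            return
        for word in WORD_RE.findall(row.get('Issue', '')):
            yield word.lower(), 1
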
    def combiner(self, word, counts):
        self.increment_counter('group', 'Num_combiner_calls', 1)
        yield word, sum(counts)


    def reducer(self, word, counts):
        self.increment_counter('group', 'Num_reducer_calls', 1)
        yield word, sum(counts)

if __name__ == '__main__':
    ComboWordFreqCount.run()


Writing complaintcombiner_wc_counter.py


In [68]:
from complaintcombiner_wc_counter import ComboWordFreqCount
mr_job = ComboWordFreqCount(args=['Consumer_Complaints.csv'])
with mr_job.make_runner() as runner: 
    runner.run()
    # counters() returns a list with one dict of counter groups per step
    print runner.counters()

[{'group': {'Num_combiner_calls': 324, 'Num_mapper_calls': 312912, 'Num_reducer_calls': 174}}]


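The mapper and reducer counts stay the same: the mapper is still called once per input line (312,912) and the reducer once per distinct word (174). The combiner was called 324 times, once per distinct word in each chunk of map output it processed. What the combiner changes is the number of records the reducer receives: it gets pre-summed counts instead of one (word, 1) record per occurrence. That reduces the data shuffled to the reducer and should make the job faster on a cluster, although these call counters do not measure the saving directly.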
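A rough plain-Python simulation of that saving, counting the records that reach the reducer with and without a combiner. The Issue strings are made up and the split into two map tasks is an assumption; real Hadoop may also apply the combiner per spill rather than once per task.

```python
from collections import Counter

# Made-up Issue strings, split across two hypothetical map tasks.
LINE_A = "loan modification collection foreclosure"
LINE_B = "loan servicing payments escrow account"
splits = [[LINE_A, LINE_A, LINE_A], [LINE_A, LINE_A, LINE_B]]


def map_words(lines):
    # Mapper: one (word, 1) record per word.
    return [(word, 1) for line in lines for word in line.split()]


def sum_by_word(records):
    # Used both as the combiner (per map task) and as the reducer (overall).
    totals = Counter()
    for word, count in records:
        totals[word] += count
    return totals


raw = [map_words(split) for split in splits]
combined = [list(sum_by_word(out).items()) for out in raw]

records_without = sum(len(out) for out in raw)       # 12 + 13 = 25
records_with = sum(len(out) for out in combined)     # 4 + 8 = 12
print("records sent to the reducer: %d without a combiner, %d with one"
      % (records_without, records_with))

# The final counts are identical either way.
final_without = sum_by_word(rec for out in raw for rec in out)
final_with = sum_by_word(rec for out in combined for rec in out)
print(final_without == final_with)  # True
```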

## HW2.2.1

#### Using a single reducer perform a sort of the words in decreasing order of word counts. Present the top 50 terms and their frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). 

In [17]:
%%writefile topten_wc_counter.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import csv

cols = 'Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?'.split(',')
 
WORD_RE = re.compile(r"[\w']+")
 
class toptenWordFreqCount(MRJob):
    
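    def wcmapper(self, _, line):
        self.increment_counter('group', 'Num_mapper_calls', 1)
        # Parse the CSV line and map column names to stripped values.
        row = dict(zip(cols, [a.strip() for a in next(csv.reader([line]))]))
        # Skip the header and blank or malformed lines: Complaint ID must be an integer.
        try:
            int(row['Complaint ID'])
        except (KeyError, ValueError):
            return
        for word in WORD_RE.findall(row.get('Issue', '')):
            yield word.lower(), 1
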
    def wccombiner(self, word, counts):
        self.increment_counter('group', 'Num_combiner_calls', 1)
        yield word, sum(counts)


    def wcreducer(self, word, counts):
        self.increment_counter('group', 'Num_reducer_calls', 1)
        yield word, sum(counts)

        
    
    def ttmapper(self, key, value):
        self.increment_counter('group', 'Num_ttmapper_calls', 1)
        yield None, (value, key)
        
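    def ttreducer(self, key, values):
        # Every record has the key None, so this runs once and sees all (count, word) pairs.
        self.increment_counter('group', 'Num_ttreducer_calls', 1)
        # Sort once, after all values are collected (ascending count, ties by word);
        # sorting inside the loop re-sorted the whole list for every value.
        ranked = sorted(values)
        topfifty = ranked[-50:]
        bottomten = ranked[:10]

        for count, word in topfifty:
            yield None, (count, word)

        for count, word in bottomten:
            yield None, (count, word)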

        
    def steps(self):
        return [
            MRStep(mapper=self.wcmapper,
                   combiner=self.wccombiner,
                   reducer=self.wcreducer),
            MRStep(mapper=self.ttmapper,
                   reducer=self.ttreducer) ]

    
    """def jobconf(self):
        orig_jobconf = super(toptenWordFreqCount, self).jobconf()        
        custom_jobconf = {
            'mapred.reduce.tasks': '1',
        }
        combined_jobconf = orig_jobconf
        combined_jobconf.update(custom_jobconf)
        self.jobconf = combined_jobconf
        return combined_jobconf """

if __name__ == '__main__':
    toptenWordFreqCount.run()

Overwriting topten_wc_counter.py


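Jimi, as noted at the end of the assignment, I did not use the MRJob sort. It would have meant converting the counts to strings and padding them with leading zeros before yielding them, whereas sorting the (count, word) pairs with `.sort()` inside the reducer is simpler. This works here because the second step sends every record to a single reducer under the key `None`, and there are only 174 distinct words, so they all fit in memory.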
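For reference, the ordering the question asks for (decreasing count, ties broken alphabetically), plus the bottom 10, can be produced by sorting once on a compound key, with no string padding. A sketch on a few (count, word) pairs taken from the job's output; `rank` is a hypothetical helper, not part of the job above, and it still assumes all pairs reach one reducer.

```python
def rank(pairs, top_n, bottom_n):
    """pairs: iterable of (count, word). Returns (top, bottom) lists."""
    pairs = list(pairs)
    # Most frequent first; equal counts ordered alphabetically by word.
    top = sorted(pairs, key=lambda cw: (-cw[0], cw[1]))[:top_n]
    # Least frequent first; equal counts ordered alphabetically by word.
    bottom = sorted(pairs, key=lambda cw: (cw[0], cw[1]))[:bottom_n]
    return top, bottom


pairs = [(5663, "low"), (5663, "being"), (6248, "the"),
         (39993, "payments"), (36767, "servicing"), (36767, "escrow")]
top, bottom = rank(pairs, top_n=3, bottom_n=2)
print(top)     # [(39993, 'payments'), (36767, 'escrow'), (36767, 'servicing')]
print(bottom)  # [(5663, 'being'), (5663, 'low')]
```

Negating the count in the key gives a descending numeric order while leaving the word ascending, which is the tie-break the question asks for.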

In [22]:
!python topten_wc_counter.py --jobconf mapred.reduce.tasks=1 Consumer_Complaints.csv 

null	[5663, "being"]
null	[5663, "by"]
null	[5663, "caused"]
null	[5663, "funds"]
null	[5663, "low"]
null	[6248, "the"]
null	[6337, "lease"]
null	[6559, "reporting"]
null	[6938, "disputes"]
null	[7655, "disclosure"]
null	[7655, "verification"]
null	[7886, "other"]
null	[8158, "billing"]
null	[8178, "unable"]
null	[8401, "to"]
null	[8625, "broker"]
null	[8625, "mortgage"]
null	[8625, "originator"]
null	[8671, "communication"]
null	[8671, "tactics"]
null	[8868, "application"]
null	[9484, "problems"]
null	[10555, "deposits"]
null	[10555, "withdrawals"]
null	[10731, "my"]
null	[13983, "of"]
null	[16205, "management"]
null	[16205, "opening"]
null	[16448, "and"]
null	[17972, "attempts"]
null	[17972, "collect"]
null	[17972, "cont'd"]
null	[17972, "owed"]
null	[18477, "not"]
null	[19000, "closing"]
null	[27874, "debt"]
null	[29069, "information"]
null	[29069, "on"]
null	[29133, "incorrect"]
null	[34903, "report"]
null	[36767, "escrow"]
null	[36767, "servicing"]
null	[39993, "payments"]
null	[4

No configs found; falling back on auto-configuration
Creating temp directory c:\users\z030757\appdata\local\temp\topten_wc_counter.z030757.20160705.154946.542000
Running step 1 of 2...
Counters: 2
	group
		Num_combiner_calls=324
		Num_mapper_calls=312912
Counters: 3
	group
		Num_combiner_calls=324
		Num_mapper_calls=312912
		Num_reducer_calls=174
Running step 2 of 2...
Counters: 1
	group
		Num_ttmapper_calls=174
Counters: 2
	group
		Num_ttmapper_calls=174
		Num_ttreducer_calls=1
Streaming final output from c:\users\z030757\appdata\local\temp\topten_wc_counter.z030757.20160705.154946.542000\output...
Removing temp directory c:\users\z030757\appdata\local\temp\topten_wc_counter.z030757.20160705.154946.542000...


## HW2.2.2

#### Repeat HW2.2.1 using 3 reducers. Use the same code as in HW2.2.1 with just one modification to the command line: add --jobconf mapred.reduce.tasks=3, as shown here.
#### Describe what you see. Is this correct?

In [21]:
!python topten_wc_counter.py --jobconf mapred.reduce.tasks=3 Consumer_Complaints.csv

null	[5663, "being"]
null	[5663, "by"]
null	[5663, "caused"]
null	[5663, "funds"]
null	[5663, "low"]
null	[6248, "the"]
null	[6337, "lease"]
null	[6559, "reporting"]
null	[6938, "disputes"]
null	[7655, "disclosure"]
null	[7655, "verification"]
null	[7886, "other"]
null	[8158, "billing"]
null	[8178, "unable"]
null	[8401, "to"]
null	[8625, "broker"]
null	[8625, "mortgage"]
null	[8625, "originator"]
null	[8671, "communication"]
null	[8671, "tactics"]
null	[8868, "application"]
null	[9484, "problems"]
null	[10555, "deposits"]
null	[10555, "withdrawals"]
null	[10731, "my"]
null	[13983, "of"]
null	[16205, "management"]
null	[16205, "opening"]
null	[16448, "and"]
null	[17972, "attempts"]
null	[17972, "collect"]
null	[17972, "cont'd"]
null	[17972, "owed"]
null	[18477, "not"]
null	[19000, "closing"]
null	[27874, "debt"]
null	[29069, "information"]
null	[29069, "on"]
null	[29133, "incorrect"]
null	[34903, "report"]
null	[36767, "escrow"]
null	[36767, "servicing"]
null	[39993, "payments"]
null	[4

No configs found; falling back on auto-configuration
Creating temp directory c:\users\z030757\appdata\local\temp\topten_wc_counter.z030757.20160705.135237.531000
Running step 1 of 2...
Counters: 2
	group
		Num_combiner_calls=324
		Num_mapper_calls=312912
Counters: 3
	group
		Num_combiner_calls=324
		Num_mapper_calls=312912
		Num_reducer_calls=174
Running step 2 of 2...
Counters: 1
	group
		Num_ttmapper_calls=174
Counters: 2
	group
		Num_ttmapper_calls=174
		Num_ttreducer_calls=1
Streaming final output from c:\users\z030757\appdata\local\temp\topten_wc_counter.z030757.20160705.135237.531000\output...
Removing temp directory c:\users\z030757\appdata\local\temp\topten_wc_counter.z030757.20160705.135237.531000...


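The output is identical to the single-reducer run, and it is still correct. Assuming the runner honors `mapred.reduce.tasks=3`, step 1 splits the words across 3 reducers, but each word still goes to exactly one reducer, so the counts do not change. In step 2 the mapper gives every record the same key (`None`), so all records land in one partition: a single reduce task receives every (count, word) pair and the other two receive nothing. `Num_ttreducer_calls` stays at 1 because the counter counts calls to the reducer function (one per distinct key), not reduce tasks. Funneling everything through one key is what keeps the sort global; if the records were spread over 3 reducers, each output file would only be sorted within itself.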
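A small simulation of what would happen without the single key. Suppose, hypothetically, that step 2 keyed its records by count and three reducers each sorted their own partition; the partition rule `count % 3` is an assumption for illustration, not Hadoop's actual hash. Each part file comes out sorted, but their concatenation does not, and a k-way merge of the part files is needed to restore the global order.

```python
import heapq

# (count, word) pairs taken from the job output above.
pairs = [(5663, "low"), (6248, "the"), (6337, "lease"), (6559, "reporting"),
         (6938, "disputes"), (8401, "to"), (34903, "report"),
         (36767, "escrow"), (39993, "payments")]
NUM_REDUCERS = 3

# Hypothetical partitioner: route each record by its count modulo the reducer count.
partitions = [[] for _ in range(NUM_REDUCERS)]
for count, word in pairs:
    partitions[count % NUM_REDUCERS].append((count, word))

# Each reducer sorts only its own records (decreasing count).
part_files = [sorted(p, reverse=True) for p in partitions]
concatenated = [rec for part in part_files for rec in part]
globally_sorted = sorted(pairs, reverse=True)
print(concatenated == globally_sorted)  # False: part files are only locally sorted

# Merging the locally sorted part files restores the global order.
# Counts are negated so that heapq.merge's ascending order means decreasing count.
merged = list(heapq.merge(*[[(-c, w) for c, w in part] for part in part_files]))
print([(-c, w) for c, w in merged] == globally_sorted)  # True
```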

## HW2.3: Shopping Cart Analysis

#### Do some exploratory data analysis of this dataset guided by the following questions:

#### How many unique items are available from this supplier?

In [132]:
%%writefile unique_items.py
from mrjob.job import MRJob
from mrjob.step import MRStep

class UniqueItems(MRJob):
    def wcmapper(self, _, line):        
        self.increment_counter('group', 'Num_mapper_calls', 1)
        words = line.strip().split(" ")
        for word in words:
            yield word, 1
    
    def wccombiner(self, word, counts):
        self.increment_counter('group', 'Num_combiner_calls', 1)
        yield word, sum(counts)        
        
    def wcreducer(self, word, counts):
        self.increment_counter('group', 'Num_reducer_calls', 1)
        yield word, sum(counts)
    
    def ttmapper(self, key, value):
        self.increment_counter('group', 'Num_ttmapper_calls', 1)
        yield None, (value, key)
        
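    def ttreducer(self, key, values):
        # Every record shares the key None, so this single call sees every unique item.
        # (Num_reducer_calls from step 1 already equals the number of unique items.)
        self.increment_counter('group', 'Num_ttreducer_calls', 1)
        # Count the items directly; sorting them is unnecessary.
        num_unique = sum(1 for _ in values)
        # Printing from a reducer bypasses mrjob's output protocol; yielding the
        # count would be cleaner on a real cluster.
        print num_unique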

        
    def steps(self):
        return [
            MRStep(mapper=self.wcmapper,
                   combiner=self.wccombiner,
                   reducer=self.wcreducer),
            MRStep(mapper=self.ttmapper,
                   reducer=self.ttreducer) ]

if __name__ == '__main__':
    UniqueItems.run()



Overwriting unique_items.py


In [133]:
!python unique_items.py ProductPurchaseData.txt

12592


No configs found; falling back on auto-configuration
Creating temp directory c:\users\z030757\appdata\local\temp\unique_items.z030757.20160705.185234.054000
Running step 1 of 2...
Counters: 2
	group
		Num_combiner_calls=17745
		Num_mapper_calls=31101
Counters: 3
	group
		Num_combiner_calls=17745
		Num_mapper_calls=31101
		Num_reducer_calls=12592
Running step 2 of 2...
Counters: 1
	group
		Num_ttmapper_calls=12592
Counters: 2
	group
		Num_ttmapper_calls=12592
		Num_ttreducer_calls=1
Streaming final output from c:\users\z030757\appdata\local\temp\unique_items.z030757.20160705.185234.054000\output...
Removing temp directory c:\users\z030757\appdata\local\temp\unique_items.z030757.20160705.185234.054000...


#### 2.3.1 OPTIONAL Using 2 reducers: Report your findings such as number of unique products; largest basket; the top 50 most frequently purchased items and their frequency (break ties by sorting the products in alphabetical order), etc., using Hadoop Map-Reduce.


In [145]:
%%writefile unique_items_stats.py
from mrjob.job import MRJob
from mrjob.step import MRStep

class UniqueItems(MRJob):
    
    def __init__(self, *args, **kwargs):
        super(UniqueItems, self).__init__(*args, **kwargs)
        self.modelStats = {}
    
    def mapper(self, _, line):
        self.increment_counter('group', 'Num_mapper_calls', 1)
        items = line.strip().split(" ")
        # Basket size (number of items on the line) for the largest-basket statistic.
        yield "basket", len(items)
        # One record per item for the unique-item and top-50 statistics.
        for item in items:
            yield "wc", (item, 1)

    def reducer(self, key, values):
        # Only two keys exist ("basket" and "wc"), so this is called twice.
        self.increment_counter('group', 'Num_reducer_calls', 1)
        if key == "basket":
            print "Maximum basket size is: ", max(values)
        else:
            for word, count in values:
                # Add each item's count to its running total.
                self.modelStats[word] = self.modelStats.get(word, 0) + count
            unique_words = len(self.modelStats)
            print "There are ", unique_words, "unique items in the dataset"
            print "The top 50 items are:"
            # Sort once after counting: ascending by count, ties by item ID.
            top_items = sorted((c, w) for w, c in self.modelStats.items())
            for count, word in top_items[-50:]:
                yield word, count

    
    
if __name__ == '__main__':
    UniqueItems.run()



Overwriting unique_items_stats.py


In [146]:
!python unique_items_stats.py ProductPurchaseData.txt

Maximum basket size is:  37
There are  12592 unique items in the dataset
The top 50 items are:
"GRO85051"	1214
"DAI22896"	1219
"GRO81087"	1220
"DAI31081"	1261
"GRO15017"	1275
"ELE91337"	1289
"DAI43223"	1290
"SNA96271"	1295
"ELE59935"	1311
"DAI88807"	1316
"ELE74482"	1316
"GRO61133"	1321
"ELE56788"	1345
"GRO38814"	1352
"SNA90094"	1390
"SNA93860"	1407
"FRO53271"	1420
"FRO35904"	1436
"ELE34057"	1489
"GRO94758"	1489
"ELE99737"	1516
"FRO78087"	1531
"DAI22177"	1627
"SNA55762"	1646
"ELE66810"	1697
"FRO32293"	1702
"DAI83733"	1712
"ELE66600"	1713
"GRO46854"	1756
"DAI63921"	1773
"GRO56726"	1784
"ELE74009"	1816
"GRO30386"	1840
"FRO85978"	1918
"GRO71621"	1920
"GRO59710"	2004
"SNA99873"	2083
"GRO21487"	2115
"FRO80039"	2233
"ELE26917"	2292
"DAI85309"	2293
"FRO31317"	2330
"SNA45677"	2455
"DAI75645"	2736
"ELE32164"	2851
"SNA80324"	3044
"GRO73461"	3602
"ELE17451"	3875
"FRO40251"	3881
"DAI62779"	6667


No configs found; falling back on auto-configuration
Creating temp directory c:\users\z030757\appdata\local\temp\unique_items_stats.z030757.20160705.185832.238000
Running step 1 of 1...
Counters: 1
	group
		Num_mapper_calls=31101
Counters: 2
	group
		Num_mapper_calls=31101
		Num_reducer_calls=2
Streaming final output from c:\users\z030757\appdata\local\temp\unique_items_stats.z030757.20160705.185832.238000\output...
Removing temp directory c:\users\z030757\appdata\local\temp\unique_items_stats.z030757.20160705.185832.238000...


## HW2.4. (Computationally prohibitive but then again Hadoop can handle this) Pairs

#### List the top 50 product pairs with corresponding support count (aka frequency) and relative frequency or support (the number of records where the two items co-occur, and that number divided by the number of baskets in the dataset), in decreasing order of support, for frequent (count >= 100) itemsets of size 2.

#### Please output records of the following form for the top 50 pairs (itemsets of size 2): item1, item2, support count, support
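As a small illustration of what the pairs job computes, here is an in-memory version over made-up baskets; the item IDs and the threshold of 2 are assumptions so the toy data produces output (the assignment uses 100). Each basket contributes 1 to every unordered pair of distinct items it contains, mirroring the mapper's `itertools.combinations(sorted(set(item)), 2)`, and support is the pair count divided by the number of baskets.

```python
from collections import Counter
from itertools import combinations

baskets = [
    "DAI1 ELE2 FRO3",
    "DAI1 ELE2",
    "ELE2 FRO3 DAI1 DAI1",  # a repeated item counts once per basket
    "GRO4",
]
MIN_COUNT = 2  # the assignment's threshold is 100; 2 suits this toy data

pair_counts = Counter()
for line in baskets:
    items = sorted(set(line.split()))
    for a, b in combinations(items, 2):  # "pairs" pattern: one record per pair
        pair_counts[(a, b)] += 1

num_baskets = len(baskets)
frequent = [(a, b, c, float(c) / num_baskets)
            for (a, b), c in pair_counts.items() if c >= MIN_COUNT]
# Decreasing support count, ties broken alphabetically by the pair.
frequent.sort(key=lambda rec: (-rec[2], rec[0], rec[1]))
for rec in frequent:
    print("%s, %s, %d, %.2f" % rec)
```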

In [28]:
%%writefile item_pairs_test.py
from mrjob.job import MRJob
from mrjob.step import MRStep
from operator import itemgetter, attrgetter, methodcaller
import itertools

#import timeit

#start = timeit.default_timer()

class ItemPairs(MRJob):

    def __init__(self, *args, **kwargs):
        super(ItemPairs, self).__init__(*args, **kwargs)
        self.modelStats = {}    
    
    
    def wcmapper(self, _, line):
        self.increment_counter('group', 'Num_mapper_calls', 1)
        basket = line.strip()
        item = basket.split(" ")
        # Each line is one basket; count every basket for the support denominator.
        yield "basket_count", 1
        for subset in itertools.combinations(sorted(set(item)), 2):
            yield (subset[0]+"-"+subset[1]),1


    def wccombiner(self, key, values):
        self.increment_counter('group', 'Num_combiner_calls', 1)
        yield key, sum(values)
                
    def wcreducer(self, key, values):
        self.increment_counter('group', 'Num_reducer_calls', 1)
        yield None, (sum(values), key)

          
    def ttreducer(self, key, values):
        # All records arrive under the key None, so one call sees every pair.
        self.increment_counter('group', 'Num_ttreducer_calls', 1)
        bc = 0
        frequent = []
        for count, word in values:
            if word == "basket_count":
                bc = count
            elif count >= 100:
                frequent.append((count, word))
        # Sort once: decreasing count, ties broken alphabetically by pair.
        frequent.sort(key=lambda cw: (-cw[0], cw[1]))
        for count, word in frequent[:50]:
            print word, count, (count * 1.0) / bc
    
    def steps(self):
        return [
            MRStep(mapper=self.wcmapper,
                   combiner=self.wccombiner,
                   reducer=self.wcreducer),
            MRStep(#mapper=self.ttmapper,
                   reducer=self.ttreducer) ]        
    
if __name__ == '__main__':
    ItemPairs.run()   
    
#stop = timeit.default_timer()

#print "runtime is ", stop - start, " seconds on a single computer running Windows"

Overwriting item_pairs_test.py


In [None]:
!python item_pairs_test.py ProductPurchaseDataTest1.txt

In [29]:
from item_pairs_test import ItemPairs
import timeit

start = timeit.default_timer()
mr_job = ItemPairs(args=['ProductPurchaseData.txt'])
with mr_job.make_runner() as runner: 
    runner.run()
    # counters() returns a list with one dict of counter groups per step
    print runner.counters()

stop = timeit.default_timer()

print "runtime is ", stop - start, " seconds on a single computer running Windows"  

DAI62779-ELE17451 1592 0.0511880646925
FRO40251-SNA80324 1412 0.0454004694383
DAI75645-FRO40251 1254 0.0403202469374
FRO40251-GRO85051 1213 0.0390019613517
DAI62779-GRO73461 1139 0.0366226166361
DAI75645-SNA80324 1130 0.0363332368734
DAI62779-FRO40251 1070 0.0344040384554
DAI62779-SNA80324 923 0.0296775023311
DAI62779-DAI85309 918 0.0295167357963
ELE32164-GRO59710 911 0.0292916626475
DAI62779-DAI75645 882 0.0283592167454
FRO40251-GRO73461 882 0.0283592167454
DAI62779-ELE92920 877 0.0281984502106
FRO40251-FRO92469 835 0.026848011318
DAI62779-ELE32164 832 0.0267515513971
DAI75645-GRO73461 712 0.0228931545609
DAI43223-ELE32164 711 0.022861001254
DAI62779-GRO30386 709 0.02279669464
ELE17451-FRO40251 697 0.0224108549564
DAI85309-ELE99737 659 0.0211890292917
DAI62779-ELE26917 650 0.020899649529
GRO21487-GRO73461 631 0.0202887366966
DAI62779-SNA45677 604 0.0194205974084
ELE17451-SNA80324 597 0.0191955242597
DAI62779-GRO71621 595 0.0191312176457
DAI62779-SNA55762 593 0.0190669110318
DAI62779-D

## HW2.5: Stripes

#### Repeat 2.4 using the stripes design pattern for finding co-occurring pairs (and out.
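In the stripes pattern each mapper emits one associative array per item instead of one record per pair: the key is the item, and the value maps each co-occurring item to a count. A minimal in-memory sketch using the same "lexicographically smaller item is the key" convention as the mapper below; the basket is made up and `stripes_for_basket` is a hypothetical helper.

```python
from itertools import combinations


def stripes_for_basket(line):
    """Return {item: {later_item: 1, ...}} for one basket (each pair once)."""
    stripes = {}
    for a, b in combinations(sorted(set(line.split())), 2):
        stripes.setdefault(a, {})[b] = 1
    return stripes


stripes = stripes_for_basket("FRO3 DAI1 ELE2")
print(sorted(stripes.items()))
```

Here three pair records collapse into two stripes (one for `DAI1`, one for `ELE2`), so fewer but larger records are shuffled.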

In [213]:
%%writefile item_stripes_test2.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import itertools
from operator import itemgetter, attrgetter, methodcaller
from collections import defaultdict


#TEST



class ItemStripes(MRJob):

    def __init__(self, *args, **kwargs):
        super(ItemStripes, self).__init__(*args, **kwargs)
     


    
    def wcmapper(self, _, line):
        self.increment_counter('group', 'Num_mapper_calls', 1)
        comboset = defaultdict(dict)
        basket = line.strip()
        item = basket.split(" ")
        # Each line is one basket; count every basket for the support denominator.
        yield "basket_count", 1
        for subset in itertools.combinations(sorted(set(item)), 2):
            comboset[subset[0]][subset[1]] = 1
        for k in comboset.keys():
            yield k, comboset[k]
            

    """def wccombiner(self, key, values):
        self.increment_counter('group', 'Num_combiner_calls', 1)
        test = defaultdict(dict)
        if key =="basket_count":
            yield key, sum(values)
        else:    
            test[key] = test.get(key, {})
        
        for value in values:
                for k, v in value.iteritems():
                    if k not in test[key]:
                        test[key][k] = v    
                    else:
                        test[key][k] = test[key][k] + v 

        for k in test.keys():
            yield k, test[k]"""
            
            
    def wcreducer(self, key, values):
        self.increment_counter('group', 'Num_reducer_calls', 1)
        if key == "basket_count":
            yield None, (sum(values), key)
            return
        # Element-wise sum of all stripes for this item.
        stripe = {}
        for value in values:
            for k, v in value.items():
                stripe[k] = stripe.get(k, 0) + v
        # Keep only frequent pairs (support count >= 100). The stripe is complete
        # here because every stripe for this key reaches the same reducer.
        for other, count in stripe.items():
            if count >= 100:
                yield None, (count, str(key + "-" + other))

    def ttreducer(self, key, values):
        # All records share the key None, so one call sees every frequent pair.
        self.increment_counter('group', 'Num_ttreducer_calls', 1)
        bc = 0
        frequent = []
        for count, word in values:
            if word == "basket_count":
                bc = count
            else:
                frequent.append((count, word))
        # Sort once: decreasing count, ties broken alphabetically by pair.
        frequent.sort(key=lambda cw: (-cw[0], cw[1]))
        for count, word in frequent[:50]:
            print word, count, (count * 1.0) / bc
    
    def steps(self):
        return [
            MRStep(mapper=self.wcmapper,
                   #combiner=self.wccombiner,
                   reducer=self.wcreducer),
            MRStep(#mapper=self.ttmapper,
                   reducer=self.ttreducer) ]                   
    
if __name__ == '__main__':
    ItemStripes.run()   
    


Overwriting item_stripes_test2.py


In [215]:
from item_stripes_test2 import ItemStripes
import timeit

start = timeit.default_timer()
mr_job = ItemStripes(args=['ProductPurchaseData.txt'])
with mr_job.make_runner() as runner: 
    runner.run()
    # counters() returns a list with one dict of counter groups per step
    print runner.counters()

stop = timeit.default_timer()

print "runtime is ", stop - start, " seconds on a single computer running Windows"  

DAI62779-ELE17451 1592 0.0511880646925
FRO40251-SNA80324 1412 0.0454004694383
DAI75645-FRO40251 1254 0.0403202469374
FRO40251-GRO85051 1213 0.0390019613517
DAI62779-GRO73461 1139 0.0366226166361
DAI75645-SNA80324 1130 0.0363332368734
DAI62779-FRO40251 1070 0.0344040384554
DAI62779-SNA80324 923 0.0296775023311
DAI62779-DAI85309 918 0.0295167357963
ELE32164-GRO59710 911 0.0292916626475
DAI62779-DAI75645 882 0.0283592167454
FRO40251-GRO73461 882 0.0283592167454
DAI62779-ELE92920 877 0.0281984502106
FRO40251-FRO92469 835 0.026848011318
DAI62779-ELE32164 832 0.0267515513971
DAI75645-GRO73461 712 0.0228931545609
DAI43223-ELE32164 711 0.022861001254
DAI62779-GRO30386 709 0.02279669464
ELE17451-FRO40251 697 0.0224108549564
DAI85309-ELE99737 659 0.0211890292917
DAI62779-ELE26917 650 0.020899649529
GRO21487-GRO73461 631 0.0202887366966
DAI62779-SNA45677 604 0.0194205974084
ELE17451-SNA80324 597 0.0191955242597
DAI62779-GRO71621 595 0.0191312176457
DAI62779-SNA55762 593 0.0190669110318
DAI62779-D

Jimi, I got the Stripes code working, as the output above shows. Interestingly, it fails when I include the combiner in the first MRStep, even though a combiner should reduce the amount of data streamed to the reducer. I believe the issue is a lack of memory on my PC, although I have not confirmed it.
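Merging stripes is an element-wise sum, which is associative and commutative, so a stripes combiner is valid in principle as long as it emits (item, stripe) pairs of the same shape as the mapper and also sums the "basket_count" records. A sketch with `collections.Counter` on made-up stripes for one key; `merge_stripes` is a hypothetical helper. Note that each merged stripe is held in memory, so its size grows with the number of distinct items that co-occur with the key.

```python
from collections import Counter


def merge_stripes(stripes):
    """Element-wise sum of stripes (dicts of item -> count) for one key."""
    total = Counter()
    for stripe in stripes:
        total.update(stripe)  # Counter.update adds counts instead of replacing them
    return dict(total)


# Stripes for the key "DAI1" arriving from three baskets.
incoming = [{"ELE2": 1, "FRO3": 1}, {"ELE2": 1}, {"ELE2": 1, "FRO3": 1}]
partial = merge_stripes(incoming[:2])           # what a combiner might emit
final = merge_stripes([partial, incoming[2]])   # what the reducer then computes
print(sorted(final.items()))  # [('ELE2', 3), ('FRO3', 2)]
```

Combining part of the stripes first gives the same final stripe as merging them all at once, which is the property a combiner needs.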

#### Per my e-mail, here is the code for 2.5 using MRJob's SORT_VALUES. It doesn't sort numbers correctly unless they all have the same length (e.g. you add code to turn 10 into 0010 or 100 into 0100), because the values are compared as strings, not as numbers.
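The behavior described above is lexicographic comparison: character by character, `"100"` sorts before `"99"`. A short plain-Python illustration with arbitrary numbers: zero-padding to a fixed width restores numeric order, and padding the complement against an assumed maximum (`MAX_COUNT`, a hypothetical bound) gives a descending order with the same trick.

```python
counts = [8, 99, 100, 1592]

as_strings = sorted(str(c) for c in counts)
print(as_strings)  # ['100', '1592', '8', '99'] (lexicographic, not numeric)

WIDTH = 6
padded = sorted("%0*d" % (WIDTH, c) for c in counts)
print(padded)  # ['000008', '000099', '000100', '001592']

# Descending order via padding: sort the complement against an assumed maximum.
MAX_COUNT = 10 ** WIDTH - 1
desc = sorted("%0*d" % (WIDTH, MAX_COUNT - c) for c in counts)
print([MAX_COUNT - int(s) for s in desc])  # [1592, 100, 99, 8]
```

The padding width must be at least the number of digits in the largest count, otherwise the trick silently breaks.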

In [40]:
%%writefile item_stripes_test3.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import itertools
from collections import defaultdict


class ItemStripes(MRJob):

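    # Secondary sort: each reducer receives its values sorted by their
    # encoded (JSON) text, which is string order rather than numeric order.
    SORT_VALUES = True

    # Custom key comparator for Hadoop Streaming. These settings only matter
    # on a real Hadoop cluster; the local run below does not appear to apply them.
    JOBCONF = {
        'mapred.output.key.comparator.class':
            'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
        'mapred.text.key.comparator.options': '-k1 -k2nr',
    }

    def __init__(self, *args, **kwargs):
        super(ItemStripes, self).__init__(*args, **kwargs)
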
    def wcmapper(self, _, line):
        self.increment_counter('group', 'Num_mapper_calls', 1)
        comboset = defaultdict(dict)
        basket_count = 0
        basket = line.strip()
        item = basket.split(" ")
        # Count only baskets that can produce at least one pair
        if len(item) > 1:
            basket_count += 1
        yield "basket_count", basket_count
        # Build one stripe per item: {neighbor: 1} for each item that sorts after it
        for subset in itertools.combinations(sorted(set(item)), 2):
            comboset[subset[0]][subset[1]] = 1
        for k in comboset.keys():
            yield k, comboset[k]
            

    """def wccombiner(self, key, values):
        self.increment_counter('group', 'Num_combiner_calls', 1)
        test = defaultdict(dict)
        if key =="basket_count":
            yield key, sum(values)
        else:    
            test[key] = test.get(key, {})
        
        for value in values:
                for k, v in value.iteritems():
                    if k not in test[key]:
                        test[key][k] = v    
                    else:
                        test[key][k] = test[key][k] + v 

        for k in test.keys():
            yield k, test[k]"""
            
            
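    def wcreducer(self, key, values):
        self.increment_counter('group', 'Num_reducer_calls', 1)
        test = defaultdict(dict)
        if key == "basket_count":
            # Total number of baskets; sum() consumes the values iterator,
            # so the merge loop below does nothing for this key
            yield None, (sum(values), key)
        else:
            test[key] = test.get(key, {})

        # Merge the stripes for this item by summing the counts per neighbor
        for value in values:
            for k, v in value.iteritems():
                if k not in test[key]:
                    test[key][k] = v
                else:
                    test[key][k] = test[key][k] + v

        # Drop pairs below the support threshold of 100
        # (Python 2: keys() returns a list, so deleting while looping is safe)
        for k in test.keys():
            for i in test[k].keys():
                if test[k][i] < 100:
                    del test[k][i]

        # Remove items whose stripe is now empty
        for k in test.keys():
            if test[k] == {}:
                del test[k]

        # Emit (count, "itemA-itemB") under a single key so one reducer sees every pair
        for k in test.keys():
            for i in test[k].keys():
                yield None, (int(test[k][i]), str(k + "-" + i))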
                
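    def ttmapper(self, key, values):
        # Identity mapper: pass each (count, pair) value through under the None key
        yield None, values

    def ttreducer(self, key, values):
        # Print instead of yield; with SORT_VALUES the values arrive in encoded-string order
        for value in values:
            print key, value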
        """bc = 0
        topten = []
        self.increment_counter('group', 'Num_ttreducer_calls', 1)
        for value in values:
            if value == "basket_count":
                bc = key
            else:
                topten.append((key, value))
        
                #topfifty = topten[:50]
                #topfifty.sort(key=itemgetter(1))
                #topfifty.sort(key=itemgetter(0), reverse=True)
                
        print topten
        #print bc
        #print self.modelStats
        #for count, word in topfifty:
            #print word, count, ((count*1.0)/(bc*1.0))"""  
    
    def steps(self):
        return [
            MRStep(mapper=self.wcmapper,
                   #combiner=self.wccombiner,
                   reducer=self.wcreducer),
            MRStep(mapper=self.ttmapper,
                   reducer=self.ttreducer) ]                   
    
if __name__ == '__main__':
    ItemStripes.run()   
    


Overwriting item_stripes_test3.py
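To see what `wcmapper` emits for a single basket, its stripe-building loop can be run on its own in plain Python. The function name `basket_to_stripes` and the sample basket are hypothetical; the logic (deduplicate, sort, take each unordered pair once, and key the stripe by the alphabetically smaller item) follows the mapper in `item_stripes_test3.py`. Note that the alphabetically last item in a basket never gets a stripe of its own.

```python
import itertools
from collections import defaultdict


def basket_to_stripes(line):
    """Build the stripes one basket produces, following wcmapper's logic (sketch)."""
    items = sorted(set(line.strip().split(" ")))
    stripes = defaultdict(dict)
    # combinations() over a sorted list yields each unordered pair once, with a < b
    for a, b in itertools.combinations(items, 2):
        stripes[a][b] = 1
    return dict(stripes)


# Hypothetical basket; DAI62779 appears twice, but set() counts it once
result = basket_to_stripes("FRO40251 DAI62779 ELE17451 DAI62779")
print(result)
```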


In [41]:
!python item_stripes_test3.py ProductPurchaseData.txt

None [100, 'DAI23334-ELE17451']
None [100, 'DAI35347-DAI85309']
None [100, 'DAI62779-GRO17075']
None [100, 'DAI63921-ELE11160']
None [100, 'DAI85309-ELE14480']
None [100, 'ELE11111-GRO59710']
None [100, 'ELE14480-SNA80324']
None [100, 'ELE17451-ELE37770']
None [100, 'ELE17451-SNA59061']
None [100, 'ELE32164-GRO35122']
None [100, 'ELE66600-FRO98729']
None [100, 'ELE86561-SNA45677']
None [100, 'FRO31317-GRO81087']
None [100, 'FRO40251-GRO50832']
None [100, 'FRO40251-GRO56989']
None [100, 'FRO78087-GRO30386']
None [100, 'FRO78087-GRO94758']
None [100, 'FRO80039-GRO64900']
None [100, 'GRO38814-SNA93860']
None [100, 'GRO46854-SNA66583']
None [100, 'GRO59710-SNA93860']
None [100, 'GRO64900-SNA45677']
None [100, 'GRO73461-GRO88511']
None [101, 'DAI22177-ELE66600']
None [101, 'DAI62779-SNA31619']
None [101, 'DAI62779-SNA55617']
None [101, 'DAI62779-SNA74022']
None [101, 'DAI62779-SNA82528']
None [101, 'DAI75645-SNA18336']
None [101, 'DAI87448-ELE91337']
None [101, 'DAI87448-GRO21487']
None [10

No configs found; falling back on auto-configuration
ignoring partitioner keyword arg (requires real Hadoop): 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
Creating temp directory c:\users\z030757\appdata\local\temp\item_stripes_test3.z030757.20160708.204902.367000
Running step 1 of 2...
Counters: 1
	group
		Num_mapper_calls=31101
Counters: 2
	group
		Num_mapper_calls=31101
		Num_reducer_calls=12012
Running step 2 of 2...
Streaming final output from c:\users\z030757\appdata\local\temp\item_stripes_test3.z030757.20160708.204902.367000\output...
Removing temp directory c:\users\z030757\appdata\local\temp\item_stripes_test3.z030757.20160708.204902.367000...
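The commented-out part of `ttreducer` aims at a top-50 listing of pair, count, and relative frequency. Because every pair is emitted under the single key `None`, one reducer sees all of them, so the ranking can also be done in memory with an explicit numeric sort key instead of relying on string ordering. A minimal sketch, assuming `num_baskets` is 31101 (the `Num_mapper_calls` counter in the log, i.e. one call per basket line; this value also matches the ratio of count to frequency in the `item_stripes_test2` output). `top_pairs` and the sample list are hypothetical, with counts copied from the outputs in this notebook. The design assumes the thresholded pairs fit in one reducer's memory.

```python
def top_pairs(values, num_baskets, n=50):
    """Rank (count, "itemA-itemB") values by count, highest first (hypothetical helper).

    Ties are broken alphabetically by pair; returns (pair, count, relative frequency).
    """
    ranked = sorted(values, key=lambda v: (-v[0], v[1]))
    return [(pair, count, count / float(num_baskets)) for count, pair in ranked[:n]]


# Values in the shape wcreducer yields; counts copied from the outputs above
sample = [(100, "DAI23334-ELE17451"), (1592, "DAI62779-ELE17451"),
          (101, "DAI22177-ELE66600"), (1412, "FRO40251-SNA80324")]

# Assumption: 31101 baskets (Num_mapper_calls in the run log)
for pair, count, freq in top_pairs(sample, num_baskets=31101, n=3):
    print("%s %d %.6f" % (pair, count, freq))
```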
