# HW2 Supporting Material
James G. Shanahan 6/28/201

## Table of Contents <a name="TOC"></a> 
  
1.  [Introduction](#1)   
2.  [Counters in MRJob](#2)   
3.  [JSON and MRJob Serialization](#3)   
4.  [Calculate Summary Stats](#4) 
    1.  [calculate salary average](#4.1)
    2.  [calculate mean and variance](#4.2)
5.  [Order inversion pattern in MRJob](#5)
6.  [Useful Links for Jupyter Notebooks](#0) 

## 1.  Introduction <a name="1"></a> 
[Back to Table of Contents](#TOC)
    
You can view this notebook via NBViewer [here](http://nbviewer.jupyter.org/urls/dl.dropbox.com/s/pjd6maluq4ogt7m/HW02-Supporting-Material.ipynb#4.1). 
+ http://nbviewer.jupyter.org/urls/dl.dropbox.com/s/pjd6maluq4ogt7m/HW02-Supporting-Material.ipynb#4.1

This notebook provides several examples of key patterns that you will use repeatedly in HW2. Please run and extend these examples to get warmed up before tackling HW2.

## 2.  Counters in MRJob <a name="2"></a> 
[Back to Table of Contents](#TOC)
    
Counters are lightweight objects in Hadoop that let you keep track of job progress in both the map and reduce stages of processing. Think of them as global variables with read/write constraints: tasks can only increment them, and the framework adds up the per-task values into job-wide totals.

Here is an example of them in use:

   + http://nbviewer.jupyter.org/urls/dl.dropbox.com/s/5thl14n4pqvhzt5/Counter.ipynb 
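
The semantics can be modeled without Hadoop at all. The sketch below is a stdlib-only simulation, not MRJob code: `run_task`, the sample lines, and the counter names are made up for illustration. Each simulated task keeps its own increment-only tallies, and the "framework" loop sums them afterwards. Inside a real MRJob class the corresponding call is `self.increment_counter(group, counter, amount)`, which appears in the salary example later in this notebook.

```python
from collections import Counter

def run_task(lines):
    """Hypothetical map task: emits (word, 1) pairs and keeps local counters."""
    counters = Counter()          # (group, name) -> amount; increment-only
    output = []
    for line in lines:
        words = line.split()
        if not words:
            counters[("lines", "blank")] += 1
            continue
        counters[("lines", "non_blank")] += 1
        for word in words:
            output.append((word.lower(), 1))
    return output, counters

# Two made-up input splits, processed by two independent tasks.
splits = [["It was the best of times", ""], ["it was the worst of times"]]
job_counters = Counter()
for split in splits:
    _, task_counters = run_task(split)
    job_counters.update(task_counters)   # the framework sums per-task counters

for (group, name), amount in sorted(job_counters.items()):
    print("%s\t%s\t%d" % (group, name, amount))
```

Note that no task ever reads another task's counters; only the aggregated totals are visible once the tasks have finished.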


In [5]:
# download the book great-expectations.txt into the data/ directory
!mkdir -p data
!wget --output-document=data/great-expectations.txt http://www.gutenberg.org/cache/epub/1400/pg1400.txt
    
# OR, if wget is not installed, use
#!curl http://www.gutenberg.org/cache/epub/1400/pg1400.txt > data/great-expectations.txt

/bin/sh: wget: command not found


## 3.  JSON and MRJob Serialization <a name="3"></a> 
[Back to Table of Contents](#TOC)
    
<LI> Examples of working with JSON data (key-value data structures, like Python dictionaries)

<LI> MRJob serialization with JSON


In [47]:
mkdir JSON

mkdir: JSON: File exists


In [48]:
%%writefile JSON/data.json
{
 "maps":[
         {"id":"blabla","iscategorical":"0"},
         {"id":"blabla","iscategorical":"0"}
        ],
"masks":
         {"id":"valore"},
"om_points":"value",
"parameters":
         {"id":"valore"}
}

Writing JSON/data.json


In [50]:
import json
from pprint import pprint

with open('JSON/data.json') as data_file:
    data = json.load(data_file)

pprint(data)

{u'maps': [{u'id': u'blabla', u'iscategorical': u'0'},
           {u'id': u'blabla', u'iscategorical': u'0'}],
 u'masks': {u'id': u'valore'},
 u'om_points': u'value',
 u'parameters': {u'id': u'valore'}}


In [51]:
print data["maps"][0]["id"]  # will return 'blabla'
print data["masks"]["id"]    # will return 'valore'
print data["om_points"]      # will return 'value'

blabla
valore
value


In [52]:
%%writefile JSON/chineseExample.txt
D1	1	Chinese Beijing	Chinese
D2	1	Chinese Chinese	Shanghai
D3	1	Chinese	Macao
D4	0	Tokyo Japan	Chinese
D5	0	Chinese Chinese	Chinese Tokyo Japan

Writing JSON/chineseExample.txt


In [53]:
%%writefile JSON/chineseExampleJSON.txt
{"email": {"id": "D1", "Label": "1", "content": {"subject":"Chinese Beijing", "body":"Chinese" } } }
{"email": {"id": "D2", "Label": "1", "content": {"subject":"Chinese Chinese", "body":"Shanghai" } } }
{"email": {"id": "D3", "Label": "1", "content": {"subject":"Chinese", "body":"Macao" } } }
{"email": {"id": "D4", "Label": "0", "content": {"subject":"Tokyo Japan", "body":"Chinese" } } }
{"email": {"id": "D5", "Label": "0", "content": {"subject":"Chinese Chinese", "body":"Chinese Tokyo Japan" } } }

Writing JSON/chineseExampleJSON.txt


# Loading and parsing a JSON file in Python
For more background, see [Loading and parsing a JSON file in Python](http://stackoverflow.com/questions/12451431/loading-and-parsing-a-json-file-in-python)

In [33]:
mkdir JSON

In [44]:
# Open a file for writing
filename = "JSON/foo.txt"
fo = open(filename, "w")
fo.write("Jimi wrote hello in the file\nline2 of this dribble")
# Close the opened file
fo.close()


# Open a file and read the entire contents into one string
fo = open(filename, "r")
text = fo.read()
print "This is the entire file's content [", text, "]\n"
# Close the opened file
fo.close()

# read all lines together into a list 
with open(filename) as f:
    content = f.readlines()
for l in content:
    print "line", l

# read the file line by line
with open(filename, "r") as ins:
    array = []
    for line in ins:
        array.append(line)
for l in array:
    print "read a line at time", l


This is the entire file's content [ Jimi wrote hello in the file
line2 of this dribble ]

line Jimi wrote hello in the file

line line2 of this dribble
read a line at time Jimi wrote hello in the file

read a line at time line2 of this dribble


# Read and Write to files

In [56]:
import json
from pprint import pprint

#assume one JSON dictionary per line (otherwise one will have to write a custom parser; a little more effort)
data = []
with open('JSON/chineseExampleJSON.txt', 'r') as f:
    for line in f:
        data.append(json.loads(line))
for d in data:
    pprint(d)
print data

{u'email': {u'Label': u'1',
            u'content': {u'body': u'Chinese', u'subject': u'Chinese Beijing'},
            u'id': u'D1'}}
{u'email': {u'Label': u'1',
            u'content': {u'body': u'Shanghai',
                         u'subject': u'Chinese Chinese'},
            u'id': u'D2'}}
{u'email': {u'Label': u'1',
            u'content': {u'body': u'Macao', u'subject': u'Chinese'},
            u'id': u'D3'}}
{u'email': {u'Label': u'0',
            u'content': {u'body': u'Chinese', u'subject': u'Tokyo Japan'},
            u'id': u'D4'}}
{u'email': {u'Label': u'0',
            u'content': {u'body': u'Chinese Tokyo Japan',
                         u'subject': u'Chinese Chinese'},
            u'id': u'D5'}}
[{u'email': {u'content': {u'body': u'Chinese', u'subject': u'Chinese Beijing'}, u'id': u'D1', u'Label': u'1'}}, {u'email': {u'content': {u'body': u'Shanghai', u'subject': u'Chinese Chinese'}, u'id': u'D2', u'Label': u'1'}}, {u'email': {u'content': {u'body': u'Macao', u'subject': u

In [57]:
#print a string of records
print data

[{u'email': {u'content': {u'body': u'Chinese', u'subject': u'Chinese Beijing'}, u'id': u'D1', u'Label': u'1'}}, {u'email': {u'content': {u'body': u'Shanghai', u'subject': u'Chinese Chinese'}, u'id': u'D2', u'Label': u'1'}}, {u'email': {u'content': {u'body': u'Macao', u'subject': u'Chinese'}, u'id': u'D3', u'Label': u'1'}}, {u'email': {u'content': {u'body': u'Chinese', u'subject': u'Tokyo Japan'}, u'id': u'D4', u'Label': u'0'}}, {u'email': {u'content': {u'body': u'Chinese Tokyo Japan', u'subject': u'Chinese Chinese'}, u'id': u'D5', u'Label': u'0'}}]


In [70]:
#print first record and various attributes
print "entire record as a string ", data[0]  
print "attributes of the email record as string -->", data[0]["email"] 
print "label of email -->", data[0]["email"]["Label"] 
print "body of email -->", data[0]["email"]["content"]["body"] 

# chopped off part of the JSON dictionary
# it is STILL a JSON dictionary (just a smaller subset of its parent) 
x=data[0]["email"]
print "body of email from CHOPPED record -->", x["content"]["body"]

entire record as a string  {u'email': {u'content': {u'body': u'Chinese', u'subject': u'Chinese Beijing'}, u'id': u'D1', u'Label': u'1'}}
attributes of the email record as string --> {u'content': {u'body': u'Chinese', u'subject': u'Chinese Beijing'}, u'id': u'D1', u'Label': u'1'}
label of email --> 1
body of email --> Chinese
body of email from CHOPPED record --> Chinese


# mrjob.protocol - input and output

Protocols translate raw bytes into key, value pairs.

Typically, protocols encode a key and value into bytes, and join them together with a tab character.

However, protocols with Value in their name ignore keys and simply read/write values (with key read in as None), allowing you to read and write data in arbitrary formats.

For more information, see [Protocols and Writing custom protocols](https://pythonhosted.org/mrjob/protocols.html).
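
To make the tab-joined encoding concrete, here is a minimal stdlib-only sketch of what a JSON key-value protocol and a JSON "Value" protocol do. The three function names are hypothetical stand-ins, not the mrjob API, and the sketch works on text strings and ignores byte encoding for simplicity.

```python
import json

def json_protocol_write(key, value):
    """Encode key and value as JSON and join them with a tab."""
    return json.dumps(key) + "\t" + json.dumps(value)

def json_protocol_read(line):
    """Split on the first tab and decode both halves as JSON."""
    raw_key, raw_value = line.split("\t", 1)
    return json.loads(raw_key), json.loads(raw_value)

def json_value_protocol_read(line):
    """A 'Value' protocol: the whole line is the value, the key is None."""
    return None, json.loads(line)

# Round trip of one key-value pair through the tab-joined form.
line = json_protocol_write("1", {"subject": "Chinese Beijing", "body": "Chinese"})
key, value = json_protocol_read(line)
print(key, value["body"])

# A value-only record, as in one line of a JSON-lines input file.
record = '{"email": {"id": "D1", "Label": "1"}}'
print(json_value_protocol_read(record))
```

Splitting on the first tab is safe here because `json.dumps` escapes any tab character inside a string, so the only literal tab in the line is the separator.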

# JSON Protocol for serialization

MRJob provides a seamless JSON reader and writer: the mapper can read JSON lines and have them converted into Python dictionaries and lists. In addition:

<LI> We can test a Hadoop job locally (on Windows or Unix) on a small dataset, without touching huge HDFS files, which makes for quick iteration.
<LI> We can orchestrate many mappers and reducers in the same code.

As an example, suppose the task is to parse JSON-formatted web log files whose fields are sessionid, stepno, and data. In pseudo-code:

  1.  Read the JSON files using an MRJob protocol:

        DEFAULT_INPUT_PROTOCOL = 'json_value'
        DEFAULT_OUTPUT_PROTOCOL = 'repr_value'  #  output is delimited

  2.  Yield sessionid, (sessionid, stepno, data) from the mapper. MapReduce makes sure that all records with the same sessionid (the key) go to the same reducer, with the remaining fields sent along as the value so that they are easy to sort in the reducer.
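
The two steps above can be sketched in plain Python. This is a simulation under stated assumptions, not an MRJob job: the log lines are invented, `mapper` and `reducer` are ordinary functions, the value is simplified to (stepno, data), and the shuffle is imitated with `sorted` plus `itertools.groupby`.

```python
import json
from itertools import groupby
from operator import itemgetter

# Made-up log lines; the field names follow the description above.
log_lines = [
    '{"sessionid": "s2", "stepno": 1, "data": "home"}',
    '{"sessionid": "s1", "stepno": 2, "data": "cart"}',
    '{"sessionid": "s1", "stepno": 1, "data": "search"}',
    '{"sessionid": "s2", "stepno": 2, "data": "checkout"}',
]

def mapper(line):
    rec = json.loads(line)            # the parsing a JSON value protocol would do
    yield rec["sessionid"], (rec["stepno"], rec["data"])

def reducer(sessionid, values):
    # all values for one sessionid arrive together; order them by stepno
    yield sessionid, [data for _, data in sorted(values)]

# Simulated shuffle: sort mapper output by key, then group on the key.
pairs = sorted((kv for line in log_lines for kv in mapper(line)), key=itemgetter(0))
results = {}
for sessionid, group in groupby(pairs, key=itemgetter(0)):
    for key, steps in reducer(sessionid, (value for _, value in group)):
        results[key] = steps
print(results)
```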



In [284]:
%%writefile ProcessJSONRecords.py
import sys,time
#sys.path.append('/usr/lib/python2.4/site-packages/')
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
import json
from pprint import pprint


class ProcessJSONRecords(MRJob):
    DEFAULT_INPUT_PROTOCOL = 'json_value'
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'

    def mapper(self, _, lineStr):
        line = json.loads(lineStr)
        print "full JSON Dictionary", line
        emailID = line['email']['id']
        print "emailID is ", emailID
        label = line['email']['Label']
        content = line["email"]["content"]
        print line["email"]["Label"], line["email"]["content"]
        yield line["email"]["Label"], line["email"]["content"]

    def reducer(self, label, emailBodies):
        # emailBodies is a one-pass iterator over the content dictionaries
        # emitted for this label; emit each one together with its label
        for content in emailBodies:
            yield label, content


if __name__ == '__main__':
    ProcessJSONRecords.run()


Overwriting ProcessJSONRecords.py


In [278]:
!head JSON/chineseExampleJSON.txt

{"email": {"id": "D1", "Label": "1", "content": {"subject":"Chinese Beijing", "body":"Chinese" } } }
{"email": {"id": "D2", "Label": "1", "content": {"subject":"Chinese Chinese", "body":"Shanghai" } } }
{"email": {"id": "D3", "Label": "1", "content": {"subject":"Chinese", "body":"Macao" } } }
{"email": {"id": "D4", "Label": "0", "content": {"subject":"Tokyo Japan", "body":"Chinese" } } }
{"email": {"id": "D5", "Label": "0", "content": {"subject":"Chinese Chinese", "body":"Chinese Tokyo Japan" } } }

In [285]:
!python ProcessJSONRecords.py --jobconf mapred.reduce.tasks=1 JSON/chineseExampleJSON.txt

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/ProcessJSONRecords.jshanahan.20160629.060733.041354
writing to /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/ProcessJSONRecords.jshanahan.20160629.060733.041354/step-0-mapper_part-00000
full JSON Dictionary {u'email': {u'content': {u'body': u'Chinese', u'subject': u'Chinese Beijing'}, u'id': u'D1', u'Label': u'1'}}
emailID is  D1
1 {u'body': u'Chinese', u'subject': u'Chinese Beijing'}
full JSON Dictionary {u'email': {u'content': {u'body': u'Shanghai', u'subject': u'Chinese Chinese'}, u'id': u'D2', u'Label': u'1'}}
emailID is  D2
1 {u'body': u'Shanghai', u'subject': u'Chinese Chinese'}
full JSON Dictionary {u'email': {u'content': {u'body': u'Macao', u'subject': u'Chinese'}, u'id': u'D3', u'Label': u'1'}}
emailID is  D3
1 {u'body': u'Macao', u'subject': u'Chinese'}
full JSON Dictionary {u'email': 


## 4.  Calculate Summary Stats <a name="4"></a> 
[Back to Table of Contents](#TOC)
    

<LI> Calculate Average Salary
<LI> Calculate Mean and Variance/Standard deviation

### 4.1  Calculate Average Salary <a name="4.1"></a> 
[Back to Table of Contents](#TOC)

For more background on this example see:  Chapter 2 in this book: "Hadoop with MRJob"  

<LI> https://www.dropbox.com/s/jd3z2s216p9kc1z/hadoop-with-python-MRJOB.pdf?dl=0
<LI> Source code: https://www.dropbox.com/sh/j8oettuxbgztk0p/AAAwq9PpEeByecDmaSNslnBPa?dl=0


In [267]:
# salaries file is located here :
#   https://www.dropbox.com/s/sp87kq56achu8po/salaries.csv?dl=0
!head salaries.csv
!echo "-------------------"
!tail salaries.csv

"Aaron,Keontae E",AIDE BLUE CHIP,W02200,Youth Summer  ,06/10/2013,$11310.00,$873.63
"Aaron,Patricia G",Facilities/Office Services II,A03031,OED-Employment Dev ,10/24/1979,$53428.00,$52868.38
"Aaron,Petra L",ASSISTANT STATE'S ATTORNEY,A29005,States Attorneys Office ,09/25/2006,$68300.00,$67439.19
"Abaineh,Yohannes T",EPIDEMIOLOGIST,A65026,HLTH-Health Department ,07/23/2009,$62000.00,$58654.74
"Abbene,Anthony M",POLICE OFFICER TRAINEE,A99416,Police Department ,07/24/2013,$43999.00,$39686.95
"Abbey,Emmanuel",CONTRACT SERV SPEC II,A40001,M-R Info Technology ,05/01/2013,$52000.00,$47019.75
"Abdal-Rahim,Naim A",EMT Firefighter Suppression,A64120,Fire Department ,03/30/2011,$62175.00,$61451.50
"Abdi,Ezekiel W",POLICE SERGEANT,A99127,Police Department ,06/14/2007,$70918.00,$87900.27
"Abdul Adl,Attrice A",RADIO DISPATCHER SHERIFF,A38410,Sheriff's Office ,09/02/1999,$42438.00,$53667.53
"Abdul Aziz,Hajr E",AIDE BLUE CHIP,W02097,Youth Summer  ,06/18/2014,$11310.00,
-------------------
"Zimmerman,J

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

cols = 'Name,JobTitle,AgencyID,Agency,HireDate,AnnualSalary,GrossPay'.split(',')


class salaryavg(MRJob):

    def avgmapper(self, _, line):
        row = dict(zip(cols, [ a.strip() for a in csv.reader([line]).next()]))

        self.increment_counter('depts', row['Agency'], 1)

        yield row['JobTitle'], (int(float(row['AnnualSalary'][1:])), 1)

    def avgcombiner(self, key, values):
        # A combiner sees only part of a key's records, so it must not filter;
        # it just pre-aggregates partial (sum, count) pairs.
        s = 0
        c = 0
        for partial_sum, count in values:
            s += partial_sum
            c += count
        yield key, (s, c)

    def avgreducer(self, key, values):
        s = 0
        c = 0

        for partial_sum, count in values:
            s += partial_sum
            c += count

        # keep only job titles held by more than 3 people
        if c > 3:
            self.increment_counter('stats', 'above3', 1)
            yield key, (float(s) / c, c)

    def ttmapper(self, key, value):
        yield None, (value[0], key) # group by all, keep average and job title

    def ttreducer(self, key, values):
        topten = []
        for average, job in values:
            topten.append((average, job))
            topten.sort()
            topten = topten[-10:]

        for average, job in topten:
            yield None, (average, job)

    def steps(self):
        return [
            MRStep(mapper=self.avgmapper,
                   combiner=self.avgcombiner,
                   reducer=self.avgreducer),
            MRStep(mapper=self.ttmapper,
                   combiner=self.ttreducer,
                   reducer=self.ttreducer) ]


if __name__ == '__main__':
    salaryavg.run()

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

cols = 'Name,JobTitle,AgencyID,Agency,HireDate,AnnualSalary,GrossPay'.split(',')

class salarymax(MRJob):

    def mapper(self, _, line):
        # Convert each line into a dictionary
        row = dict(zip(cols, [ a.strip() for a in csv.reader([line]).next()]))

        # Yield the salary
        yield 'salary', (float(row['AnnualSalary'][1:]), line)
        
        # Yield the gross pay
        try:
            yield 'gross', (float(row['GrossPay'][1:]), line)
        except ValueError:
            self.increment_counter('warn', 'missing gross', 1)

    def reducer(self, key, values):
        topten = []

        # For 'salary' and 'gross' compute the top 10
        for p in values:
            topten.append(p)
            topten.sort()
            topten = topten[-10:]

        for p in topten:
            yield key, p

    combiner = reducer

if __name__ == '__main__':
    salarymax.run()
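
The reason a top-ten reducer can double as its own combiner is that the top k of the per-task top-k lists equals the top k of all the data. The following stdlib-only sketch checks that property on made-up salaries; `top_k`, the two splits, and k=3 are illustrative assumptions, and `heapq.nlargest` stands in for the append/sort/slice loop used in the jobs above.

```python
import heapq

def top_k(pairs, k=3):
    """Return the k largest (salary, name) pairs, smallest first."""
    return sorted(heapq.nlargest(k, pairs))

# Made-up salaries split across two hypothetical map tasks.
split_a = [(52000.0, "A"), (70918.0, "B"), (11310.0, "C"), (62175.0, "D")]
split_b = [(68300.0, "E"), (43999.0, "F"), (62000.0, "G"), (53428.0, "H")]

# Combiner: each task keeps only its local top 3.
partials = top_k(split_a) + top_k(split_b)
# Reducer: top 3 of the partial winners.
combined = top_k(partials)
# Reference: top 3 computed over everything at once.
direct = top_k(split_a + split_b)
print(combined)
```

The same reuse is not safe for a reducer that filters on a count or computes an average, because a combiner only ever sees part of a key's records.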

In [None]:
from mrjob.job import MRJob

class MRWordCount(MRJob):

   def mapper(self, _, line):
      for word in line.split():
         yield(word, 1)

   def reducer(self, word, counts):
      yield(word, sum(counts))

if __name__ == '__main__':
   MRWordCount.run()

### 4.2  Calculate Mean and Variance <a name="4.2"></a> 
[Back to Table of Contents](#TOC)
    



See  http://www.statisticslectures.com/topics/variancesample/
<p>
[Variance](https://www.dropbox.com/s/0k8zal1nncdygji/Screenshot%202016-06-26%2004.17.15.png?dl=0)
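
For reference, the mean and the sample variance can be written in a one-pass form that needs only the count $N$, the sum, and the sum of squares. This is the standard identity, stated here as background for the mrMeanVar job later in this section:

$$
\bar{x} = \frac{1}{N}\sum_{i=1}^{N} x_i, \qquad
s^2 = \frac{1}{N-1}\sum_{i=1}^{N}\left(x_i-\bar{x}\right)^2
    = \frac{1}{N-1}\left(\sum_{i=1}^{N} x_i^2 - \frac{\left(\sum_{i=1}^{N} x_i\right)^2}{N}\right)
$$

Worked on the eight values 4, 5, 15, 3, 20, 3, 6, 4000 used in Combiner/example.txt:

$$
\sum x_i = 4056, \quad \sum x_i^2 = 16000720, \quad
\bar{x} = \frac{4056}{8} = 507, \quad
s^2 = \frac{16000720 - 4056^2/8}{7} = \frac{13944328}{7} \approx 1992046.9, \quad
s \approx 1411.4
$$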

In [86]:
mkdir Combiner

In [288]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "Variance_STDEV.png")

# image can be downloaded from here:  
#       https://www.dropbox.com/s/al3uu3tjqs17ozz/Variance_STDEV.png?dl=0

In [289]:
%%writefile Combiner/example.txt
{"id":1, "kind":"Truc",   "value": 4}
{"id":2, "kind":"Machin", "value": 5}
{"id":3, "kind":"Machin", "value": 15}
{"id":4, "kind":"Chose",  "value": 3}
{"id":5, "kind":"Truc",   "value": 20}
{"id":6, "kind":"Chose",  "value": 3}
{"id":7, "kind":"Truc",   "value": 6}
{"id":8, "kind":"Truc",   "value": 4000}

Overwriting Combiner/example.txt


In [114]:
%%writefile Combiner/mrMeanVar.py
from mrjob.job import MRJob
from math import sqrt   
import json

# given 4,5,15,3,20, 3,6,4000
# Mean = 507, SD = 1411
# Calculate the mean and standard deviation
#
class mrMeanVar(MRJob):
    DEFAULT_PROTOCOL = 'json'  #split records in key value pairs using TAB

    def mapper(self, key, line):
        lineDict = json.loads(line)
        val = lineDict["value"]
        yield 1,(val, val*val)
        
    

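    def reducer(self, key, vals):
        # accumulate the count, the sum, and the sum of squares in a single pass
        N = 0.0
        total = 0.0
        sumsq = 0.0
        for val, valSqd in vals:
            N += 1
            total += val
            sumsq += valSqd
        mean = total/N
        # sample standard deviation (divides by N-1)
        sd = sqrt((sumsq - total*total/N)/(N-1))
        results = [mean,sd]
        yield 1,results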
 
if __name__ == '__main__':
    mrMeanVar.run()

Overwriting Combiner/mrMeanVar.py


In [115]:
!python Combiner/mrMeanVar.py --jobconf mapred.reduce.tasks=1 Combiner/example.txt

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/mrMeanVar.jshanahan.20160626.113155.399710
writing to /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/mrMeanVar.jshanahan.20160626.113155.399710/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/mrMeanVar.jshanahan.20160626.113155.399710/step-0-mapper-sorted
> sort /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/mrMeanVar.jshanahan.20160626.113155.399710/step-0-mapper_part-00000
writing to /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/mrMeanVar.jshanahan.20160626.113155.399710/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/mrMeanVar.jshanahan.20160626.113155.399710/step-0-reducer_part-00000 -> /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/mrMeanVar
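
Since this example lives in the Combiner directory, it is worth seeing why the (value, value squared) representation is combiner-friendly: partial counts, sums, and sums of squares simply add. The sketch below is a stdlib-only illustration rather than an MRJob job; `summarize` and `merge` are hypothetical names for the combiner and reducer roles, and the split of the eight values into two chunks is arbitrary.

```python
from math import sqrt

def summarize(values):
    """Combiner step: reduce a chunk to (count, sum, sum of squares)."""
    return (len(values), sum(values), sum(v * v for v in values))

def merge(partials):
    """Reducer step: partial triples add component-wise."""
    n = total = sumsq = 0
    for count, s, sq in partials:
        n += count
        total += s
        sumsq += sq
    mean = total / float(n)
    # sample standard deviation (divides by n - 1), as in mrMeanVar
    sd = sqrt((sumsq - total * total / float(n)) / (n - 1))
    return mean, sd

# The eight values from Combiner/example.txt, split across two hypothetical tasks.
chunk_a = [4, 5, 15, 3]
chunk_b = [20, 3, 6, 4000]
mean, sd = merge([summarize(chunk_a), summarize(chunk_b)])
print("mean=%.1f sd=%.1f" % (mean, sd))
```

Averages themselves do not merge this way, which is why the partial results carry sums and counts instead of means.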

## 5.  Order inversion pattern in MRJob <a name="5"></a> 
[Back to Table of Contents](#TOC)
    

For more background on this example see:  Chapter 2 in this book: "Hadoop with MRJob"  

<LI> https://www.dropbox.com/s/jd3z2s216p9kc1z/hadoop-with-python-MRJOB.pdf?dl=0
<LI> Source code: https://www.dropbox.com/sh/j8oettuxbgztk0p/AAAwq9PpEeByecDmaSNslnBPa?dl=0

#### SCENARIO
I am trying to emit, for each key-value pair generated by the mapper, that pair's probability within its key.

So, let's say the mapper yields:

    a, (r, 5)
    a, (e, 6)
    a, (w, 7)

I need to add 5+6+7 = 18 and then find the probabilities 5/18, 6/18, 7/18, so the final output from the reducer would look like:

    a, [[r, 5, 0.278], [e, 6, 0.333], [w, 7, 0.389]]

So far, I can only get the reducer to sum all the integers in the values. How can I make it go back and divide each count by the total sum?

##### References
<LI>http://stackoverflow.com/questions/15051137/mrjob-can-a-reducer-perform-2-operations


What you are doing above should work as well, but this is assuming that all of the data for a single key will fit in memory. If it does, then at Reducer you can hold all values in memory and then compute your total to then calculate the marginal for each key-value pair. This is commonly known as the "stripes" approach.
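
A minimal sketch of this in-memory approach, applied to the scenario's three pairs, might look as follows. `stripes_reducer` is a hypothetical plain function, not an MRJob method, and it assumes the values for one key fit in a list.

```python
def stripes_reducer(key, values):
    """In-memory approach: buffer every (item, count) for the key, then normalize."""
    buffered = list(values)                      # assumes this fits in memory
    total = sum(count for _, count in buffered)
    return key, [[item, count, round(float(count) / total, 3)]
                 for item, count in buffered]

# The values arrive as a one-pass iterator, as they would in a reducer.
key, probs = stripes_reducer("a", iter([("r", 5), ("e", 6), ("w", 7)]))
print(key, probs)
```

Buffering is what makes a second pass possible: the iterator a reducer receives can only be consumed once.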

However, most of the time this will not be true and the data will not fit in memory. In that case you have to find a way to deliver the values needed to compute the total before the actual key-value pairs, so that the total is already known when each pair arrives and its marginal can be computed and emitted right away.

This is a candidate for the "order inversion" design pattern, which is useful when you need to calculate relative frequencies. For Hadoop, the basic idea is that the mapper emits two key-value pairs for each intermediate record: the record itself, plus a second pair whose key carries a special marker that is the same for all values. The marker pairs are used to calculate the total. For MRJob, this is greatly simplified; the MrRelativeProbs example later in this section relies on the secondary sort enabled by `SORT_VALUES = True`.



In [None]:
#### Example
For a, (r, 5) :
---------------
emit (a), r, 5
emit (a), *, 5


For a, (e, 6) :
---------------
emit (a), e, 6
emit (a), *, 6


For a, (w, 7) :
---------------
emit (a), w, 7
emit (a), *, 7

Once this is done, you need a partitioner that partitions the intermediate key-value pairs using only the first part of the key ("a" in the example above).

You also need a key sort order that always places the keys with * in the second part ahead of all the others.

This way, all intermediate keys that have "a" in the first part end up in the same reducer, and they arrive sorted as shown below:

emit (a, *), 5
emit (a, *), 6
emit (a, *), 7
emit (a, e), 6
emit (a, r), 5
emit (a, w), 7
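
The emit and sort steps that produce this ordering can be imitated in plain Python. This is a sketch under assumptions, not Hadoop or MRJob code: `emit_pairs` and `sort_key` are hypothetical helpers, and a single `sorted` call stands in for the partitioner plus the custom key sort order.

```python
def emit_pairs(records):
    """For each (key, (item, count)) emit the real pair plus a '*' marginal pair."""
    for key, (item, count) in records:
        yield (key, item), count
        yield (key, "*"), count

def sort_key(pair):
    (first, second), _ = pair
    # False sorts before True, so the '*' marker precedes every real item
    return (first, second != "*", second)

records = [("a", ("r", 5)), ("a", ("e", 6)), ("a", ("w", 7))]
shuffled = sorted(emit_pairs(records), key=sort_key)
for (first, second), count in shuffled:
    print("(%s, %s), %d" % (first, second, count))
```

Sorting on `first` alone would be enough to decide which reducer a pair goes to; the two extra components of `sort_key` only control the order in which that reducer sees the pairs.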

At the reducer as you iterate through the key-value pairs, you will have to simply accumulate the values from the keys if they have a * in the second part of the key. You can then use the accumulated value to calculate your marginal for all the other key-value pairs.

In [None]:
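def reduce_group(pairs):
    # pairs: ((first, second), value) tuples for one reducer, sorted so that
    # every (first, '*') key arrives before the other keys
    total = 0
    for (first, second), value in pairs:
        if second == '*':
            total += value      # accumulate the marginal first
        else:
            yield first, second, value, float(value) / total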


This design pattern is commonly known as order inversion, and it builds on the pairs approach. For more information on this and other design patterns, I suggest reading the chapter on MapReduce design patterns in this book: http://lintool.github.com/MapReduceAlgorithms/. It is very well explained, with examples.

In [244]:
%%writefile MrRelativeProbs.py

"""
Calculate the Pr(term |class)
use the order inversion pattern (which uses secondary sort)

"""
from mrjob.job import MRJob

class MrRelativeProbs(MRJob):
    DEFAULT_PROTOCOL = 'json'  #split records in key value pairs using TAB
    
    # Performs a secondary sort on the word within the class.
    # As a result, the reducer receives records sorted by class and then by word
    # (remember that we have a special word, **Total, which sorts ahead of alphabetic words)
    SORT_VALUES = True

    def __init__(self, *args, **kwargs):
        super(MrRelativeProbs, self).__init__(*args, **kwargs)
        self.modelStats = {}

 
    def mapper(self, key, line):
        docID, docClass, text = line.split("\t",2)   
        words = text.split()
        for w in words:
            yield docClass, (w, 1)
        yield docClass, ("**Total", len(words)) # keep a tally of the total
        
    # TODO: Should add a combiner
    
    def reducer(self, classI, wordFreqs):
        #print "wordFreqs", wordFreqs
        #for w, freq in wordFreqs:   NOTE that this for loop exhausted the iterator; so can not iterate again later
        #      print "wordFreqs", classI, w, freq
        total = 0   #start of a new class
        currentWord=""
        currentWordFreq=0
        for w, freq  in wordFreqs:
            if (w == "**Total") : 
                total += freq
            elif currentWord == w:
                currentWordFreq += freq
            else: # a new word
                if currentWord != "**Total" and currentWord != "":
                    yield(classI, [currentWord, currentWordFreq, float(currentWordFreq)/total])
                currentWord=w
                currentWordFreq=freq
        #dont forget the last word!
        if currentWord !="" and currentWord != "**Total" :
            yield(classI, (currentWord, currentWordFreq, float(currentWordFreq)/total))
 

# The if __name__ == "__main__": 
# ... trick exists in Python so that our Python files 
# can act as either reusable modules, or as standalone programs.
if __name__ == '__main__':
    MrRelativeProbs.run()

Overwriting MrRelativeProbs.py


In [258]:
pwd

u'/Users/jshanahan/Dropbox/Projects/Target-2016-04/Homeworks/HW02'

# Join two tables (using a secondary sort)

Setting the following class attribute makes MRJob perform a secondary sort, so each reducer receives its values in sorted order:

    SORT_VALUES = True
  

In [117]:
mkdir Join

In [120]:
%%writefile Join/customers.dat
Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA

Overwriting Join/customers.dat


In [119]:
%%writefile Join/countries.dat
United States|US
Canada|CA
United Kingdom|UK
Italy|IT

Writing Join/countries.dat


In [122]:
%%writefile Join/MRJoin.py
# Adapted for MrJob from Joe Stein's example at:
# http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

import sys, os, re
from mrjob.job import MRJob

class MRJoin(MRJob):
  
  # Performs secondary sort
  SORT_VALUES = True
  
  def mapper(self, _, line):    
    splits = line.rstrip("\n").split("|")
    
    if len(splits) == 2: # country data
      symbol = 'A' # make country sort before person data
      country2digit = splits[1]
      yield country2digit, [symbol, splits]
    else: # person data
      symbol = 'B'
      country2digit = splits[2]
      yield country2digit, [symbol, splits]
  
  def reducer(self, key, values):
    countries = [] # should come first, as they are sorted on the artificial key 'A'
    for value in values:
      if value[0] == 'A':    #if we have multiple records on left/country side
        countries.append(value)
      if value[0] == 'B':
        for country in countries:   #take the crossproduct country X Person
          yield key, country[1:] + value[1:]
      
if __name__ == '__main__':
  MRJoin.run()

Writing Join/MRJoin.py


In [124]:
!python Join/MRJoin.py Join/countries.dat Join/customers.dat

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
ignoring partitioner keyword arg (requires real Hadoop): 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
creating tmp directory /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/MRJoin.jshanahan.20160626.121350.077099
writing to /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/MRJoin.jshanahan.20160626.121350.077099/step-0-mapper_part-00000
writing to /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/MRJoin.jshanahan.20160626.121350.077099/step-0-mapper_part-00001
Counters from step 1:
  (no counters found)
writing to /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/MRJoin.jshanahan.20160626.121350.077099/step-0-mapper-sorted
> sort /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/MRJoin.jshanahan.20160626.121350.077099/step-0-mapper_part-00000 /var/folders/j4/95k348x940xcz40fkdmgy_n40000gn/T/MRJoin.jshanahan.20160626.121350.077099/step-0-mapper_part-00001
writing to /var/fol
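
Because the job output above is cut off, here is a stdlib-only simulation of the same reduce-side join on a subset of the two files. It is a sketch, not the MRJoin job itself: `mapper` and `reducer` are plain functions, only four of the customers are included, and a single `sorted` call stands in for the shuffle plus the secondary sort that `SORT_VALUES = True` requests.

```python
from itertools import groupby
from operator import itemgetter

countries = ["United States|US", "Canada|CA", "United Kingdom|UK", "Italy|IT"]
customers = ["Alice Bob|not bad|US", "Sam Sneed|valued|CA",
             "Arnold Wesise|not so good|UK", "Jim Davis|not so bad|JA"]

def mapper(line):
    fields = line.split("|")
    if len(fields) == 2:                 # country record: name|code
        yield fields[1], ["A", fields]   # 'A' sorts before 'B'
    else:                                # customer record: name|rating|code
        yield fields[2], ["B", fields]

def reducer(code, values):
    seen_countries = []                  # 'A' records arrive first
    for tag, fields in values:
        if tag == "A":
            seen_countries.append(fields)
        else:
            for country in seen_countries:
                yield code, country[0], fields[0], fields[1]

# Simulated shuffle with secondary sort: order by key, then by value.
pairs = sorted(kv for line in countries + customers for kv in mapper(line))
joined = []
for code, group in groupby(pairs, key=itemgetter(0)):
    joined.extend(reducer(code, (value for _, value in group)))
for row in joined:
    print(row)
```

This is an inner join: Italy has no customers and the customer with code JA has no matching country, so neither appears in the result.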

## 6.  Useful  Links for Jupyter Notebooks <a name="0"></a> 
[Back to Table of Contents](#TOC)
 
Some useful links for editing Jupyter notebooks:

http://blog.jupyter.org/2016/01/08/notebook-4-1-release/
    
https://sowingseasons.com/blog/reference/2016/01/jupyter-keyboard-shortcuts/23298516

[Markdown examples ](http://nbviewer.jupyter.org/github/ipython/ipython/blob/1.x/examples/notebooks/Part%204%20-%20Markdown%20Cells.ipynb)

~~~
Command Mode (press ESC to enable)
Enter : enter edit mode
Shift-Enter : run cell, select below
Ctrl-Enter : run cell
Alt-Enter : run cell, insert below
Y : to code
M : to markdown
R : to raw
1 : to heading 1
2 : to heading 2
3 : to heading 3
4 : to heading 4
5 : to heading 5
6 : to heading 6
Up : select cell above
K : select cell above
Down : select cell below 
SHIFT Down:  select one *or more cells* (to move up or down within a notebook) 
J : select cell below
A : insert cell above
B : insert cell below
X : cut selected cell
C : copy selected cell
Shift-V : paste cell above
V : paste cell below
Z : undo last cell deletion
D,D : delete selected cell
Shift-M : merge cell below
S : Save and Checkpoint
Ctrl-S : Save and Checkpoint
L : toggle line numbers
O : toggle output
Shift-O : toggle output scrolling
Esc : close pager
Q : close pager
H : show keyboard shortcut help dialog
I,I : interrupt kernel
0,0 : restart kernel
Space : scroll down
Shift-Space : scroll up
Shift : ignore
Edit Mode (press Enter to enable)
Tab : code completion or indent
Shift-Tab : tooltip
Ctrl-] : indent
Ctrl-[ : dedent
Ctrl-A : select all
Ctrl-Z : undo
Ctrl-Shift-Z : redo
Ctrl-Y : redo
Ctrl-Home : go to cell start
Ctrl-Up : go to cell start
Ctrl-End : go to cell end
Ctrl-Down : go to cell end
Ctrl-Left : go one word left
Ctrl-Right : go one word right
Ctrl-Backspace : delete word before
Ctrl-Delete : delete word after
Esc : command mode
Ctrl-M : command mode
Shift-Enter : run cell, select below
Ctrl-Enter : run cell
Alt-Enter : run cell, insert below
Ctrl-Shift-Subtract : split cell
Ctrl-Shift-- : split cell
Ctrl-S : Save and Checkpoint
Up : move cursor up or previous cell
Down : move cursor down or next cell
Shift : ignore
~~~

## -1.  End of Notebook <a name="0"></a> 
[Back to Table of Contents](#TOC)