In [15]:
## Job1: simple wordcount on term-document tuples to compute tf

### Map
# Input: (docname, contents)
# Output: ((term,docname), 1)

### Reduce
# Input: ((docname,term), 1)
# Output: ((term,docname), (N,n))  -- N (total words in the document) is not computed yet; the reducer currently emits only n

###########################################  mapper1.py

import os
import sys

# Characters trimmed from both ends of every token.  A raw string keeps the
# backslash literal instead of relying on Python tolerating the unknown
# escape sequence "\,".
PUNCTUATION = r'''!()-[]{};:'"\,<>./?@#$%^&*_~'''


def input_docname():
    """Return the last path component of the file this map task is reading.

    The path is taken from the 'mapreduce_map_input_file' environment
    variable.

    Raises:
        KeyError: if 'mapreduce_map_input_file' is not set.
    """
    return os.environ['mapreduce_map_input_file'].split('/')[-1]


def map_terms(lines, docname=None):
    """Yield one 'term_docname<TAB>1' record per non-empty term in `lines`.

    Args:
        lines: iterable of text lines (trailing newlines are fine).
        docname: document name appended to every term.  When None it is
            resolved from the environment the first time a term is emitted,
            so empty input never touches the environment.

    Yields:
        str: '<term>_<docname>\\t1', with the term lower-cased and stripped
        of leading/trailing punctuation.

    Tokens are separated on any run of whitespace, and tokens that are empty
    after punctuation stripping (e.g. "--") are skipped so that no record
    with an empty term is produced.

    NOTE(review): '_' doubles as the term/docname separator, so a term with
    an internal underscore (e.g. "foo_bar") is ambiguous for whatever parses
    this key downstream.
    """
    for line in lines:
        # split() with no argument collapses repeated spaces and also breaks
        # on tabs, which would otherwise end up inside the emitted key.
        for token in line.split():
            term = token.strip(PUNCTUATION).lower()
            if not term:
                continue
            if docname is None:
                docname = input_docname()
            yield '%s\t%s' % (term + '_' + docname, 1)


if __name__ == '__main__':
    for record in map_terms(sys.stdin):
        print(record)
        
        
###########################################  reducer1.py

import sys

def reduce_counts(lines):
    """Sum the counts of consecutive records that share the same key.

    Args:
        lines: iterable of '<key>\\t<count>' lines in which equal keys are
            adjacent.  Only consecutive runs are merged, so this relies on
            the input being sorted by key.

    Yields:
        str: '<key>\\t<total>' for every run of equal keys, in input order.

    Lines without a tab or with a non-integer count are skipped.  A skipped
    line never prevents the group in progress from being emitted, and empty
    input yields nothing.
    """
    current_pair = None
    current_count = 0

    for line in lines:
        fields = line.strip().split('\t', 1)
        if len(fields) != 2:
            # no tab separator: not a key/count record
            continue
        pair, count = fields

        try:
            count = int(count)
        except ValueError:
            # count was not a number: discard this line
            continue

        if pair == current_pair:
            current_count += count
            continue

        if current_pair is not None:
            yield '%s\t%s' % (current_pair, current_count)
        current_pair, current_count = pair, count

    # Flush the last group.  Testing against None (rather than against the
    # most recently read key) keeps the group even when the final input line
    # was malformed.
    if current_pair is not None:
        yield '%s\t%s' % (current_pair, current_count)


if __name__ == '__main__':
    for record in reduce_counts(sys.stdin):
        print(record)
    

In [29]:
## Job2: append document frequency d to term_doc pairs

### Map
# Input:  ((term,docname), (N,n))
# Output: (term, (docname,N,n,1))

### Reduce
# Input: (term, (docname,N,n,1))
# Output: ((term,docname), (N,n,d))



###########################################  mapper2.py

import sys

def map_term_docs(lines):
    """Re-key 'term_docname<TAB>n' records by term alone.

    Args:
        lines: iterable of '<term>_<docname>\\t<n>' lines.

    Yields:
        str: '<term>\\t<docname>_<n>_1'.  The trailing 1 is a per-document
        marker that a reducer can sum to count the documents containing
        the term.

    The key is split on its FIRST underscore only, so document names that
    contain underscores survive intact.
    NOTE(review): a term that itself contains '_' is still ambiguous and is
    split at its first underscore.

    Lines that do not have exactly two tab-separated fields, that lack a
    term or a docname, or whose n is not an integer are skipped.
    """
    for line in lines:
        fields = line.strip().split('\t')
        if len(fields) != 2:
            continue
        key, n = fields

        term, sep, docname = key.partition('_')
        if not (sep and term and docname):
            continue

        try:
            n = int(n)
        except ValueError:
            continue

        # Every piece is converted to str before joining; concatenating the
        # int values directly raises TypeError.
        yield '%s\t%s' % (term, '_'.join((docname, str(n), '1')))


if __name__ == '__main__':
    for record in map_term_docs(sys.stdin):
        print(record)
        
        
###########################################  reducer2.py

import sys

def format_group(term, postings, doc_count):
    """Return the output records for one term.

    Each record is '<term>_<docname>\\t<n>_<doc_count>', one per
    (docname, n) entry in `postings`, in the order given.
    """
    return ['%s\t%s' % (term + '_' + doc, n + '_' + str(doc_count))
            for doc, n in postings]


def reduce_doc_frequency(lines):
    """Append the per-term document count d to every (term, docname) record.

    Args:
        lines: iterable of '<term>\\t<docname>_<n>_<count>' lines in which
            equal terms are adjacent.  Only consecutive runs are merged.

    Yields:
        str: '<term>_<docname>\\t<n>_<d>', where d is the sum of the
        <count> fields over the run of lines sharing that term.

    All (docname, n) pairs of a term are buffered until the term changes,
    because d is only known once the whole run has been read.

    The value is split from the RIGHT, so document names containing '_'
    are kept whole.  Lines without a tab, without three '_'-separated
    fields, or with a non-integer count are skipped; a skipped line never
    prevents the group in progress from being emitted.
    """
    current_term = None
    postings = []   # (docname, n) pairs of the current term, in arrival order
    doc_count = 0

    for line in lines:
        term, tab, rest = line.rstrip('\r\n').partition('\t')
        if not tab:
            continue

        fields = rest.rsplit('_', 2)
        if len(fields) != 3:
            continue
        doc, n, count = fields

        try:
            count = int(count)
        except ValueError:
            # count was not a number: discard this line
            continue

        if term != current_term:
            if current_term is not None:
                for record in format_group(current_term, postings, doc_count):
                    yield record
            current_term = term
            postings = []
            doc_count = 0

        postings.append((doc, n))
        doc_count += count

    # Flush the last term; checking for None keeps it even when the final
    # input line was malformed.
    if current_term is not None:
        for record in format_group(current_term, postings, doc_count):
            yield record


if __name__ == '__main__':
    for record in reduce_doc_frequency(sys.stdin):
        print(record)

In [30]:
## Job3: compute tf-idfs

### Map
# Input:  ((term,docname), (N,n,d))
# Output: ((term,docname), tfidf)

### Reduce ----pass
# Input:
# Output: 

###########################################  mapper3.py

import sys
import math

def map_tfidf(lines):
    """Compute an (un-normalised) tf-idf score for every input record.

    Args:
        lines: iterable of '<term>_<docname>\\t<n>_<d>' lines, where n is
            the count of the term in the document and d the number of
            documents containing the term.

    Yields:
        str: '<term>_<docname>\\t<tfidf>' with tfidf = n * log(1 / (1 + d)).

    Lines without a tab, with non-integer n or d, or with a negative d are
    skipped.
    """
    for line in lines:
        # The separator is a TAB character ('\t'), not the two-character
        # string '/t'.
        pair, tab, rest = line.rstrip('\r\n').partition('\t')
        if not tab:
            continue

        n, _, d = rest.partition('_')
        try:
            n = int(n)
            d = int(d)
        except ValueError:
            continue
        if d < 0:
            # 1 + d would be <= 0: division by zero or log of a negative
            continue

        ## to calculate these with normalizing terms we need to compute:
        # 1. the total number of words in each document
        # 2. the total number of documents in the corpus
        # look into Hadoop global variables / counters

        tf = n
        # 1.0 forces true division; with integer division the quotient is 0
        # for every d >= 1 and math.log(0) raises ValueError.
        idf = math.log(1.0 / (1 + d))
        tfidf = tf * idf

        yield '%s\t%s' % (pair, tfidf)


if __name__ == '__main__':
    for record in map_tfidf(sys.stdin):
        print(record)

In [None]:
Minutes, Jan 31st meeting:
    - computation of Nj and D
    - 