# Map Reduce Task

## Reading-in Preprocessed Data and Packages 
 
Install the `mrjob` package first (e.g. `pip install mrjob`); it is required for the imports below and for the job to work with the database.  


In [1]:
import prep 
from mrjob.job import MRJob
from mrjob.step import MRStep

Reviewer: A2VNYWOPJ13AFP, ASIN: 0981850006, Tokens: {'time', 'directions', 'cuisine', 'insight', 'yum', 'love', 'broadening', 'raichlen', 'page', 'food', 'provided', 'recipes', 'interpret', 'gift', 'simple', 'barbecue', 'husband', 'calls', 'kinds', 'open', 'produced', 'things', 'horizons', 'culture', 'make', 'making', 'trail', 'easy'}, Category: Patio_Lawn_and_Garde


In [2]:
# Pull the preprocessed objects out of the project-local `prep` module.
# Each element of `data` is one review record; the record displayed below has the
# keys 'reviewerID', 'asin', 'tokens' and 'category'.
data = prep.cleaned_reviews
# NOTE(review): presumably the collection of all distinct tokens across reviews — confirm in prep.
terms = prep.all_terms
# NOTE(review): presumably the collection of all distinct category labels — confirm in prep.
categories = prep.all_categories
data[0]  # overview: display the first review record

{'reviewerID': 'A2VNYWOPJ13AFP',
 'asin': '0981850006',
 'tokens': {'barbecue',
  'broadening',
  'calls',
  'cuisine',
  'culture',
  'directions',
  'easy',
  'food',
  'gift',
  'horizons',
  'husband',
  'insight',
  'interpret',
  'kinds',
  'love',
  'make',
  'making',
  'open',
  'page',
  'produced',
  'provided',
  'raichlen',
  'recipes',
  'simple',
  'things',
  'time',
  'trail',
  'yum'},
 'category': 'Patio_Lawn_and_Garde'}

## Calculate chi-square values

**Recap of the variable names from the lecture:** 
- t ... term  
- c ... category  
- A ... number of documents in c which contain t  
- B ... number of documents not in c which contain t  
- C ... number of documents in c without t  
- D ... number of documents not in c without t  
- N ... total number of retrieved documents (can be omitted for ranking)

With these variables, the chi-square value of a term t for a category c is $\chi^2_{tc} = \frac{N\,(AD - BC)^2}{(A+B)(A+C)(B+D)(C+D)}$. As we can see, this definition only counts the number of documents that contain t or belong to c; it does not count how often t occurs within a single document. For this reason, we edit the preprocessing so that each token is kept at most once per review, and we also keep a set of all tokens and a set of all categories for later use. Doing this in the preprocessing reduces computation time.

The computation may be very intensive, which is why we try a multiprocessing approach. We checked the number of CPU cores available on the current node with the command `nproc`. Furthermore, we used `yarn node -list` to list all nodes and `yarn node -status <node-id>` to check the number of virtual cores of the different nodes, which leads us to believe that there are about **16 virtual cores** that each user can use.

In [9]:
def getVariables(data, t, c):
    """Count the chi-square contingency table for term ``t`` and category ``c``.

    Parameters
    ----------
    data : iterable of dict
        Review records; each one is read through its ``"category"`` and
        ``"tokens"`` keys.
    t : str
        Term that is looked up in each review's tokens.
    c : str
        Category label that each review's category is compared against.

    Returns
    -------
    tuple of int
        ``(A, B, C, D)`` where
        A = reviews in ``c`` that contain ``t``,
        B = reviews not in ``c`` that contain ``t``,
        C = reviews in ``c`` that do not contain ``t``,
        D = reviews not in ``c`` that do not contain ``t``.
    """
    A = B = C = D = 0
    for review in data:
        category = review["category"]
        # `c in "some_string"` is a substring test, so e.g. "Home" would also
        # match "Home_and_Kitchen". Compare string categories for equality and
        # keep membership semantics only for real collections of labels.
        if isinstance(category, str):
            in_category = (c == category)
        else:
            in_category = (c in category)
        has_term = t in review["tokens"]

        if in_category and has_term:
            A += 1
        elif in_category:
            C += 1
        elif has_term:
            B += 1
        else:
            D += 1

    return A, B, C, D


from collections import defaultdict # useful for non-existing keys 
from heapq import nlargest 
from multiprocessing import Pool

def chiSquare(data, terms, categories):
    """Compute the chi-square value of every term for every category.

    The value for a (term, category) pair is
    ``N * (A*D - B*C)**2 / ((A+B) * (A+C) * (B+D) * (C+D))`` with

    - A: reviews in the category that contain the term,
    - B: reviews outside the category that contain the term,
    - C: reviews in the category without the term,
    - D: reviews outside the category without the term,
    - N: ``len(data)``.

    Parameters
    ----------
    data : sequence of dict
        Review records; each one is read through its ``"category"`` and
        ``"tokens"`` keys.
    terms : iterable of str
        Terms to score.
    categories : iterable of str
        Category labels to score the terms against.

    Returns
    -------
    collections.defaultdict
        Maps each category to a list of ``(chi_square_value, term)`` tuples,
        one per term, in the iteration order of ``terms``. The list is not
        sorted or truncated; ranking is left to the caller.
    """
    terms = list(terms)
    categories = list(categories)
    N = len(data)

    wanted_terms = set(terms)
    wanted_categories = set(categories)

    # Collect all document counts in a single pass over the data instead of
    # rescanning every review for each (term, category) pair.
    docs_with_term = defaultdict(int)     # t      -> A + B
    docs_in_category = defaultdict(int)   # c      -> A + C
    docs_in_both = defaultdict(int)       # (c, t) -> A

    for review in data:
        category = review["category"]
        # A string is a single label (exact match); anything else is treated
        # as a collection of labels.
        labels = {category} if isinstance(category, str) else set(category)
        labels &= wanted_categories
        # Set intersection also guarantees each term counts once per review.
        present = wanted_terms.intersection(review["tokens"])

        for t in present:
            docs_with_term[t] += 1
        for c in labels:
            docs_in_category[c] += 1
            for t in present:
                docs_in_both[(c, t)] += 1

    # Calculate chi-square values for all unigram terms for each category:
    ChiSquare = defaultdict(list)
    for t in terms:
        for c in categories:
            A = docs_in_both.get((c, t), 0)
            B = docs_with_term.get(t, 0) - A
            C = docs_in_category.get(c, 0) - A
            D = N - A - B - C
            denominator = (A + B) * (A + C) * (B + D) * (C + D)
            # An empty margin (term in no/every review, or category with
            # no/every review) carries no information: score it 0.0 instead of
            # raising ZeroDivisionError.
            if denominator:
                value = (N * (A * D - B * C) ** 2) / denominator
            else:
                value = 0.0
            ChiSquare[c].append((value, t))

    return ChiSquare

def parallel_chiSquare(data, terms, categories, nprcs = 12, top_n = 75):
    """Rank terms per category by chi-square value using several processes.

    The terms are split into ``nprcs`` nearly equal chunks, ``chiSquare`` is
    run on each chunk in a worker process, and the partial results are merged
    per category before the highest-scoring terms are selected.

    Parameters
    ----------
    data : sequence of dict
        Review records, passed unchanged to ``chiSquare`` for every chunk.
    terms : iterable of str
        Terms to score.
    categories : iterable of str
        Category labels, passed unchanged to ``chiSquare`` for every chunk.
    nprcs : int, optional
        Requested number of worker processes (default 12). It is clamped to
        the range ``1 .. len(terms)`` so that no worker receives an empty chunk.
    top_n : int, optional
        Number of terms to keep per category (default 75).

    Returns
    -------
    dict
        Maps each category to a list of at most ``top_n`` terms, ordered from
        the highest chi-square value downwards.
    """
    # Separate Terms:
    terms = list(terms)
    # Guard against divmod by zero and against spawning workers with no terms.
    nprcs = max(1, min(nprcs, len(terms)))
    k, r = divmod(len(terms), nprcs) # k group size and r rest
    # The first r chunks get one extra term so that all terms are covered.
    inputs = [
        (data, terms[i * k + min(i, r):(i + 1) * k + min(i + 1, r)], categories)
        for i in range(nprcs)
    ]

    # starmap blocks until all chunks are done, so leaving the `with` block
    # (which tears the pool down) cannot lose results.
    with Pool(processes=nprcs) as pool:
        results = pool.starmap(chiSquare, inputs)

    # `results` is a list with one {category: [(value, term), ...]} mapping per
    # chunk; concatenate the per-chunk lists of each category before ranking.
    merged = defaultdict(list)
    for partial in results:
        for c, token_vals in partial.items():
            merged[c].extend(token_vals)

    top_terms = {
        c: [t for value, t in nlargest(top_n, token_vals)]
        for c, token_vals in merged.items()
    }

    return top_terms


In [4]:
import numpy as np 
# Scratch check: stack the token sets of the first ten reviews into a single
# column of Python objects (one row per review).
# One vstack call over all rows replaces the vstack-inside-a-loop pattern,
# which re-copied the whole array on every iteration.
test_array = np.vstack([lines["tokens"] for lines in data[:10]])

test_array

array([[{'time', 'directions', 'cuisine', 'insight', 'yum', 'love', 'broadening', 'raichlen', 'page', 'food', 'provided', 'recipes', 'interpret', 'gift', 'simple', 'barbecue', 'husband', 'calls', 'kinds', 'open', 'produced', 'things', 'horizons', 'culture', 'make', 'making', 'trail', 'easy'}],
       [{'time', 'feels', 'bumps', 'nice', 'tires', 'experimentation', 'cable', 'edgeguard', 'precise', 'solid', 'handling', 'material', 'give', 'side', 'metal', 'arm', 'products', 'control', 'farther', 'distribution', 'good', 'great', 'maneuverability', 'pneumatic', 'flings', 'crappy', 'true', 'spreader', 'long', 'settings', 'left'}],
       [{'falls', 'avoid', 'work', 'junction', 'leaks', 'sprinkler', 'bit', 'hose', 'pops', 'heads', 'fairly', 'poorly', 'designed', 'gilmour', 'spike', 'base', 'badly', 'metal', 'sprinklers', 'made', 'attachments', 'tighten', 'pointed', 'fix', 'useless', 'previous', 'plastic', 'wears', 'reviewer'}],
       [{'time', 'waterin', 'corridors', 'work', 'large', 'system

In [None]:
# Run the parallel chi-square ranking on the full data with the default number of workers.
# NOTE(review): this cell has no execution count (In [None]), i.e. it was not run in the
# saved notebook, so there is no recorded output to compare against.
test_out = parallel_chiSquare(data, terms, categories)
len(test_out)  # number of keys in the returned mapping

In [11]:
# Scratch check: a dict can hold (label, value) tuples, and the tuples can be
# unpacked directly in the for-statement while iterating over .items().
test_list = ["b", "a", "c"]
test_list2 = [3, 2, 1]

test_set = {}
for i, pair in enumerate(zip(test_list, test_list2)):
    # keys count down from 3 while the lists are walked front to back
    test_set[3 - i] = pair

for key, (t, v) in test_set.items():
    print(key, t, v, "\n", end ="")

3 b 3 
2 a 2 
1 c 1 


In [21]:
from collections import defaultdict 
# Scratch check: defaultdict(list) creates the list on first access of a key,
# so values can be appended without initialising each key beforehand.
test_list = ["b", "a", "c"]
test_list2 = [3, 2, 1]
pairs = list(zip(test_list, test_list2))

test_set = defaultdict(list)
for i in range(3):
    # every key receives the same three (label, value) pairs
    test_set[i].extend(pairs)

for key, values in test_set.items():
    print(key)
    for t, v in values:
        print(t, ":", v)

0
b : 3
a : 2
c : 1
1
b : 3
a : 2
c : 1
2
b : 3
a : 2
c : 1


In [6]:
import multiprocessing

# Ask Python how many CPUs it sees; the saved output of this cell is 16.
# NOTE(review): cpu_count() reports the CPUs of the machine, which is not necessarily
# the number this process/user is allowed to use on a shared cluster — confirm against
# the scheduler limits before choosing the pool size.
multiprocessing.cpu_count()

16