# Introduction to MapReduce and mrjob
In this exercise we will analyze a social graph of friends with ```mrjob```. Many of the largest datasets on the web today are graph-structured (web links, social networks, road/geographic networks, etc.), and analyzing them at scale often requires big data technologies like Hadoop. Today we will perform some basic analysis to find all of a user's friends, and from this we will use [triadic closure](http://en.wikipedia.org/wiki/Triadic_closure) to recommend new friends.

In [None]:
#count of each word across all the documents
from mrjob.job import MRJob
from string import punctuation
class MRWordFreqCount(MRJob):
    """Count how many times each word occurs across all input lines.

    Words are split on whitespace, stripped of leading/trailing ASCII
    punctuation and lower-cased before being counted.
    """

    def mapper(self, _, line):
        """Emit ``(word, 1)`` for every non-empty normalized token in *line*."""
        for token in line.split():
            word = token.strip(punctuation).lower()
            # Tokens made only of punctuation (e.g. "--") normalize to "";
            # skip them so the empty string is not counted as a word.
            if word:
                yield word, 1

    def reducer(self, word, counts):
        """Sum the partial counts emitted for *word*."""
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()


In [None]:
#sample output:
"norton"	1
"not"	72
"note"	10
"noteworthy"	1
"nothing"	3
"notice"	2
"notreached"	1
"novel"	1
"november"	1
"novices"	1
"now"	8
"null"	14
"num_args"	8
"number"	6
"numbers"	3
"nutrasweet"	5
"nutrition"	1
"nutshell"	1
"ny"	2
"nyc"	1
"o"	4
"o&>o"	1
"o'reilly"	5
"obese"	1
"obesity"	1
"object"	9
"object-code"	1
"oboe.cis.ohio-state.edu"	1
"obscured"	1
"observation"	1


In [None]:
#to get the word count for each topic
# from mrjob.job import MRJob
# from string import punctuation
import os
import json
class MRWordFreqCount(MRJob):
    """Count occurrences of each normalized value found in JSON input records.

    Each input line is parsed as one JSON object; every value in that object
    is stripped of surrounding punctuation, lower-cased and counted.
    """

    def mapper(self, _, filename):
        """Emit ``(value, 1)`` for each value of the JSON object on this line.

        Despite its name, *filename* receives one raw input line, which is
        parsed with ``json.loads``.
        """
        # Blank lines hold no record and would make json.loads raise.
        if not filename.strip():
            return
        record = json.loads(filename)
        for value in record.values():
            # NOTE(review): assumes every value is a string (.strip() would
            # fail on numbers/lists) -- confirm against the input schema.
            yield value.strip(punctuation).lower(), 1

    def reducer(self, topic, counts):
        """Sum the partial counts emitted for *topic*."""
        yield topic, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()

In [None]:
#sample output
"mini_20_newsgroups\/sci.med\/58892"	23
"mini_20_newsgroups\/sci.med\/58893"	17
"mini_20_newsgroups\/sci.med\/58894"	21
"mini_20_newsgroups\/sci.med\/58895"	25
"mini_20_newsgroups\/sci.med\/58896"	52
"mini_20_newsgroups\/sci.med\/58897"	121
"mini_20_newsgroups\/sci.med\/58898"	31
"mini_20_newsgroups\/sci.med\/58899"	28
"mini_20_newsgroups\/comp.windows.x\/66398"	45
"mini_20_newsgroups\/comp.windows.x\/66399"	29
"mini_20_newsgroups\/rec.motorcycles\/103117"	64
"mini_20_newsgroups\/rec.motorcycles\/103118"	28
"mini_20_newsgroups\/rec.motorcycles\/103119"	24
"mini_20_newsgroups\/sci.med\/58890"	58
"mini_20_newsgroups\/sci.med\/58891"	69
"mini_20_newsgroups\/.ds_store"	23
"mini_20_newsgroups\/comp.windows.x\/66322"	1626

In [None]:
# One of MapReduce's built-in objects is the Counter object, which can be used
# to keep the counts of a **fixed** number of categories that you know **ahead**
# of time. Counters can be particularly useful for debugging. Here's a version of
# the simple word count script where we count the number of words that come from
# each of the `comp`, `sci`, and `rec` groupings that we are passing
# in. Note that here we don't need a reduce step, which can save quite a
# bit of time!
import os
from mrjob.job import MRJob
from string import punctuation
class TotalWordCountByTopic(MRJob):
    """Count words per top-level newsgroup category using counters only.

    The category is the first dot-separated component of the directory that
    contains the input file (e.g. ``comp`` for ``.../comp.windows.x/66398``).
    No reducer is defined: totals are reported only through counters.
    """

    def mapper(self, _, line):
        """Add the number of whitespace-separated words in *line* to the
        ``word_count`` counter of the current input file's category."""
        # Hadoop 2+ names this variable mapreduce_map_input_file; Hadoop 1
        # used map_input_file. Prefer the new name, fall back to the old one.
        filename = (os.environ.get('mapreduce_map_input_file')
                    or os.environ['map_input_file'])
        # Take the parent directory rather than a fixed index so that both
        # relative paths and absolute paths/URIs resolve to the newsgroup.
        grouping = filename.split('/')[-2].split('.')[0]

        word_count = len(line.split())
        # One counter update per line instead of one per word: every
        # increment_counter call prints a status line to stderr.
        if word_count:
            self.increment_counter(grouping, 'word_count', word_count)


if __name__ == '__main__':
    TotalWordCountByTopic.run()



In [None]:
#sample output
Counters: 3
            comp
                word_count=9913
            rec
                word_count=787
            sci
                word_count=2972

In [None]:
# '''The classic MapReduce job: count the frequency of words.'''
# One of the biggest bottlenecks in a MapReduce job is the data transfer from the map step to the reduce step. To help speed up this transfer, MapReduce has a built-in Combiner, which is effectively a Reducer that the map step uses to reduce the data locally (i.e. on that Mapper) before it gets passed on to the reduce step. This saves time by reducing the amount of data that has to be transferred from the map step to the reduce step. Here's a version of the simple word count script that uses a Combiner to get a word count locally on each Mapper before the data is transferred to the Reducer.

from mrjob.job import MRJob
from string import punctuation
class MRWordFreqCount(MRJob):
    """Count word frequencies, pre-aggregating on each mapper with a combiner.

    Words are split on whitespace, stripped of leading/trailing ASCII
    punctuation and lower-cased before being counted.
    """

    def mapper(self, _, line):
        """Emit ``(word, 1)`` for every non-empty normalized token in *line*."""
        for token in line.split():
            word = token.strip(punctuation).lower()
            # Punctuation-only tokens (e.g. "--") normalize to ""; skip them.
            if word:
                yield word, 1

    def combiner(self, word, counts):
        """Locally sum this mapper's counts for *word* to cut shuffle volume."""
        yield word, sum(counts)

    def reducer(self, word, counts):
        """Sum the (possibly pre-combined) counts emitted for *word*."""
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()

In [None]:
# Write a MapReduce job which returns a list of every user's friends.
# from mrjob.job import MRJob
# from string import punctuation
class MRWordFreqCount(MRJob):
    """List every user's friends from an undirected edge list.

    Each input line holds one friendship as two user ids separated by
    whitespace, e.g. ``"0 1"``.
    """

    def mapper(self, _, line):
        """Emit the edge in both directions so each endpoint sees the other."""
        fields = line.split()
        # split() (not split(' ')) tolerates tabs, repeated spaces and
        # trailing whitespace; blank lines carry no edge and are skipped.
        if not fields:
            return
        q1, q2 = fields
        yield q1, q2
        yield q2, q1

    def reducer(self, n, counts):
        """Collect all friends of user *n* into one list."""
        yield n, list(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()



In [None]:
#sample output
    0   [1, 2, 5]
    1   [0, 3, 4]
    2   [0, 3, 4]
    3   [1, 2, 4]
    4   [1, 2, 3, 5]
    5   [0, 4]

In [None]:
# Get Friend Suggestions
# We'd like to give a friend suggestion for each user (if we have a suggestion to give).
# Write an additional mapper and reducer that will choose for each user the other user which has the most friends in common with them.
from mrjob.job import MRJob
from mrjob.step import MRStep
from itertools import combinations
class MRWordFreqCount(MRJob):
    """Recommend, for each user, the non-friend sharing the most friends.

    Step 1 builds each user's friend list, step 2 counts common friends for
    every pair of users who are not already friends, and step 3 picks the
    best candidate per user (triadic closure).
    """

    def mapper1(self, _, line):
        """Emit the edge ``"u v"`` in both directions."""
        fields = line.split()
        # Blank lines carry no edge; split() also tolerates tabs/extra spaces.
        if not fields:
            return
        q1, q2 = fields
        yield q1, q2
        yield q2, q1

    def reducer1(self, n, counts):
        """Emit user *n* with its de-duplicated, sorted friend list."""
        # De-duplicate so a repeated edge cannot inflate common-friend counts.
        yield n, sorted(set(counts))

    def mapper_second(self, users, friends):
        """Emit 1 for each pair of *users*' friends (one common friend) and
        0 for each existing friendship of *users* (an "already friends" flag).

        Pairs are keyed in sorted order so that every contribution for the
        same two users reaches the same reducer call, regardless of the order
        in which friend lists were produced.
        """
        for a, b in combinations(friends, 2):
            yield tuple(sorted((a, b))), 1
        for f in friends:
            yield tuple(sorted((users, f))), 0

    def reducer_second(self, com, flag):
        """Emit ``(pair, common_friend_count)`` unless the pair are friends."""
        c = 0
        for fl in flag:
            # Any 0 flag means the two users are already friends: emit nothing.
            if fl == 0:
                return
            c += fl
        yield com, c

    def mapper_third(self, com, c):
        """Offer each member of the pair to the other as a candidate."""
        u1, u2 = com
        yield u1, (u2, c)
        yield u2, (u1, c)

    def reducer_third(self, ui, co):
        """Pick the candidate with the most common friends for user *ui*.

        Ties are broken by the smallest candidate id (compared as strings)
        so the result does not depend on value arrival order.
        """
        best, _ = min(co, key=lambda cand: (-cand[1], cand[0]))
        yield ui, best

    def steps(self):
        """Chain friend-list, common-friend-count and best-candidate steps."""
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper_second, reducer=self.reducer_second),
            MRStep(mapper=self.mapper_third, reducer=self.reducer_third),
        ]


if __name__ == '__main__':
    MRWordFreqCount.run()



In [None]:
#sample output
    0   4
    1   2
    2   1
    3   0
    4   0
    5   1