This is an experiment with the PageRank MR [implementation](http://michaelnielsen.org/blog/using-mapreduce-to-compute-pagerank/) by Michael Nielsen.

The article Michael Nielsen wrote contains a map reduce algorithm, and I'm putting it here so I can get the original code working as designed. The goal of this exercise is to understand the implementation in detail and then adapt it. The first adaptation step is to use MrJob instead of the `map_reduce` function here, and the second step is to adapt the algorithm to work with the toy example as part of HW9.1.

-----------------

## Part 1: The Original Code

In [4]:
%%writefile map_reduce.py
# map_reduce.py
# Defines a single function, map_reduce, which takes an input
# dictionary i and applies the user-defined function mapper to each
# (input_key,input_value) pair, producing a list of intermediate 
# keys and intermediate values.  Repeated intermediate keys then 
# have their values grouped into a list, and the user-defined 
# function reducer is applied to the intermediate key and list of 
# intermediate values.  The results are returned as a list.

import itertools

def map_reduce(i,mapper,reducer):
    intermediate = []
    for (key,value) in i.items():
        intermediate.extend(mapper(key,value))
    groups = {}
    for key, group in itertools.groupby(sorted(intermediate), lambda x: x[0]):
        groups[key] = list([y for x, y in group])
    return [reducer(intermediate_key,groups[intermediate_key]) for intermediate_key in groups] 

Writing map_reduce.py
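
To see how `map_reduce` groups intermediate keys before calling the reducer, here is a small word-count sketch. The function body repeats the logic of `map_reduce.py` so the snippet is self-contained; `wc_mapper`, `wc_reducer`, and the `docs` dictionary are hypothetical names made up for this illustration and are not part of the original article.

```python
import itertools

def map_reduce(i, mapper, reducer):
    # Same logic as map_reduce.py above, repeated to keep the sketch self-contained.
    intermediate = []
    for key, value in i.items():
        intermediate.extend(mapper(key, value))
    groups = {}
    # groupby only merges adjacent items, so the list must be sorted first
    for key, group in itertools.groupby(sorted(intermediate), lambda x: x[0]):
        groups[key] = [y for _, y in group]
    return [reducer(k, groups[k]) for k in groups]

def wc_mapper(doc_id, text):
    # hypothetical mapper: emit (word, 1) for every word in the document
    return [(word, 1) for word in text.split()]

def wc_reducer(word, counts):
    # hypothetical reducer: sum the counts collected for one word
    return (word, sum(counts))

docs = {"a": "the quick fox", "b": "the lazy dog the end"}
word_counts = dict(map_reduce(docs, wc_mapper, wc_reducer))
print(word_counts["the"])
```

The PageRank code below uses the same pattern, with page ids as keys and rank contributions as values.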


In [34]:
# pagerank_mr.py
#
# Computes PageRank, using a simple MapReduce library.
#
# MapReduce is used in two separate ways: (1) to compute
# the inner product between the vector of dangling pages
# (i.e., pages with no outbound links) and the current
# estimated PageRank vector; and (2) to actually carry
# out the update of the estimated PageRank vector.
#
# For a web of one million webpages the program consumes
# about one gig of RAM, and takes an hour or so to run,
# on a (slow) laptop with 3 gig of RAM, running Vista and
# Python 2.5.

import map_reduce
import numpy.random
import random

def paretosample(n,power=2.0):
    # Returns a sample from a truncated Pareto distribution
    # with probability mass function p(l) proportional to
    # 1/l^power.  The distribution is truncated at l = n.

    m = n+1
    while m > n: m = numpy.random.zipf(power)
    return m

def initialize(n,power):
    # Returns a Python dictionary representing a web
    # with n pages, and where each page k is linked to by
    # L_k random other pages.  The L_k are independent and
    # identically distributed random variables with a
    # shifted and truncated Pareto probability mass function
    # p(l) proportional to 1/(l+1)^power.

    # The representation used is a Python dictionary with
    # keys 0 through n-1 representing the different pages.
    # i[j][0] is the estimated PageRank, initially set at 1/n,
    # i[j][1] the number of outlinks, and i[j][2] a list of
    # the outlinks.

    # This dictionary is used to supply (key,value) pairs to
    # both mapper tasks defined below.

    # initialize the dictionary
    i = {} 
    for j in xrange(n): i[j] = [1.0/n,0,[]]

    # For each page, generate inlinks according to the Pareto
    # distribution. Note that this is somewhat tedious, because
    # the Pareto distribution governs inlinks, NOT outlinks,
    # which is what our representation is adapted to represent.
    # A smarter representation would give easy
    # access to both, while remaining memory efficient.
    for k in xrange(n):
        lk = paretosample(n+1,power)-1
        values = random.sample(xrange(n),lk)
        for j in values:
            i[j][1] += 1 # increment the outlink count for page j
            i[j][2].append(k) # insert the link from j to k
    return i

def ip_mapper(input_key,input_value):
    # The mapper used to compute the inner product between
    # the vector of dangling pages and the current estimated
    # PageRank.  The input is a key describing a webpage, and
    # the corresponding data, including the estimated pagerank.
    # The mapper returns [(1,pagerank)] if the page is dangling,
    # and otherwise returns nothing.

    if input_value[1] == 0: return [(1,input_value[0])]
    else: return []

def ip_reducer(input_key,input_value_list):
    # The reducer used to compute the inner product.  Simply
    # sums the pageranks listed in the input value list, which
    # are all the pageranks for dangling pages.

    return sum(input_value_list)

def pr_mapper(input_key,input_value):
    # The mapper used to update the PageRank estimate.  Takes
    # as input a key for a webpage, and as a value the corresponding
    # data, as described in the function initialize.  It returns a
    # list with all outlinked pages as keys, and corresponding values
    # just the PageRank of the origin page, divided by the total
    # number of outlinks from the origin page.  Also appended to
    # that list is a pair with key the origin page, and value 0.
    # This is done to ensure that every single page ends up with at
    # least one corresponding (intermediate_key,intermediate_value)
    # pair output from a mapper.
  
    return [(input_key,0.0)]+[(outlink,input_value[0]/input_value[1])
        for outlink in input_value[2]]

def pr_reducer_inter(intermediate_key,intermediate_value_list,
                     s,ip,n):
    # This is a helper function used to define the reducer used
    # to update the PageRank estimate.  Note that the helper differs
    # from a standard reducer in having some additional inputs:
    # s (the PageRank parameter), ip (the value of the inner product
    # between the dangling pages vector and the estimated PageRank),
    # and n, the number of pages.  Other than that the code is
    # self-explanatory.
  
    return (intermediate_key,
        s*sum(intermediate_value_list)+s*ip/n+(1.0-s)/n)

def pagerank(i,s=0.85,tolerance=0.00001):
    # Returns the PageRank vector for the web described by i,
    # using parameter s.  The criterion for convergence is that
    # we stop when M^(j+1)P-M^jP has length less than tolerance,
    # in l1 norm.
  
    n = len(i)
    iteration = 1
    change = 2 # initial estimate of error
    while change > tolerance:
        print "Iteration: "+str(iteration)
        # Run the MapReduce job used to compute the inner product
        # between the vector of dangling pages and the estimated
        # PageRank.
        ip_list = map_reduce.map_reduce(i,ip_mapper,ip_reducer)

        # the if-else clause is needed in case there are no dangling
        # pages, in which case MapReduce returns ip_list as the empty
        # list.  Otherwise, set ip equal to the first (and only)
        # member of the list returned by MapReduce.
        if ip_list == []: ip = 0
        else: ip = ip_list[0]

        # Dynamically define the reducer used to update the PageRank
        # vector, using the current values for s, ip, and n.
        pr_reducer = lambda x,y: pr_reducer_inter(x,y,s,ip,n)

        # Run the MapReduce job used to update the PageRank vector.
        new_i = map_reduce.map_reduce(i,pr_mapper,pr_reducer)

        # Compute the new estimate of error.
        change = sum([abs(new_i[j][1]-i[j][0]) for j in xrange(n)])
        print "Change in l1 norm: {0:1.12f}".format(change)

        # Update the estimate PageRank vector.
        for j in xrange(n): i[j][0] = new_i[j][1]
        iteration += 1
    return i

n = 1000 # works up to about 1000000 pages
i = initialize(n,2.0)
new_i = pagerank(i,0.85,0.0001)

Iteration: 1
Change in l1 norm: 1.159352642857
Iteration: 2
Change in l1 norm: 0.407339540567
Iteration: 3
Change in l1 norm: 0.164121655282
Iteration: 4
Change in l1 norm: 0.079544297575
Iteration: 5
Change in l1 norm: 0.035531329063
Iteration: 6
Change in l1 norm: 0.013737477305
Iteration: 7
Change in l1 norm: 0.006959669613
Iteration: 8
Change in l1 norm: 0.003339450145
Iteration: 9
Change in l1 norm: 0.001642446968
Iteration: 10
Change in l1 norm: 0.000736898241
Iteration: 11
Change in l1 norm: 0.000382751385
Iteration: 12
Change in l1 norm: 0.000189308194
Iteration: 13
Change in l1 norm: 0.000094928583
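
To make the update rule in `pr_reducer_inter` concrete, the following sketch applies one iteration to a three-page web without going through MapReduce. It assumes the same record layout as `initialize` (`[rank, number_of_outlinks, outlinks]`); the helper name `pagerank_step` and the `toy_web` data are made up for illustration.

```python
def pagerank_step(web, s=0.85):
    # web maps page -> [rank, number_of_outlinks, outlinks], as in initialize()
    n = len(web)
    # inner product with the dangling vector: total rank held by dangling pages
    ip = sum(rank for rank, n_out, _ in web.values() if n_out == 0)
    incoming = {page: 0.0 for page in web}
    for rank, n_out, outlinks in web.values():
        for target in outlinks:
            incoming[target] += rank / n_out
    # same formula as pr_reducer_inter: s*sum + s*ip/n + (1-s)/n
    return {page: s * incoming[page] + s * ip / n + (1.0 - s) / n
            for page in web}

# hypothetical toy web: 0 -> 1, 1 -> 2, and page 2 is dangling
toy_web = {0: [1 / 3.0, 1, [1]], 1: [1 / 3.0, 1, [2]], 2: [1 / 3.0, 0, []]}
new_ranks = pagerank_step(toy_web)
print(new_ranks, sum(new_ranks.values()))
```

Because the dangling mass and the teleport term are both spread evenly over all `n` pages, the new ranks still sum to 1 (up to floating-point error).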


## Part 2: Adapting to MrJob

Let's start by adapting the current algorithm to MrJob. One of the first things we have to do is execute the commands that make Jupyter notebooks reload the Python modules each time we execute a cell, so that changes in the code are picked up.

In [7]:
# Jupyter requires this for MRJob to reload classes properly
%load_ext autoreload
%autoreload 2

In [42]:
%%writefile initweb.py
import numpy.random
import random

class initweb():
    @staticmethod
    def paretosample(n,power=2.0):
        # Returns a sample from a truncated Pareto distribution
        # with probability mass function p(l) proportional to
        # 1/l^power.  The distribution is truncated at l = n.

        m = n+1
        while m > n: m = numpy.random.zipf(power)
        return m

    @staticmethod
    def initialize(n,power):
        # Returns a Python dictionary representing a web
        # with n pages, and where each page k is linked to by
        # L_k random other pages.  The L_k are independent and
        # identically distributed random variables with a
        # shifted and truncated Pareto probability mass function
        # p(l) proportional to 1/(l+1)^power.

        # The representation used is a Python dictionary with
        # keys 0 through n-1 representing the different pages.
        # i[j][0] is the estimated PageRank, initially set at 1/n,
        # i[j][1] the number of outlinks, and i[j][2] a list of
        # the outlinks.

        # This dictionary is used to supply (key,value) pairs to
        # both mapper tasks defined below.

        # initialize the dictionary
        i = {} 
        for j in xrange(n): i[j] = [1.0/n,0,[]]

        # For each page, generate inlinks according to the Pareto
        # distribution. Note that this is somewhat tedious, because
        # the Pareto distribution governs inlinks, NOT outlinks,
        # which is what our representation is adapted to represent.
        # A smarter representation would give easy
        # access to both, while remaining memory efficient.
        for k in xrange(n):
            lk = initweb.paretosample(n+1,power)-1
            values = random.sample(xrange(n),lk)
            for j in values:
                i[j][1] += 1 # increment the outlink count for page j
                i[j][2].append(k) # insert the link from j to k
        return i
    
    @staticmethod
    def initialize_file(n, power, filename):
        # Calls the initialize function and then persists the dictionary
        # as an adjacency list to the file specified by filename
        dictionary = initweb.initialize(n, power)
        with open(filename, 'w') as outfile:
            for key in dictionary:
                outfile.write('{0}\t{1}\n'.format(key,dictionary[key]))


Overwriting initweb.py
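
Each line written by `initialize_file` is a page id, a tab, and the Python `repr` of the `[rank, outlink_count, outlinks]` list. The MrJob classes below recover the list with `eval`; as a sketch of a safer alternative, `ast.literal_eval` parses the same text without executing arbitrary code. The helper name `parse_page_line` is hypothetical, and this is an assumption about how the parsing could be done, not what the jobs below actually use.

```python
import ast

def parse_page_line(line):
    # split "key<TAB>[rank, n_out, [outlinks]]" into its two fields
    key, info = line.strip().split('\t')
    rank, n_out, outlinks = ast.literal_eval(info)
    # MrJob's JSON output quotes the key, so strip the quotes if present
    return key.strip('"'), [rank, n_out, outlinks]

# a line in the format produced by initialize_file
sample = '{0}\t{1}\n'.format(7, [0.001, 2, [5, 9]])
page, value = parse_page_line(sample)
print(page, value)
```

The same helper also accepts the quoted-key form that MrJob emits between stages, for example `"7"\t[0.001, 2, [5, 9]]`.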


In [100]:
%%writefile MrJobDangler.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# We'll put the core map and reduce steps in an extension
# of the MRJob class as we would for any implementation
class MrJobDangler(MRJob):

    def configure_options(self):
        super(MrJobDangler, self).configure_options()    
    
    def ip_mapper(self, _, line):
        # The mapper used to compute the inner product between
        # the vector of dangling pages and the current estimated
        # PageRank.  The input is a key describing a webpage, and
        # the corresponding data, including the estimated pagerank.
        # The mapper returns [(1,pagerank)] if the page is dangling,
        # and otherwise returns nothing.
        #
        # Modification for MrJob is to read from the input splits
        # a line at a time to format as the dictionary entry
        
        page, info = re.split('\t',line.strip())
        key = page.strip('"')
        input_value = eval(info)

        # first we have to emit the graph as is for the second stage
        yield key, input_value
        
        # then we emit the dangling page information
        if input_value[1] == 0: yield '*',input_value[0]
        else: yield '*',0
            
    def ip_combiner(self, input_key, input_value_list):
        # The combiner is mentioned but not implemented in the article
        # but we can implement it here simply enough.
        # Sum the values for the key and emit the key, sum(values)
        # Pass through anything that doesn't have * as the key
        # Note: steps() below does not register this combiner
        if input_key == '*':
            yield input_key, sum(input_value_list)
        else:
            # input_value_list is a generator, so emit each value separately
            for value in input_value_list:
                yield input_key, value

    def ip_reducer(self, input_key,input_value_list):
        # The reducer used to compute the inner product.  Simply
        # sums the pageranks listed in the input value list, which
        # are all the pageranks for dangling pages.
        
        # Since we're sending this to the second stage map reduce 
        # and we're not intercepting this like the original algorithm
        # we need to emit it in a way that identifies it as the dangling
        # page probability mass. Since we have to broadcast that mass
        # to all the instances of the second stage we will emit it as 
        # a counter by multiplying it by 10^12 and taking the integer
        # portion
        if input_key == '*': 
            dangling_mass = int(1000000000000 * sum(input_value_list))
            self.increment_counter('Page Rank', 'Dangling Mass', amount=dangling_mass)
        else: 
            for value in input_value_list:
                    yield input_key, value
                
        
    def steps(self):
        return [MRStep(mapper=self.ip_mapper,
                       reducer=self.ip_reducer)
                    ]

if __name__ == '__main__':
    MrJobDangler.run() 

Overwriting MrJobDangler.py
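
Counters hold integers, which is why `ip_reducer` multiplies the dangling mass by 10^12 before incrementing and the second stage divides by the same factor. The round trip can be sketched in plain Python as follows; `SCALE`, `to_counter`, and `from_counter` are hypothetical names used only for this illustration.

```python
SCALE = 10 ** 12  # same factor as in ip_reducer

def to_counter(mass):
    # truncate the scaled mass to an integer, as ip_reducer does
    return int(SCALE * mass)

def from_counter(counter_value):
    # undo the scaling, as the second-stage reducer does
    return float(counter_value) / SCALE

mass = 0.123456789012345
recovered = from_counter(to_counter(mass))
error = abs(mass - recovered)
print(recovered, error)
```

The truncation loses at most 10^-12 per increment. If several reducer instances each increment the counter, each one truncates separately, so the total error can grow with the number of increments.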


In [134]:
%%writefile MrJobRanker.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# We'll put the core map and reduce steps in an extension
# of the MRJob class as we would for any implementation
class MrJobRanker(MRJob):

    def configure_options(self):
        super(MrJobRanker, self).configure_options()    
        self.add_passthrough_option(
            '--n', dest='n', type='int', default=0, 
            help="Number of nodes")  
        self.add_passthrough_option(
            '--ip', dest='ip', type='float', default=0.0, 
            help="dangling mass, scaled by 10^12 as emitted by the counter") 
        self.add_passthrough_option(
            '--s', dest='s', type='float', default=0.85, 
            help="damping factor") 
            
    def mapper(self, _, line):
        # we're not getting the data directly but from an intermediate
        # input file split, so we need to recover the variables from 
        # each line presented to the mapper
        node, info_list = re.split('\t', line.strip())
        input_key = node.strip('"')
        input_value = eval(info_list)

        yield input_key, input_value

        # next send out the rank distributed across the outlinks
        # make sure the key is a string 
        for outlink in input_value[2]:
            yield str(outlink), input_value[0]/input_value[1]
        
    def reducer(self, intermediate_key, intermediate_value_list):
        
        # Michael used a nice trick to send in a function for the reducer
        # in the map_reduce function that he wrote. We need to pull this 
        # apart just a bit

        # For the n variable we have the parameter as self.options.n
        # For the s variable we have the parameter as self.options.s
        # For the ip variable we have the parameter as self.options.ip
        # but we need to divide the ip by the scalar we used to make it 
        # an integer counter value
        
        info_list = []
        page_rank = 0.0
        ip = float(self.options.ip)/1000000000000.0
        for intermediate_value in intermediate_value_list:
            if isinstance(intermediate_value, list):
                # this is our graph. Save the info list
                info_list = intermediate_value
            else:
                # this is one of the values to accumulate for page rank
                page_rank += intermediate_value
        page_rank = self.options.s*page_rank + \
                    self.options.s*ip/self.options.n + \
                    (1.0-self.options.s)/self.options.n
        yield intermediate_key, [page_rank, len(info_list[2]), info_list[2]]

        
    def steps(self):
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)
                    ]

if __name__ == '__main__':
    MrJobRanker.run() 

Overwriting MrJobRanker.py
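
The reducer above receives two kinds of values under the same key: the page's own graph record (a list) and the rank contributions from inlinking pages (floats). The following sketch isolates that logic so it can be checked without running MrJob. The function name `rank_reduce` and the sample values are hypothetical, and unlike the reducer above it falls back to an empty outlink list when no graph record arrives.

```python
def rank_reduce(values, s, ip, n):
    info_list = None
    contributions = 0.0
    for value in values:
        if isinstance(value, list):
            # the graph record [rank, n_out, outlinks] for this page
            info_list = value
        else:
            # a rank contribution from one inlinking page
            contributions += value
    # assumption for this sketch: a page with no record has no outlinks
    outlinks = info_list[2] if info_list is not None else []
    new_rank = s * contributions + s * ip / n + (1.0 - s) / n
    return [new_rank, len(outlinks), outlinks]

# hypothetical input: one graph record plus two contributions
result = rank_reduce([[0.25, 1, ['3']], 0.125, 0.25], s=0.85, ip=0.25, n=4)
print(result)
```

Here `ip` is the unscaled dangling mass; in the MrJob version it first has to be divided by 10^12.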


In [135]:
from MrJobDangler import MrJobDangler
from MrJobRanker import MrJobRanker
from initweb import initweb

def pagerank(infile,outfile, n=1000,s=0.85,tolerance=0.00001):
    # Computes the PageRank vector for the web stored in infile,
    # using parameter s.  Unlike the original, this driver does not
    # yet test for convergence: the tolerance argument is unused and
    # the loop simply runs a fixed number of iterations.

    mr_job = MrJobDangler(args=[infile])
       
    iteration = 1

    while iteration < 20:
        print "Iteration: "+str(iteration)
        # Run the MapReduce job used to compute the inner product
        # between the vector of dangling pages and the estimated
        # PageRank.
        # ip_list = map_reduce.map_reduce(i,ip_mapper,ip_reducer)
        
        with open(outfile, 'w') as rankfile:
            with mr_job.make_runner() as runner:
                runner.run()
                for line in runner.stream_output():
                    rankfile.write(line)
                counters = runner.counters()
                # default to zero dangling mass if the counter was never set
                ip = counters[0].get('Page Rank', {}).get('Dangling Mass', 0)


        mr_job_rank = MrJobRanker(args=[outfile,
                                        '--n', str(n),
                                        '--s', str(s),
                                        '--ip', str(ip)
                                       ])
        
        with open(infile, 'w') as outfinal:
            with mr_job_rank.make_runner() as runner:
                runner.run()
                for line in runner.stream_output():
                    outfinal.write(line)

        iteration += 1

n = 1000 # works up to about 1000000 pages
input_file = 'pagerank-in.txt'
output_file = 'pagerank-out.txt'

initweb.initialize_file(n, 2.0, input_file)
#i = initialize(n,2.0)
pagerank(input_file, output_file,n,0.85,0.0001)

Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Iteration: 5
Iteration: 6
Iteration: 7
Iteration: 8
Iteration: 9
Iteration: 10
Iteration: 11
Iteration: 12
Iteration: 13
Iteration: 14
Iteration: 15
Iteration: 16
Iteration: 17
Iteration: 18
Iteration: 19
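
The driver above runs a fixed number of iterations and never uses `tolerance`. One way to restore the convergence test from Part 1 would be to compare the rank files before and after an iteration in l1 norm. The sketch below assumes the line format used between the stages; `read_ranks`, `l1_change`, and the sample lines are hypothetical and are not wired into the driver.

```python
import ast

def read_ranks(lines):
    # map page id -> rank from "key<TAB>[rank, n_out, [outlinks]]" lines
    ranks = {}
    for line in lines:
        key, info = line.strip().split('\t')
        ranks[key.strip('"')] = ast.literal_eval(info)[0]
    return ranks

def l1_change(old_lines, new_lines):
    old, new = read_ranks(old_lines), read_ranks(new_lines)
    # pages missing from the old file are treated as having rank 0
    return sum(abs(new[k] - old.get(k, 0.0)) for k in new)

# hypothetical rank files from two consecutive iterations
before = ['"0"\t[0.5, 1, ["1"]]', '"1"\t[0.5, 0, []]']
after = ['"0"\t[0.4, 1, ["1"]]', '"1"\t[0.6, 0, []]']
change = l1_change(before, after)
print(change)
```

In the driver, the old lines would have to be read from `infile` before the second stage overwrites it, and the loop could stop once the change drops below `tolerance`.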


## Part 3: Toy Graph from HW9

The algorithm now seems to work correctly with MrJob, but this first iteration is a bit of a hack. Michael's code has access to the intermediate map_reduce values, so I chose to replicate that by emitting the dangling mass as a counter value. The driver then recovers the counter and passes it as a parameter to the second-stage MR job. The problem is that there is now an intermediate write to file between the first and second stages, and the data has to stream through the driver, which is not exactly scalable. I can work on scalability in another iteration. For now I want to compare the toy graph calculation to my PageRank MR calculation from HW9.

In [106]:
%%writefile MrJobTransform.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

#
# Transform the adjacency list into the PageRank input format:
# node \t [page_rank, num_outlinks, [outlinks]]
#
class MrJobTransform(MRJob):

    def configure_options(self):
        super(MrJobTransform, self).configure_options()    
        self.add_passthrough_option(
            '--nodes', dest='nodes', type='int', default=0, 
            help="Number of nodes")
        self.add_passthrough_option(
            '--maptasks', dest='mappers', type='int', default=2, 
            help="mapper task instances")
        self.add_passthrough_option(
            '--reducetasks', dest='reducers', type='int', default=1, 
            help="reducer task instances")
        

    # each line is from the adjacency list node_id \t {neighbor: weight}
    # where {neighbor:weight} is one of m elements in the dictionary
    # emit node \t [page_rank, num_outlinks, [outlinks]]
    def mapper(self, _, line):
        node, adj_list = re.split('\t',line.strip())
        node = node.strip('"')
        neighbors = eval(adj_list)
        yield node, [neighbors.keys()]
        for neighbor in neighbors:
            yield neighbor, []

    def reducer_init(self):
        self.rank = 1.0/self.options.nodes
        
    # emit node, [page_rank, num_outlinks, [outlinks]]
    def reducer(self, node, outlinks_list):
        combined_outlinks = []
        for outlinks in outlinks_list:
            for outlink in outlinks:
                combined_outlinks = combined_outlinks + outlink
        self.increment_counter('transformer', 'nodes', 1) 
        yield node, [self.rank, len(combined_outlinks), combined_outlinks]

    def steps(self):
        return [MRStep(mapper=self.mapper,
                       reducer_init=self.reducer_init,
                       reducer=self.reducer,
                       jobconf = {
                        'mapreduce.job.maps' : self.options.mappers,
                        'mapreduce.job.reduces' : self.options.reducers}
                    )]
    
if __name__ == '__main__':
    MrJobTransform.run()

Overwriting MrJobTransform.py
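
The transformation performed by `MrJobTransform` can be sketched in plain Python: every node that appears either as a source or as a neighbor gets a record, so dangling nodes are not lost. The function name `transform_adjacency` and the `toy` graph are hypothetical, and unlike the job above the sketch derives the initial rank from the actual node count instead of a `--nodes` option.

```python
def transform_adjacency(adjacency):
    # adjacency maps node -> {neighbor: weight}
    nodes = set(adjacency)
    for neighbors in adjacency.values():
        # neighbors that never appear as a source are dangling nodes
        nodes.update(neighbors)
    rank = 1.0 / len(nodes)
    web = {}
    for node in nodes:
        outlinks = sorted(adjacency.get(node, {}))
        web[node] = [rank, len(outlinks), outlinks]
    return web

# hypothetical three-node graph; C has no outlinks
toy = {'A': {'B': 1, 'C': 1}, 'B': {'C': 1}}
web = transform_adjacency(toy)
print(web)
```

The edge weights are ignored, as they are in the mapper above, which only keeps the neighbor keys.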


In [136]:
from MrJobTransform import MrJobTransform
              
def transform(qfile):
    mr_job = MrJobTransform(args=[qfile,
                                     '-r','local', 
                                     '--nodes', '1000', 
                                     '--maptasks', '2',
                                     '--reducetasks', '1',
                                    ])

    with open('pagerank-in.txt','w') as rankfile:
        with mr_job.make_runner() as runner:
            runner.run()
            for line in runner.stream_output():
                rankfile.write(line)
            counters = runner.counters()
            print 'Node Count: {0}'.format(counters[0]['transformer']['nodes'])

        
if __name__ == '__main__':
    transform('HW9/Data/PageRank-test.txt')

Node Count: 11


In [140]:
from MrJobDangler import MrJobDangler
from MrJobRanker import MrJobRanker
from initweb import initweb

def pagerank(infile,outfile, n=1000,s=0.85,tolerance=0.00001):

    mr_job = MrJobDangler(args=[infile])
       
    iteration = 1

    while iteration < 50:
        print "Iteration: "+str(iteration)
        # Run the MapReduce job used to compute the inner product
        # between the vector of dangling pages and the estimated
        # PageRank.
        # ip_list = map_reduce.map_reduce(i,ip_mapper,ip_reducer)
        
        with open(outfile, 'w') as rankfile:
            with mr_job.make_runner() as runner:
                runner.run()
                for line in runner.stream_output():
                    rankfile.write(line)
                counters = runner.counters()
                # default to zero dangling mass if the counter was never set
                ip = counters[0].get('Page Rank', {}).get('Dangling Mass', 0)


        mr_job_rank = MrJobRanker(args=[outfile,
                                        '--n', str(n),
                                        '--s', str(s),
                                        '--ip', str(ip)
                                       ])
        
        with open(infile, 'w') as outfinal:
            with mr_job_rank.make_runner() as runner:
                runner.run()
                for line in runner.stream_output():
                    outfinal.write(line)

        iteration += 1

n = 11 # number of nodes in the HW9 toy graph
input_file = 'pagerank-in.txt'
output_file = 'pagerank-out.txt'

#initweb.initialize_file(n, 2.0, input_file)
#i = initialize(n,2.0)
pagerank(input_file, output_file,n,0.85,0.0001)

Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Iteration: 5
Iteration: 6
Iteration: 7
Iteration: 8
Iteration: 9
Iteration: 10
Iteration: 11
Iteration: 12
Iteration: 13
Iteration: 14
Iteration: 15
Iteration: 16
Iteration: 17
Iteration: 18
Iteration: 19
Iteration: 20
Iteration: 21
Iteration: 22
Iteration: 23
Iteration: 24
Iteration: 25
Iteration: 26
Iteration: 27
Iteration: 28
Iteration: 29
Iteration: 30
Iteration: 31
Iteration: 32
Iteration: 33
Iteration: 34
Iteration: 35
Iteration: 36
Iteration: 37
Iteration: 38
Iteration: 39
Iteration: 40
Iteration: 41
Iteration: 42
Iteration: 43
Iteration: 44
Iteration: 45
Iteration: 46
Iteration: 47
Iteration: 48
Iteration: 49


## Conclusion

The output of this algorithm converges to the correct values for the toy graph as presented for HW9. One difference I noticed is that the dangling mass is not distributed correctly when there is more than one mapper and reducer; it only gets distributed on the keys for a single reducer. There may be more to it than that, however: when I look at the structure that I put together for the 2-stage MR jobs it _should_ work correctly, but it's slightly off.

Comparing the actual calculation in the second-stage reducer, the calculations are the same, as is the place where the dangling mass is calculated. Therefore the discrepancy must come from the distribution of the mass across the mapper and reducer instances.
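
One way to test this hypothesis is to check whether the ranks still sum to 1 after an update. The sketch below is a plain-Python illustration, not the HW9 code: it applies the update rule once while handing the dangling mass either to every page or only to a subset of pages, which mimics a mass that reaches the keys of only one reducer. The names `update` and `recipients` and the three-page web are made up for this example.

```python
def update(web, s, recipients):
    # web maps page -> [rank, n_out, outlinks]
    n = len(web)
    ip = sum(rank for rank, n_out, _ in web.values() if n_out == 0)
    new = {}
    for page in web:
        incoming = sum(rank / n_out
                       for rank, n_out, outs in web.values() if page in outs)
        # only pages in recipients receive their share of the dangling mass
        share = s * ip / n if page in recipients else 0.0
        new[page] = s * incoming + share + (1.0 - s) / n
    return new

# hypothetical web: 0 -> 1, 1 -> 2, and page 2 is dangling
web = {0: [1 / 3.0, 1, [1]], 1: [1 / 3.0, 1, [2]], 2: [1 / 3.0, 0, []]}
full = update(web, 0.85, {0, 1, 2})
partial = update(web, 0.85, {0})
print(sum(full.values()), sum(partial.values()))
```

When every page receives its share the total stays at 1; when only some pages do, the total falls below 1. Summing the ranks in the HW9 output would therefore be a quick check for lost dangling mass.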