In [1]:
# map_reduce.py
# Defines a single function, map_reduce, which takes an input
# dictionary i and applies the user-defined function mapper to each
# (input_key,input_value) pair, producing a list of intermediate 
# keys and intermediate values.  Repeated intermediate keys then 
# have their values grouped into a list, and the user-defined 
# function reducer is applied to the intermediate key and list of 
# intermediate values.  The results are returned as a list, ordered by
# ascending intermediate key.

import itertools

def map_reduce(i,mapper,reducer):
    """Run a single in-memory MapReduce job.

    Args:
        i: dictionary of (input_key, input_value) pairs; each pair is passed
            to ``mapper``.
        mapper: callable ``mapper(key, value)`` returning an iterable of
            ``(intermediate_key, intermediate_value)`` pairs.
        reducer: callable ``reducer(intermediate_key, values)``, applied once
            per distinct intermediate key.  ``values`` lists that key's
            intermediate values in the order the mappers emitted them.

    Returns:
        A list of reducer results, ordered by ascending intermediate key.
        Intermediate keys must be mutually orderable; intermediate values
        need not be.
    """
    intermediate = []
    for key, value in i.items():
        intermediate.extend(mapper(key, value))

    def key_of(pair):
        # Group on the intermediate key only.
        return pair[0]

    # Sort on the key alone.  Sorting whole (key, value) tuples would also
    # compare values whenever keys tie, raising TypeError for values that
    # cannot be ordered (e.g. dicts).  sorted() is stable, so each group
    # keeps the mappers' emission order, and groupby sees every key as a
    # single contiguous run.
    return [
        reducer(intermediate_key, [value for _, value in group])
        for intermediate_key, group in itertools.groupby(
            sorted(intermediate, key=key_of), key=key_of)
    ]

In [2]:
# pagerank_mr.py
#
# Computes PageRank, using a simple MapReduce library.
#
# MapReduce is used in two separate ways: (1) to compute
# the inner product between the vector of dangling pages
# (i.e., pages with no outbound links) and the current
# estimated PageRank vector; and (2) to actually carry
# out the update of the estimated PageRank vector.
#
# For a web of one million webpages the original version of this
# program consumed about 1 GB of RAM and took an hour or so to run
# on a (slow) laptop with 3 GB of RAM, running Vista and Python 2.5.

from . import map_reduce
import numpy.random
import random

def paretosample(n,power=2.0):
    """Draw one sample from a Zipf (discrete Pareto) law truncated at n.

    The probability mass function is p(l) proportional to 1/l^power for
    l = 1..n.  Truncation is done by rejection: keep drawing from
    numpy.random.zipf until a value no larger than n comes up.
    """
    while True:
        draw = numpy.random.zipf(power)
        if draw <= n:
            return draw

def initialize(n,power):
    """Build a random web of n pages with Pareto-distributed in-degrees.

    Returns a dictionary keyed by page number 0..n-1.  Each value is a
    mutable record ``[pagerank, outlink_count, outlinks]``:
      - pagerank starts at 1/n;
      - outlink_count is the number of entries in outlinks;
      - outlinks lists the pages this page links to.

    For every page k an in-degree L_k is drawn as paretosample(n+1, power) - 1,
    so p(l) is proportional to 1/(l+1)^power for l = 0..n.  Then L_k distinct
    source pages are chosen uniformly from all n pages; k itself may be one
    of them, which gives a self-link.

    The distribution governs inlinks while the record stores outlinks, so
    each chosen source page gets k appended to its own outlink list.

    This dictionary supplies the (key, value) pairs for both mapper tasks.
    """
    web = {page: [1.0/n, 0, []] for page in range(n)}

    for target in range(n):
        in_degree = paretosample(n+1,power)-1
        for source in random.sample(range(n),in_degree):
            record = web[source]
            record[1] += 1               # one more outlink for the source page
            record[2].append(target)     # record the link source -> target
    return web

def ip_mapper(input_key,input_value):
    """Map step for the dangling-page inner product.

    ``input_value`` is a page record ``[pagerank, outlink_count, outlinks]``
    as built by ``initialize``; ``input_key`` is not used.  A page with no
    outlinks emits the single pair ``(1, pagerank)``, so that all dangling
    ranks share intermediate key 1.  Every other page emits nothing.
    """
    is_dangling = input_value[1] == 0
    return [(1, input_value[0])] if is_dangling else []

def ip_reducer(input_key,input_value_list):
    """Reduce step for the dangling-page inner product.

    ``input_value_list`` holds the PageRank of every dangling page, all
    emitted under one key by ``ip_mapper``.  Their total is the inner
    product.  ``input_key`` is not used.
    """
    dangling_total = sum(input_value_list)
    return dangling_total

def pr_mapper(input_key,input_value):
    """Map step for the PageRank update.

    ``input_value`` is a page record ``[pagerank, outlink_count, outlinks]``
    as built by ``initialize``.  The mapper first emits ``(input_key, 0.0)``,
    which guarantees every page produces at least one intermediate pair.
    It then emits one ``(outlink, pagerank / outlink_count)`` pair for each
    page this page links to.
    """
    emitted = [(input_key, 0.0)]
    for target in input_value[2]:
        # Each outlinked page receives an equal share of this page's rank.
        # Dangling pages have no outlinks, so no division by zero occurs.
        emitted.append((target, input_value[0] / input_value[1]))
    return emitted

def pr_reducer_inter(intermediate_key,intermediate_value_list,
                     s,ip,n):
    """Reduce step for the PageRank update, before parameter binding.

    This differs from a standard two-argument reducer by taking three extra
    inputs:
      s:  the PageRank damping parameter;
      ip: the inner product of the dangling-page vector and the current
          PageRank estimate;
      n:  the number of pages.

    Returns ``(intermediate_key, new_rank)``, where
    new_rank = s*sum(incoming shares) + s*ip/n + (1-s)/n.
    """
    link_term = s*sum(intermediate_value_list)
    dangling_term = s*ip/n
    teleport_term = (1.0-s)/n
    return (intermediate_key, link_term + dangling_term + teleport_term)

def pagerank(i,s=0.85,tolerance=0.00001,max_iterations=None,verbose=True):
    """Return the PageRank vector for the web described by ``i``.

    ``i`` uses the layout built by ``initialize``:
      - ``i[j][0]`` is page j's current PageRank estimate;
      - ``i[j][1]`` is its number of outlinks;
      - ``i[j][2]`` is the list of pages it links to.
    The estimates in ``i`` are updated in place, and ``i`` itself is returned.

    Each pass runs two MapReduce jobs.  The first sums the PageRank held by
    dangling pages (pages with no outlinks).  The second applies the PageRank
    update with parameter ``s``.  Iteration stops once M^(j+1)P - M^jP has
    l1 norm no greater than ``tolerance``.

    Args:
        i: web dictionary as described above; mutated in place.
        s: PageRank damping parameter.
        tolerance: l1-norm convergence threshold.
        max_iterations: optional cap on the number of passes.  ``None``
            (the default) keeps iterating until ``tolerance`` is met.  Set a
            cap to guarantee termination when convergence to ``tolerance``
            is not assured (e.g. ``tolerance=0`` or ``s=1.0``).
        verbose: if true (the default), print the iteration number and the
            l1 change for each pass.

    Returns:
        ``i``, with updated PageRank estimates.
    """
    n = len(i)
    iteration = 1
    change = 2  # initial estimate of error
    while change > tolerance:
        if max_iterations is not None and iteration > max_iterations:
            break
        if verbose:
            print("Iteration: " + str(iteration))

        # MapReduce job 1: inner product of the dangling-page vector with the
        # current estimate.  With no dangling pages there are no intermediate
        # pairs, so the job returns an empty list.
        # NOTE(review): requires the map_reduce *function* in module scope;
        # `from . import map_reduce` binds a module, not the function.
        ip_list = map_reduce(i,ip_mapper,ip_reducer)
        ip = ip_list[0] if ip_list else 0

        # Bind the current s, ip and n into a standard two-argument reducer.
        def pr_reducer(intermediate_key, intermediate_value_list):
            return pr_reducer_inter(intermediate_key, intermediate_value_list,
                                    s, ip, n)

        # MapReduce job 2: update the PageRank vector.  pr_mapper emits
        # (page, 0.0) for every page, so every key of i appears in new_dict.
        new_dict = dict(map_reduce(i,pr_mapper,pr_reducer))

        # l1 norm of the change between successive estimates.
        change = sum(abs(new_dict[j]-i[j][0]) for j in i)
        if verbose:
            print("Change in l1 norm: " + str(change))

        # Commit the new estimate.
        for j in i:
            i[j][0] = new_dict[j]
        iteration += 1
    return i



In [3]:
import ast

# Load the test graph.  Each non-blank line is "<node id>\t<outlinks>", where
# <outlinks> is a dict literal whose keys are the linked-to nodes.
# NOTE(review): assumes the dict text is a plain Python literal; confirm
# against the data file.
with open('PageRank-test.txt') as infile:
    # Skip blank lines (e.g. a trailing empty line); they cannot be split
    # into an id and an adjacency list.
    lines = [line for line in infile if line.strip()]

node_list = {}
for line in lines:
    _id, adj_text = line.strip().split('\t', 1)
    # literal_eval parses the dict without executing arbitrary code,
    # unlike the exec() of file contents this replaces.
    adj_list = ast.literal_eval(adj_text)

    # Page record: [initial PageRank, outlink count, outlinks].
    node = [1/len(lines), len(adj_list), list(adj_list.keys())]
    node_list[_id] = node

In [None]:
pagerank(node_list, s=0.85, tolerance=0.0001)

In [None]:
# pagerank(i,0.85,0.0001)