### Question 9.1 

*Write a basic MRJob implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input (as explored in HW 7).
Make sure that your implementation uses teleportation ((1 - damping factor) / the number of nodes in the network) and, further, distributes the mass of dangling nodes in each iteration
so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page].*
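The update described in the note can be checked on a small in-memory graph before moving to MRJob. The sketch below is a plain-Python power iteration, not the MRJob job itself, and assumes the graph is a dict mapping each node to a dict of out-neighbors, like the HW 7 adjacency lists; `pagerank_step`, `pagerank` and the toy graph are hypothetical names. Each iteration spreads the dangling mass uniformly over all nodes and adds the teleportation term, so the ranks keep summing to 1.

```python
def pagerank_step(graph, ranks, alpha=0.85):
    """One PageRank iteration with teleportation and dangling-mass redistribution.

    graph: dict node -> dict of out-neighbors; must contain every node
           (dangling nodes map to {})
    ranks: dict node -> current PageRank (assumed to sum to 1)
    alpha: damping factor d (probability of following a link)
    """
    n = len(ranks)
    # Mass sitting on dangling nodes, to be spread uniformly over all nodes
    dangling = sum(ranks[node] for node, out in graph.items() if not out)
    incoming = dict.fromkeys(ranks, 0.0)
    for node, out in graph.items():
        for neighbor in out:
            incoming[neighbor] += ranks[node] / len(out)
    # Teleportation share plus damped (link mass + dangling share)
    return {
        node: (1 - alpha) / n + alpha * (incoming[node] + dangling / n)
        for node in ranks
    }


def pagerank(graph, alpha=0.85, iterations=10):
    # Every node that appears as a source or as a neighbor is part of the network
    nodes = set(graph)
    for out in graph.values():
        nodes.update(out)
    full_graph = {node: graph.get(node, {}) for node in nodes}
    ranks = dict.fromkeys(nodes, 1.0 / len(nodes))
    for _ in range(iterations):
        ranks = pagerank_step(full_graph, ranks, alpha)
    return ranks


if __name__ == "__main__":
    # Toy graph: 'C' and 'D' are dangling ('D' only appears as a neighbor)
    toy = {"A": {"B": 1, "C": 1}, "B": {"C": 1}, "E": {"D": 1}}
    ranks = pagerank(toy, alpha=0.85, iterations=20)
    print(round(sum(ranks.values()), 10))  # 1.0
```

Because every non-dangling node hands out all of its mass and the dangling mass is added back, the link part sums to the previous total; the teleportation term then contributes exactly 1 - alpha, which is why the total stays at 1.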

#### Solution


In [6]:
%%writefile test_pr.py
from __future__ import division
 
from mrjob.job import MRJob
from mrjob.step import MRStep
from operator import itemgetter


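# Hard-coded number of nodes: it must equal the number of distinct nodes in the
# input (including nodes that only appear as neighbors) for the initial rank
# 1/N and the teleportation term to be correct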
numberOfNodes = pow(10, 1)


class pageRank(MRJob):
 
     """ This class implements the page-rank calculation. """


     def configure_options(self):

          """ Load options for the class. """

          super(pageRank, self).configure_options()

          self.add_passthrough_option('--alpha', type='float',
               default=0.85, help='alpha: damping factor (probability of following a link) in PageRank')

          self.add_passthrough_option('--iterations', type='int',
               default=10, help='iterations: number of iterations for PageRank')


     def load_options(self, args):

          """ Initializes the arguments for each class. """

          super(pageRank, self).load_options(args)

          self.alpha = self.options.alpha
          self.iterations = self.options.iterations


     def mapper_init_pr(self, _, line):

          """ This initializes the PageRank algorithm by assembling the node list 
          for the initial PageRank values. """

          # Initiate 
          global numberOfNodes 

          # Parse 
          line = line.split('\t')
          node = line[0]
          adjacencyList = eval(line[1])

          # Track 
          for neighbor in adjacencyList.keys(): 

               # Emit raw nodes
               yield neighbor, None

          # numberOfNodes += 1

          # Pass values
          yield node, adjacencyList


     def reducer_init_pr(self, node, initTuple):

          """ This attaches initial PageRanks for the algorithm. """

          adjacencyList = dict()

          # Re-discover 
          for element in initTuple:
               if isinstance(element, dict):
                    adjacencyList = element 

          # Initialize PR
          PageRank = float(1) / float(numberOfNodes)

          # Emit
          yield node, (adjacencyList, PageRank)


     def mapper_iterate_pr(self, node, nodeTuple):

          """ This projects all of the PageRank weights for each node's neighbor. """

          adjacencyList, PageRank = nodeTuple

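          # Emit PR: each out-neighbor receives an equal share of this node's mass.
          # A dangling node (empty adjacency list) emits nothing here, so its mass
          # is dropped instead of being redistributed over all nodes.
          for neighbor in adjacencyList.keys():
               yield neighbor, PageRank / len(adjacencyList)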

          # Emit structure 
          yield node, adjacencyList


     def reducer_iterate_pr(self, node, PRNodeObject):

          """ This rebuilds the graph structure together with the updated PageRanks. """

          updatedPR = 0

          # Combine PR 
          for value in PRNodeObject:
               if isinstance(value, dict):
                    adjacencyList = value 

               else: 
                    updatedPR += value 

          # Teleportation term (1 - alpha) / N plus the damped incoming mass
          updatedPR = ((1 - self.alpha) / numberOfNodes) + self.alpha * updatedPR

          # Emit 
          yield node, (adjacencyList, updatedPR)


     def mapper_sort(self, node, nodeTuple):

          """ Emits the page rank for each node. """

          adjacencyList, PageRank = nodeTuple

          yield None, (node, PageRank)


     def reducer_sort(self, _, PageRankPair):

          """ Keeps the top 100 PageRank values. """

          sortedList = []

          # Iterate and remove 
          for node, score in PageRankPair:

               sortedList.append((node, score))
               sortedList = sorted(sortedList, key=itemgetter(1), reverse=True)

               if len(sortedList) > 100: 
                    sortedList.pop()


          # Emit 
          for node, score in sortedList: 
               yield node, score


     def steps(self):

          """ Determines the steps for the job: initialize PR, iterate, then keep the top 100. """

          initializeStep = [

               MRStep(mapper=self.mapper_init_pr, 
                         reducer=self.reducer_init_pr)

          ]

          iterateStep = [

               MRStep(mapper=self.mapper_iterate_pr, 
                         reducer=self.reducer_iterate_pr)         

          ]

          sortStep = [

               MRStep(mapper=self.mapper_sort, 
                         reducer=self.reducer_sort)

          ]

          return initializeStep + iterateStep * int(self.options.iterations) + sortStep
 
 
if __name__ == '__main__':
    pageRank().run()

Overwriting test_pr.py
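`numberOfNodes` is hard-coded above (`pow(10, 1)`), and both the initial rank 1/N and the teleportation term (1 - alpha)/N are only correct if it equals the real number of nodes, including nodes that only appear as neighbors. A minimal sketch of counting them in a separate pass, assuming the same `node<TAB>{adjacency dict}` line format that `mapper_init_pr` parses; `count_nodes` is a hypothetical helper, and it uses `ast.literal_eval` in place of `eval`.

```python
import ast


def count_nodes(lines):
    """Count distinct nodes in 'node<TAB>{neighbor: weight, ...}' lines.

    Nodes that appear only as neighbors (e.g. dangling nodes without a
    line of their own) are included in the count.
    """
    nodes = set()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        node, adjacency = line.split("\t", 1)
        nodes.add(node)
        # literal_eval only accepts Python literals, unlike eval
        nodes.update(ast.literal_eval(adjacency).keys())
    return len(nodes)


if __name__ == "__main__":
    sample = ["B\t{'C': 1}", "C\t{'B': 1}", "D\t{'A': 1, 'B': 1}"]
    print(count_nodes(sample))  # 4 ('A' appears only as a neighbor)
```

For the test graph this could be called as `count_nodes(open('PageRank-test.txt'))` and the result used instead of the hard-coded constant.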


In [7]:
!python test_pr.py PageRank-test.txt

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/test_pr.root.20160314.231638.276024

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

writing to /tmp/test_pr.root.20160314.231638.276024/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /tmp/test_pr.root.20160314.231638.276024/step-0-mapper-sorted
> sort /tmp/test_pr.root.20160314.231638.276024/step-0-mapper_part-00000
writing to /tmp/test_pr.root.20160314.231638.276024/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
writing to /tmp/test_pr.root.20160314.231638.276024/step-1-mapper_part-00000
Counters from step 2:
  (no counters found)
writing to /tmp/test_pr.root.20160314.231638.276024/step-1-mapper-sorted
>

### Question 9.3 

*Run your PageRank implementation on the Wikipedia dataset for 10 iterations,
and display the top 100 ranked nodes (with alpha = 0.85).*

*Run your PageRank implementation on the Wikipedia dataset for 50 iterations,
and display the top 100 ranked nodes (with teleportation factor of 0.15). 
Have the top 100 ranked pages changed? Comment on your findings. Plot the pagerank values for the top 100 pages resulting from the 50 iterations run. Then plot the pagerank values for the same 100 pages that resulted from the 10 iterations run.*
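To answer whether the top 100 changed between the 10- and 50-iteration runs, the two job outputs can be compared directly. A sketch, assuming each run's output has been loaded as a list of `(node, score)` pairs in rank order (the order in which `reducer_sort` emits them); `compare_top_k` is a hypothetical name. It reports the overlap, the pages that entered or left the list, and the rank shift of pages present in both.

```python
def compare_top_k(run_a, run_b):
    """Compare two ranked lists of (node, score) pairs.

    Returns (overlap, entered, left, rank_shift):
      overlap    -- number of nodes present in both lists
      entered    -- nodes in run_b but not in run_a (in run_b order)
      left       -- nodes in run_a but not in run_b (in run_a order)
      rank_shift -- node -> (rank in run_a) - (rank in run_b) for shared
                    nodes; a positive value means the node moved up in run_b
    """
    rank_a = {node: i for i, (node, _) in enumerate(run_a, start=1)}
    rank_b = {node: i for i, (node, _) in enumerate(run_b, start=1)}
    common = set(rank_a) & set(rank_b)
    entered = [node for node, _ in run_b if node not in rank_a]
    left = [node for node, _ in run_a if node not in rank_b]
    rank_shift = {node: rank_a[node] - rank_b[node] for node in common}
    return len(common), entered, left, rank_shift


if __name__ == "__main__":
    ten = [("x", 0.3), ("y", 0.2), ("z", 0.1)]
    fifty = [("y", 0.35), ("x", 0.25), ("w", 0.1)]
    print(compare_top_k(ten, fifty)[:3])  # (2, ['w'], ['z'])
```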

#### Solution: 


In [9]:
%%writefile wikipedia_pr.py

from __future__ import division
 
from mrjob.job import MRJob
from mrjob.step import MRStep
from operator import itemgetter


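# Hard-coded number of nodes: it must equal the number of distinct nodes in the
# Wikipedia graph for the initial rank 1/N and the teleportation term to be correct
numberOfNodes = pow(10, 7)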


class pageRank(MRJob):
 
     """ This class implements the page-rank calculation. """


     def configure_options(self):

          """ Load options for the class. """

          super(pageRank, self).configure_options()

          self.add_passthrough_option('--alpha', type='float',
               default=0.85, help='alpha: damping factor (probability of following a link) in PageRank')

          self.add_passthrough_option('--iterations', type='int',
               default=10, help='iterations: number of iterations for PageRank')


     def load_options(self, args):

          """ Initializes the arguments for each class. """

          super(pageRank, self).load_options(args)

          self.alpha = self.options.alpha
          self.iterations = self.options.iterations


     def mapper_init_pr(self, _, line):

          """ This initializes the PageRank algorithm by assembling the node list 
          for the initial PageRank values. """

          # Initiate 
          global numberOfNodes 

          # Parse 
          line = line.split('\t')
          node = line[0]
          adjacencyList = eval(line[1])

          # Track 
          for neighbor in adjacencyList.keys(): 

               # Emit raw nodes
               yield neighbor, None

          # numberOfNodes += 1

          # Pass values
          yield node, adjacencyList


     def reducer_init_pr(self, node, initTuple):

          """ This attaches initial PageRanks for the algorithm. """

          adjacencyList = dict()

          # Re-discover 
          for element in initTuple:
               if isinstance(element, dict):
                    adjacencyList = element 

          # Initialize PR
          PageRank = float(1) / float(numberOfNodes)

          # Emit
          yield node, (adjacencyList, PageRank)


     def mapper_iterate_pr(self, node, nodeTuple):

          """ This projects all of the PageRank weights for each node's neighbor. """

          adjacencyList, PageRank = nodeTuple

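          # Emit PR: each out-neighbor receives an equal share of this node's mass.
          # A dangling node (empty adjacency list) emits nothing here, so its mass
          # is dropped instead of being redistributed over all nodes.
          for neighbor in adjacencyList.keys():
               yield neighbor, PageRank / len(adjacencyList)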

          # Emit structure 
          yield node, adjacencyList


     def reducer_iterate_pr(self, node, PRNodeObject):

          """ This rebuilds the graph structure together with the updated PageRanks. """

          updatedPR = 0

          # Combine PR 
          for value in PRNodeObject:
               if isinstance(value, dict):
                    adjacencyList = value 

               else: 
                    updatedPR += value 

          # Teleportation term (1 - alpha) / N plus the damped incoming mass
          updatedPR = ((1 - self.alpha) / numberOfNodes) + self.alpha * updatedPR

          # Emit 
          yield node, (adjacencyList, updatedPR)


     def mapper_sort(self, node, nodeTuple):

          """ Emits the page rank for each node. """

          adjacencyList, PageRank = nodeTuple

          yield None, (node, PageRank)


     def reducer_sort(self, _, PageRankPair):

          """ Keeps the top 100 PageRank values. """

          sortedList = []

          # Iterate and remove 
          for node, score in PageRankPair:

               sortedList.append((node, score))
               sortedList = sorted(sortedList, key=itemgetter(1), reverse=True)

               if len(sortedList) > 100: 
                    sortedList.pop()


          # Emit 
          for node, score in sortedList: 
               yield node, score


     def steps(self):

          """ Determines the steps for the job: initialize PR, iterate, then keep the top 100. """

          initializeStep = [

               MRStep(mapper=self.mapper_init_pr, 
                         reducer=self.reducer_init_pr)

          ]

          iterateStep = [

               MRStep(mapper=self.mapper_iterate_pr, 
                         reducer=self.reducer_iterate_pr)         

          ]

          sortStep = [

               MRStep(mapper=self.mapper_sort, 
                         reducer=self.reducer_sort)

          ]

          return initializeStep + iterateStep * 5 + sortStep
 
 
if __name__ == '__main__':
    pageRank().run()

Overwriting wikipedia_pr.py
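`reducer_sort` keeps at most 100 pairs but re-sorts its buffer after every record it reads, and with `numberOfNodes` set to `pow(10, 7)` every node is funnelled into the single `None` key. A bounded heap does the same job more cheaply. A sketch using the standard library's `heapq.nlargest`, which returns the k largest items in descending order; `top_k` is a hypothetical helper whose body could replace the loop in the reducer.

```python
import heapq
from operator import itemgetter


def top_k(pairs, k=100):
    """Return the k (node, score) pairs with the highest score, best first.

    heapq.nlargest keeps a heap of at most k items, so memory stays O(k)
    and each record costs O(log k) instead of a full re-sort.
    """
    return heapq.nlargest(k, pairs, key=itemgetter(1))


if __name__ == "__main__":
    pairs = [("a", 0.1), ("b", 0.4), ("c", 0.2), ("d", 0.3)]
    print(top_k(pairs, k=2))  # [('b', 0.4), ('d', 0.3)]
```

Because `nlargest` consumes its input in a single pass, it also works on the generator of values that mrjob passes to a reducer.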


In [12]:
!python wikipedia_pr.py -r emr s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt

Got unexpected keyword arguments: ssh_tunnel
using configs in /home/filip/.mrjob.conf
using existing scratch bucket mrjob-01bc9d239cbfa41e
using s3://mrjob-01bc9d239cbfa41e/tmp/ as our scratch dir on S3
creating tmp directory /tmp/wikipedia_pr.root.20160314.233701.849996
writing master bootstrap script to /tmp/wikipedia_pr.root.20160314.233701.849996/b.py

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://mrjob-01bc9d239cbfa41e/tmp/wikipedia_pr.root.20160314.233701.849996/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-36F6FLT64CQKD
Created new job flow j-36F6FLT64CQKD
Job launched 30.6s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 61.3s ago, status STARTING: Provisioning Ama

#### Solution (10 Iterations): 

In [1]:
%%writefile wikipedia_pr.py

from __future__ import division
 
from mrjob.job import MRJob
from mrjob.step import MRStep
from operator import itemgetter


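# Hard-coded number of nodes: it must equal the number of distinct nodes in the
# Wikipedia graph for the initial rank 1/N and the teleportation term to be correct
numberOfNodes = pow(10, 7)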


class pageRank(MRJob):
 
     """ This class implements the page-rank calculation. """


     def configure_options(self):

          """ Load options for the class. """

          super(pageRank, self).configure_options()

          self.add_passthrough_option('--alpha', type='float',
               default=0.85, help='alpha: damping factor (probability of following a link) in PageRank')

          self.add_passthrough_option('--iterations', type='int',
               default=10, help='iterations: number of iterations for PageRank')


     def load_options(self, args):

          """ Initializes the arguments for each class. """

          super(pageRank, self).load_options(args)

          self.alpha = self.options.alpha
          self.iterations = self.options.iterations


     def mapper_init_pr(self, _, line):

          """ This initializes the PageRank algorithm by assembling the node list 
          for the initial PageRank values. """

          # Initiate 
          global numberOfNodes 

          # Parse 
          line = line.split('\t')
          node = line[0]
          adjacencyList = eval(line[1])

          # Track 
          for neighbor in adjacencyList.keys(): 

               # Emit raw nodes
               yield neighbor, None

          # numberOfNodes += 1

          # Pass values
          yield node, adjacencyList


     def reducer_init_pr(self, node, initTuple):

          """ This attaches initial PageRanks for the algorithm. """

          adjacencyList = dict()

          # Re-discover 
          for element in initTuple:
               if isinstance(element, dict):
                    adjacencyList = element 

          # Initialize PR
          PageRank = float(1) / float(numberOfNodes)

          # Emit
          yield node, (adjacencyList, PageRank)


     def mapper_iterate_pr(self, node, nodeTuple):

          """ This projects all of the PageRank weights for each node's neighbor. """

          adjacencyList, PageRank = nodeTuple

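          # Emit PR: each out-neighbor receives an equal share of this node's mass.
          # A dangling node (empty adjacency list) emits nothing here, so its mass
          # is dropped instead of being redistributed over all nodes.
          for neighbor in adjacencyList.keys():
               yield neighbor, PageRank / len(adjacencyList)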

          # Emit structure 
          yield node, adjacencyList


     def reducer_iterate_pr(self, node, PRNodeObject):

          """ This rebuilds the graph structure together with the updated PageRanks. """

          updatedPR = 0

          # Combine PR 
          for value in PRNodeObject:
               if isinstance(value, dict):
                    adjacencyList = value 

               else: 
                    updatedPR += value 

          # Teleportation term (1 - alpha) / N plus the damped incoming mass
          updatedPR = ((1 - self.alpha) / numberOfNodes) + self.alpha * updatedPR

          # Emit 
          yield node, (adjacencyList, updatedPR)


     def mapper_sort(self, node, nodeTuple):

          """ Emits the page rank for each node. """

          adjacencyList, PageRank = nodeTuple

          yield None, (node, PageRank)


     def reducer_sort(self, _, PageRankPair):

          """ Keeps the top 100 PageRank values. """

          sortedList = []

          # Iterate and remove 
          for node, score in PageRankPair:

               sortedList.append((node, score))
               sortedList = sorted(sortedList, key=itemgetter(1), reverse=True)

               if len(sortedList) > 100: 
                    sortedList.pop()


          # Emit 
          for node, score in sortedList: 
               yield node, score


     def steps(self):

          """ Determines the steps for the job: initialize PR, iterate, then keep the top 100. """

          initializeStep = [

               MRStep(mapper=self.mapper_init_pr, 
                         reducer=self.reducer_init_pr)

          ]

          iterateStep = [

               MRStep(mapper=self.mapper_iterate_pr, 
                         reducer=self.reducer_iterate_pr)         

          ]

          sortStep = [

               MRStep(mapper=self.mapper_sort, 
                         reducer=self.reducer_sort)

          ]

          return initializeStep + iterateStep * int(self.options.iterations) + sortStep
 
 
if __name__ == '__main__':
    pageRank().run()

Overwriting wikipedia_pr.py


In [2]:
!python wikipedia_pr.py -r emr s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt

Got unexpected keyword arguments: ssh_tunnel
using configs in /home/filip/.mrjob.conf
using existing scratch bucket mrjob-01bc9d239cbfa41e
using s3://mrjob-01bc9d239cbfa41e/tmp/ as our scratch dir on S3
creating tmp directory /tmp/wikipedia_pr.root.20160315.130627.488562
writing master bootstrap script to /tmp/wikipedia_pr.root.20160315.130627.488562/b.py

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://mrjob-01bc9d239cbfa41e/tmp/wikipedia_pr.root.20160315.130627.488562/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-26MFDXQJ95PZM
Created new job flow j-26MFDXQJ95PZM
Job launched 30.6s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 61.2s ago, status STARTING: Provisioning Ama
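Question 9.3 compares 10 and 50 iterations. One way to judge whether extra iterations still change the result is the L1 distance between the rank vectors of two consecutive iterations; for the fully normalized update (dangling mass redistributed), a standard property is that this distance shrinks by at least a factor of alpha per iteration. A sketch, assuming the full `(node, rank)` output of two consecutive iterations has been saved as dicts (the job above only emits the final top 100, so this would need each iteration's output to be written out); `l1_distance` and `has_converged` are hypothetical helpers, and the tolerance is an arbitrary choice.

```python
def l1_distance(prev, curr):
    """Sum of absolute rank differences over all nodes in either snapshot.

    prev, curr: dict node -> PageRank from two consecutive iterations.
    Nodes missing from one snapshot are treated as having rank 0 there.
    """
    nodes = set(prev) | set(curr)
    return sum(abs(prev.get(n, 0.0) - curr.get(n, 0.0)) for n in nodes)


def has_converged(prev, curr, tol=1e-6):
    # Treat the ranks as converged once they move by less than tol in total
    return l1_distance(prev, curr) < tol


if __name__ == "__main__":
    before = {"a": 0.5, "b": 0.5}
    after = {"a": 0.6, "b": 0.3, "c": 0.1}
    print(round(l1_distance(before, after), 6))  # 0.4
```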