Academic declaration

I have read and understood the sections on plagiarism in the College Policy on assessment offences and confirm that the work is my own, with the work of others clearly acknowledged. I give my permission to submit my work to the plagiarism testing database that the College is using and to test it using plagiarism detection software, search engines or meta-searching software.


### Cloud computing submission

Author: Dorian Beganovic

Contents of the Jupyter notebook:

1) Conditional probabilities using Stripes pattern  
2) Conditional probabilities using Pairs pattern  
3) Page rank using the "simplified" page rank algorithm  
4) Page rank using the "complete" page rank algorithm  


- Explanation of other submitted files
	- *.jpg pictures* in *final_screenshots* folder - the screenshots for each job consisting of
		- Screenshot from AWS EMR
			- Execution of first step in the EMR console
			- Execution of last step in the EMR console
		- Screenshot from my command line
			- Execution of first step from the command line
			- Execution of last step from the command line
		- Screenshot of S3 output
		- Log (syslog) of the last step in EMR
	- *report.txt* - the document explaining how much time each program took to run on EMR and how much time I spent working on this problem
	- *mrjob.conf* - the mrjob configuration file used
	- *execute.sh* - commands used to execute each of the tasks
	- *count_nodes.py* - script to get the minimum node id, maximum node id and the number of unique nodes
	- *test_bigrams.py* - script that reads the "shortjokes.csv" file and calculates bigram probabilities. Its output matches both the stripes and pairs output for the bigrams starting with "my"
	- *test_pagerank_scores.py* - script that operates on the output of page rank jobs by summing up the page rank values. This is to ensure that the complete page rank keeps the sum of page rank at 1 (since all nodes are initialised with 1/num_nodes)
	- *test_pageranks.py* - script that utilises the networkx python package to calculate the page ranks. It operates using a more complex method so the page ranks between this method and my complete page rank don't exactly match but the top 20 nodes by page rank are identical
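
The helper scripts themselves are not reproduced in this notebook. As a rough illustration of what *count_nodes.py* computes, here is a minimal sketch. It assumes a whitespace-separated edge list with optional `#` comment lines; the function name `count_nodes` and the sample data are hypothetical.

```python
def count_nodes(edge_lines):
    """Return (min_id, max_id, n_unique) for lines of the form 'src dst'."""
    nodes = set()
    for line in edge_lines:
        line = line.strip()
        # Skip blank lines and '#' header lines (assumed input format).
        if not line or line.startswith("#"):
            continue
        src, dst = line.split()[:2]
        nodes.update((int(src), int(dst)))
    return min(nodes), max(nodes), len(nodes)


# Hypothetical sample, not taken from the real data set.
sample = ["# FromNodeId\tToNodeId", "0\t4", "0\t5", "5\t9", "9\t0"]
print(count_nodes(sample))
```

The three values correspond to the `--min_node_id`, `--max_node_id` and `--n_nodes` arguments of the complete page rank job.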
	

## 1) Conditional probabilities using Stripes pattern

In [1]:
from mrjob.job import MRJob, MRStep
import re
from collections import defaultdict, Counter
import operator


class MRWordBigramProb(MRJob):
    def mapper_init(self):
        """
        Initialise the dictionary for "in-mapper" combining
        where key will be a word and values are a list

        :return:
        """
        self.following_words = defaultdict(lambda: [])

    def mapper(self, joke_id, content):
        """
        For each encountered word in the text, add an entry to the list of following words.

        We ignore the last word in a sentence as it has no following words.

        :param joke_id: ignored
        :param content:
        :return:
        """
        # raw strings avoid the invalid escape sequence warning for '\s'
        words = re.sub(r'\s+', ' ', re.sub(r'[^a-z]+', ' ', re.sub("'", '', content.lower()))).strip(' ').split(' ')
        for index, word in enumerate(words):
            if index < len(words) - 1:
                self.following_words[word].append(words[index+1])

    def mapper_final(self):
        """
        Create a "stripe" out of the list of the succeeding words for each word.

        For each word, loops over all the following words (stored in a list) and places them into
        a dictionary which counts the numbers of their occurrences.

        :return:
        """
        for word in self.following_words.keys():
            bigram_count = defaultdict(lambda: 0)
            for following_word in self.following_words[word]:
                bigram_count[following_word] += 1
            yield word, bigram_count

    def reducer_init(self):
        """
        Keeps state over multiple reduce iterations, required for sorting the final output

        :return:
        """
        self.bigram_frequencies = []

    def reducer(self, word, stripes):
        """
        Adds up the individual stripes emitted by the mappers for this word
        and then calculates the conditional probability

        :param word:
        :param stripes:
        :return:
        """
        counts = Counter()
        for stripe in stripes:
            counts.update(stripe)
        count_total = sum(counts.values())
        for following_word in counts:
            self.bigram_frequencies.append((word, following_word, (counts[following_word] / count_total)))

    def reducer_final(self):
        """
        Sorts the triples of (word, following word, frequency of the bigram)
        by word and then by the frequency of the bigram (word, following word).
        Outputs the sorted values.

        :return:
        """
        sorted_bigram_frequencies = sorted(self.bigram_frequencies, key=operator.itemgetter(0, 2))
        for bigram in sorted_bigram_frequencies:
            yield bigram[0] + "-" + bigram[1], bigram[2]

    def steps(self):
        steps = [
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper,
                   mapper_final=self.mapper_final,
                   reducer_init=self.reducer_init,
                   reducer=self.reducer,
                   reducer_final=self.reducer_final
            )
        ]
        return steps


if __name__ == '__main__':
    MRWordBigramProb.run()

Top 10 most likely words after "my" using the Stripes pattern:

| Rank | Bigram | Probability |
|------|--------|-------------|
| 1 | "my-wife" | 0.05205927439959121 |
| 2 | "my-girlfriend" | 0.03613694430250383 |
| 3 | "my-friend" | 0.026448645886561064 |
| 4 | "my-dad" | 0.01598364844149208 |
| 5 | "my-women" | 0.01483903934593766 |
| 6 | "my-life" | 0.012692897291773123 |
| 7 | "my-mom" | 0.011016862544711292 |
| 8 | "my-favorite" | 0.010975983648441491 |
| 9 | "my-coffee" | 0.01093510475217169 |
| 10 | "my-son" | 0.010689831374552887 |
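
The stripes idea can be tried without Hadoop. The sketch below is a single-process approximation, not the job above: `tokenize`, `stripes_probabilities` and the toy jokes are made up for illustration, and the tokenisation is a simplified equivalent of the mapper's regular expressions.

```python
import re
from collections import Counter, defaultdict


def tokenize(text):
    """Lowercase, drop apostrophes, and split on anything that is not a-z."""
    cleaned = re.sub(r"[^a-z]+", " ", text.lower().replace("'", ""))
    return cleaned.split()


def stripes_probabilities(documents):
    """Estimate P(following | word) using one Counter ("stripe") per word."""
    stripes = defaultdict(Counter)
    for doc in documents:
        words = tokenize(doc)
        # Pair each word with its successor; the last word has none.
        for word, following in zip(words, words[1:]):
            stripes[word][following] += 1

    probs = {}
    for word, stripe in stripes.items():
        total = sum(stripe.values())
        for following, count in stripe.items():
            probs[(word, following)] = count / total
    return probs


# Hypothetical toy corpus, not taken from shortjokes.csv.
toy_jokes = ["My wife left.", "My wife laughed!", "my dad's car"]
probs = stripes_probabilities(toy_jokes)
print(probs[("my", "wife")])
```

On this toy input "my" is followed by "wife" twice and by "dads" once (the apostrophe is removed before splitting), so the estimate for "my-wife" is 2/3.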






## 2) Conditional probabilities using Pairs pattern

In [2]:
from mrjob.job import MRJob, MRStep
import re
from collections import defaultdict
import operator


class MRWordBigramProb(MRJob):
    #PARTITIONER = 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
    SORT_VALUES = True

    def mapper_init(self):
        """
        Initialises the dictionary used for "in-mapper" combining

        :return:
        """
        self.pair_counts = defaultdict(lambda: 0)

    def mapper(self, joke_id, content):
        """
        For each word in the joke, increments the count of the bigram formed
        with its following word and the count of the marginal pair (word, "*").

        Note that we ignore the last word in a sentence as it has no following words.

        :param joke_id: ignored
        :param content: the raw content of the document
        :return: nothing; the combined pairs are emitted by mapper_final
        """
        # raw strings avoid the invalid escape sequence warning for '\s'
        words = re.sub(r'\s+', ' ', re.sub(r'[^a-z]+', ' ', re.sub("'", '', content.lower()))).strip(' ').split(' ')
        for index, word in enumerate(words):
            if index < len(words) - 1:
                self.pair_counts[(word, "*")] += 1  # will be used to calculate the total number of occurrences of the word
                self.pair_counts[(word, words[index+1])] += 1

    def mapper_final(self):
        """
        Yields the pairs after they have been combined.

        :return:
        """
        for pair in self.pair_counts.keys():
            yield pair, self.pair_counts[pair]

    def reducer_init(self):
        """
        Keeps state over multiple reduce iterations, very important in order to
        calculate the "total_word_count" for each 1st word of the bigram

        :return:
        """
        self.total_word_count = {}
        self.bigram_frequencies = []

    def reducer(self, pair, counts):
        """
        Expects that the input key (word, following_word) is partitioned by 1st part of the key (word)
        and sorted by the 2nd key (following word).
        It expects that "*" will be the first following word for every word.

        This is exactly specified in the config.

        Outputs a triple containing (word, following word, conditional probability).

        :param pair: a bigram (word, following_word)
        :param counts: sum of counts for every bigram on every node
        :return:
        """
        word = pair[0]
        following_word = pair[1]
        if following_word == "*":
            self.total_word_count[word] = sum(counts)
        else:
            self.bigram_frequencies.append((word, following_word, (sum(counts) / self.total_word_count[word])))

    def reducer_final(self):
        """
        Sorts the triples of (word, following word, frequency of the bigram)
        by word and then by the frequency of the bigram (word, following word).
        Outputs the sorted values.

        :return:
        """
        sorted_bigram_frequencies = sorted(self.bigram_frequencies, key=operator.itemgetter(0, 2))
        for bigram in sorted_bigram_frequencies:
            yield bigram[0] + "-" + bigram[1], bigram[2]

    def steps(self):
        # While these configs make sense, they are not necessary when SORT_VALUES = True is used
        #   https://mrjob.readthedocs.io/en/latest/job.html#mrjob.job.MRJob.SORT_VALUES
        # jobconf = {
        #     'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
        #     'mapred.text.key.comparator.options': '-k2r',
        #     'mapred.text.key.partitioner.options': '-k1',
        #     'stream.num.map.output.key.fields': 2,
        #     'stream.map.output.field.separator' : "\t",
        #     'mapreduce.map.output.key.field.separator': '\t',
        # }
        return [
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper,
                   mapper_final=self.mapper_final,
                   reducer_init=self.reducer_init,
                   reducer=self.reducer,
                   reducer_final=self.reducer_final,
                   # jobconf=jobconf
            )
        ]


if __name__ == '__main__':
    MRWordBigramProb.run()

Top 10 most likely words after "my" using the Pairs pattern:

| Rank | Bigram | Probability |
|------|--------|-------------|
| 1 | "my-wife" | 0.05205927439959121 |
| 2 | "my-girlfriend" | 0.03613694430250383 |
| 3 | "my-friend" | 0.026448645886561064 |
| 4 | "my-dad" | 0.01598364844149208 |
| 5 | "my-women" | 0.01483903934593766 |
| 6 | "my-life" | 0.012692897291773123 |
| 7 | "my-mom" | 0.011016862544711292 |
| 8 | "my-favorite" | 0.010975983648441491 |
| 9 | "my-coffee" | 0.01093510475217169 |
| 10 | "my-son" | 0.010689831374552887 |
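
The pairs pattern depends on each word's marginal pair (word, "*") being processed before any of that word's bigrams. The sketch below imitates this with a plain sort in a single process. It does not model Hadoop partitioning or key serialisation, and `pairs_probabilities` and the toy bigrams are hypothetical.

```python
from collections import Counter


def pairs_probabilities(bigrams):
    """Estimate P(following | word) from pairs plus a "*" marginal per word."""
    counts = Counter()
    for word, following in bigrams:
        counts[(word, "*")] += 1        # marginal count for `word`
        counts[(word, following)] += 1  # joint count for the bigram

    probs = {}
    total = None
    # "*" sorts before every a-z token, so the marginal of each word is
    # seen before that word's bigrams, which is what the reducer expects.
    for (word, following), count in sorted(counts.items()):
        if following == "*":
            total = count
        else:
            probs[(word, following)] = count / total
    return probs


# Hypothetical toy input, not taken from shortjokes.csv.
toy_bigrams = [("my", "wife"), ("my", "wife"), ("my", "dad"), ("wife", "left")]
probs = pairs_probabilities(toy_bigrams)
print(probs)
```

Only one running total is kept at a time here, whereas a stripe holds all following words of a word at once; that is the usual memory trade-off between the two patterns.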



## 3) Page rank using the "simplified" page rank algorithm

In [4]:
from mrjob.job import MRJob, MRStep
from mrjob.protocol import TextProtocol
from collections import defaultdict
import heapq


class MRPageRank(MRJob):
    INPUT_PROTOCOL = TextProtocol

    def configure_args(self):
        """
        We configure:
        - n_iterations - number of iterations of page rank
        - n_nodes - number of nodes in the graph
        - top_n - how many of the top results we want

        :return:
        """
        super(MRPageRank, self).configure_args()
        self.add_passthru_arg(
            '--n_iterations', type=int, default=10, help='Number of iterations of page rank to run')
        self.add_passthru_arg(
            '--n_nodes', type=int, default=75879, help='Number of nodes in the graph')
        self.add_passthru_arg(
            '--top_n', type=int, default=20, help='Number of top pages to output')

    def convert_edge_list_to_adjacency_list_reducer(self, node_id, nodes):
        """
        Converts the edge list (node_a, node_b) into a single node
        with adjacency list ({out_links = [a,b,c], page_rank = pr})

        Furthermore, we output all outgoing nodes with a placeholder "*"
        to initialise the dangling nodes in the following reducer

        :param node_id: the node id which is the key in reduce step
        :param nodes: list of node id's
        :return:
        """
        node = dict()
        node['out_links'] = list(nodes)
        node['page_rank'] = 1 / self.options.n_nodes  # default values
        yield node_id, node

        for out_node_id in node['out_links']:
            yield out_node_id, "*"

    def convert_edge_list_to_adjacency_list_reducer_dangling(self, node_id, nodes):
        """
        The previous reducer outputs a node id and either "*" or the nodes structure as value.

        This reducer doesn't change the nodes that have a node structure as value.
        For nodes that only have "*" as value, it constructs a dangling node.

        :param node_id: the node id which is the key in reduce step
        :param nodes: list "*" or node structure
        :return:
        """
        real_node = None
        for node in nodes:
            if node == "*":
                pass
            elif isinstance(node, dict):
                real_node = node
            else:
                raise ValueError("Shouldn't happen")

        if real_node is None:
            real_node = dict()
            real_node['out_links'] = []
            real_node['page_rank'] = 1 / self.options.n_nodes

        yield node_id, real_node

    def map_page_rank_contribution_init(self):
        """
        To utilise the "in-mapper" combiner pattern, initialises a dictionary
        for holding nodes and their associated sums of incoming page ranks:
            - {node_id: [sum of incoming page rank contributions] , ...}

        :return:
        """
        self.incoming_page_ranks = defaultdict(lambda: 0)

    def map_page_rank_contribution(self, node_id, node):
        """
        For every node id and its associated node structure:
        1. divides the page rank of the node by the total number of outgoing links
        2. stores the page rank contribution to every outgoing link (node),
            which later gets emitted using the "in-mapper" combiner pattern
        3. emits the input node_id and node structure

        :param node_id: node id
        :param node: node structure in JSON format
        :return:
        """
        if len(node['out_links']) > 0:
            page_rank_contribution = node['page_rank'] / len(node['out_links'])
            for out_link_node_id in node['out_links']:
                self.incoming_page_ranks[out_link_node_id] += page_rank_contribution
        yield node_id, node

    def map_page_rank_contribution_final(self):
        """
        For every stored node_id, returns combined incoming page rank contributions

        :return:
        """
        for node_id in self.incoming_page_ranks.keys():
            yield node_id, self.incoming_page_ranks[node_id]

    def reduce_incoming_page_rank_contributions(self, node_id, page_rank_contributions):
        """
        Sums up all incoming page rank contributions for a node without taking into account the
        loss of page rank mass due to dangling nodes and "teleportation".

        Yields a node with the updated page rank value.

        :param node_id: node
        :param page_rank_contributions: list of contribution values and the node structure
        :return:
        """

        node = None
        page_rank_sum = 0
        for page_rank_contribution in page_rank_contributions:
            if isinstance(page_rank_contribution, float):
                page_rank_sum += page_rank_contribution
            else:
                # this is the case where we send the actual node JSON structure
                node = page_rank_contribution

        node['page_rank'] = page_rank_sum
        yield node_id, node

    def topN_mapper_init(self):
        """
        Stores all the page ranks received in the mapper

        :return:
        """
        self.values = []

    def topN_mapper(self, node_id, node):
        """
        Reverses the order so that page ranks are keys.
        This is used together with a custom comparator in the shuffle stage before the next reducer

        :param node_id:
        :param node:
        :return:
        """
        self.values.append((node['page_rank'], node_id))

    def topN_mapper_final(self):
        """
        Returns N largest values based on page rank as (page rank, node id)

        :return:
        """
        for pair in heapq.nlargest(n=self.options.top_n, iterable=self.values, key=lambda x: x[0]):
            yield pair

    def topN_reducer_init(self):
        """
        Initialises the list that collects the (page rank, node id) pairs received by the reducer

        :return:
        """
        self.values = []

    def topN_reducer(self, page_rank, node_ids):
        """
        Collects the (page rank, node id) pairs; the top N are emitted in topN_reducer_final

        :param page_rank:
        :param node_ids:
        :return:
        """
        for node_id in node_ids:
            self.values.append((page_rank, node_id))

    def topN_reducer_final(self):
        """
        Returns N largest values based on page rank as (page rank, node id)

        :return:
        """
        for pair in heapq.nlargest(n=self.options.top_n, iterable=self.values, key=lambda x: x[0]):
            yield pair

    def steps(self):
        steps = [MRStep(reducer=self.convert_edge_list_to_adjacency_list_reducer)] + \
                [MRStep(reducer=self.convert_edge_list_to_adjacency_list_reducer_dangling)] + \
                [MRStep(
                    mapper_init=self.map_page_rank_contribution_init,
                    mapper=self.map_page_rank_contribution,
                    mapper_final=self.map_page_rank_contribution_final,
                    reducer=self.reduce_incoming_page_rank_contributions)
                ] * self.options.n_iterations + \
                [MRStep(mapper_init=self.topN_mapper_init,
                        mapper=self.topN_mapper,
                        mapper_final=self.topN_mapper_final,
                        reducer_init=self.topN_reducer_init,
                        reducer=self.topN_reducer,
                        reducer_final=self.topN_reducer_final,
                        jobconf={
                            'mapred.reduce.tasks': 1,
                            'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
                            'mapred.text.key.comparator.options': '-k1,1nr'
                        })
                 ]

        return steps


if __name__ == '__main__':
    MRPageRank.run()
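
The final step keeps a local top N in every mapper and then merges the candidates in a single reducer. A minimal sketch of that two-level selection in plain Python, with hypothetical function names and made-up values:

```python
import heapq


def local_top_n(pairs, n):
    """Top n (page_rank, node_id) pairs seen by one mapper."""
    return heapq.nlargest(n, pairs, key=lambda pair: pair[0])


def global_top_n(partitions, n):
    """Merge the per-mapper candidates and keep the overall top n."""
    candidates = []
    for partition in partitions:
        candidates.extend(local_top_n(partition, n))
    return heapq.nlargest(n, candidates, key=lambda pair: pair[0])


# Two hypothetical mapper inputs.
partitions = [
    [(0.10, "a"), (0.40, "b"), (0.05, "c")],
    [(0.30, "d"), (0.15, "e")],
]
top2 = global_top_n(partitions, 2)
print(top2)
```

Each mapper forwards at most N pairs, so the single reducer handles at most N times the number of mappers instead of every node in the graph.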

Top 10 page ranks using the "simplified" page rank algorithm and 10 iterations:

| Rank | Node | Page rank |
|------|------|-----------|
| 1 | "18" | 0.003130077602885494 |
| 2 | "737" | 0.002863553011238143 |
| 3 | "40" | 0.001871953130794608 |
| 4 | "118" | 0.0017995720480840556 |
| 5 | "401" | 0.0015652778739008069 |
| 6 | "725" | 0.001514956814924569 |
| 7 | "1719" | 0.0015039516052128294 |
| 8 | "136" | 0.001482907684427656 |
| 9 | "849" | 0.0014773067290279006 |
| 10 | "550" | 0.0014122234888086457 |
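
The simplified update ignores dangling nodes and teleportation, so page rank mass leaks out of the graph at every iteration. The sketch below shows this on a three-node graph. It is a single-process illustration with a hypothetical function name and made-up data, not the MapReduce job above.

```python
def simplified_page_rank(out_links, n_iterations):
    """out_links maps node -> list of successors; every node must be a key."""
    n_nodes = len(out_links)
    ranks = {node: 1 / n_nodes for node in out_links}
    for _ in range(n_iterations):
        incoming = {node: 0.0 for node in out_links}
        for node, targets in out_links.items():
            # A dangling node has no targets, so its mass is simply dropped.
            for target in targets:
                incoming[target] += ranks[node] / len(targets)
        ranks = incoming
    return ranks


# Hypothetical graph: "c" has no outgoing links (dangling node).
graph = {"a": ["b", "c"], "b": ["c"], "c": []}
ranks = simplified_page_rank(graph, 1)
print(ranks, sum(ranks.values()))
```

After one iteration the ranks sum to 2/3 rather than 1, because the 1/3 held by the dangling node "c" is not passed on to anyone.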


## 4) Page rank using the "complete" page rank algorithm

In [None]:
from mrjob.job import MRJob, MRStep
from mrjob.protocol import TextProtocol
from collections import defaultdict
import heapq


class MRPageRank(MRJob):
    INPUT_PROTOCOL = TextProtocol

    def configure_args(self):
        """
        We configure:
        - n_iterations - number of iterations of page rank
        - damping_factor - the damping factor from the page rank equation
        - n_nodes - number of nodes in the graph
        - min_node_id - the minimal node id
        - max_node_id - the maximal node id
        - top_n - how many of the top results we want

        :return:
        """

        super(MRPageRank, self).configure_args()
        self.add_passthru_arg(
            '--n_iterations', type=int, default=10, help='Number of iterations of page rank to run')
        self.add_passthru_arg(
            '--damping_factor', type=float, default=0.85, help='The damping factor used in the calculations')
        self.add_passthru_arg(
            '--n_nodes', type=int, default=75879, help='Number of nodes in the graph (computed using count_nodes.py)')
        self.add_passthru_arg(
            '--min_node_id', type=int, default=0, help='Minimal node id (computed using count_nodes.py)')
        self.add_passthru_arg(
            '--max_node_id', type=int, default=75887, help='Maximal node id (computed using count_nodes.py)')
        self.add_passthru_arg(
            '--top_n', type=int, default=10, help='Number of top pages to output')

    def convert_edge_list_to_adjacency_list_reducer(self, node_id, nodes):
        """
        Converts the edge list (node_a, node_b) into a single node
        with adjacency list ({out_links = [a,b,c], page_rank = pr})

        Furthermore, we output all outgoing nodes with a placeholder "*"
        to initialise the dangling nodes in the following reducer

        :param node_id: the node id which is the key in reduce step
        :param nodes: list of node id's
        :return:
        """
        node = dict()
        node['out_links'] = list(nodes)
        node['page_rank'] = 1/self.options.n_nodes
        yield node_id, node
        for out_node_id in node['out_links']:
            yield out_node_id, "*"

    def convert_edge_list_to_adjacency_list_reducer_dangling(self, node_id, nodes):
        """
        The previous reducer outputs a node id and either "*" or the nodes structure as value.

        This reducer doesn't change the nodes that have a node structure as value.
        For nodes that only have "*" as value, it constructs a dangling node.

        :param node_id: the node id which is the key in reduce step
        :param nodes: list "*" or node structure
        :return:
        """
        real_node = None
        for node in nodes:
            if node == "*":
                pass
            elif isinstance(node, dict):
                real_node = node
            else:
                raise ValueError("Shouldn't happen")

        if real_node is None:
            real_node = dict()
            real_node['out_links'] = []
            real_node['page_rank'] = 1/self.options.n_nodes

        yield node_id, real_node

    def map_page_rank_contribution_init(self):
        """
        To utilise the "in-mapper" combiner pattern, initialises a dictionary
        for holding nodes and their associated sums of incoming page ranks:
            - {node_id: [sum of incoming page rank contributions] , ...}

        :return:
        """
        self.incoming_page_ranks = defaultdict(lambda: 0)
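        # start from a float so that the isinstance(..., float) checks in the
        # reducers still recognise the mass when there are no dangling nodes
        self.dangling_pr_mass = 0.0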

    def map_page_rank_contribution(self, node_id, node):
        """
        For every node id and its associated node structure:
        1.
        A) if it's not a dangling node:
            1. divides the page rank of the node by the total number of outgoing links
            2. stores the page rank contribution to every outgoing link (node),
                which later gets emitted using the "in-mapper" combiner pattern
        B) if it's a dangling node:
            1. adds its mass to the mass of the other dangling nodes
        2. emits the input node_id and node structure

        :param node_id: node id
        :param node: node structure in JSON format
        :return:
        """
        if len(node['out_links']) > 0:
            page_rank_contribution = node['page_rank'] / len(node['out_links'])
            for out_link_node_id in node['out_links']:
                self.incoming_page_ranks[out_link_node_id] += page_rank_contribution
        elif len(node['out_links']) == 0:
            self.dangling_pr_mass += node['page_rank']
        yield node_id, node

    def map_page_rank_contribution_final(self):
        """
        For every stored node_id, returns combined incoming page rank contributions.

        Returns the page rank mass of all dangling nodes seen by this mapper using a special "d" key
        :return:
        """
        for node_id in self.incoming_page_ranks.keys():
            yield node_id, self.incoming_page_ranks[node_id]
        yield 'd', self.dangling_pr_mass

    def reduce_incoming_page_rank_contributions(self, node_id, page_rank_contributions):
        """
        1)
            A) If the key is "d" (the sum of dangling masses),
                it loops over the range of all node ids and emits the dangling mass to each node
            B) If the node is a normal node, it sums up the contributions, updates the node's page rank
                and outputs the node structure

        :param node_id: node id
        :param page_rank_contributions: list of contributions -> either ("d",<mass>), (<node id>, <mass>), (<node id>, <node structure>)
        :return:
        """
        if node_id == 'd':
            dangling_mass = sum(page_rank_contributions)
            # range() excludes its end point, so add 1 to include max_node_id
            for target_node_id in range(self.options.min_node_id, self.options.max_node_id + 1):
                yield str(target_node_id), dangling_mass
        else:
            node = None
            page_rank_sum = 0
            for page_rank_contribution in page_rank_contributions:
                if isinstance(page_rank_contribution, float):
                    page_rank_sum += page_rank_contribution
                else:
                    # this is the case where we send the actual node JSON structure
                    node = page_rank_contribution
            node['page_rank'] = page_rank_sum
            yield node_id, node

    def complete_page_rank_reducer(self, node_id, dangling_nodes_contributions):
        """
        1. Sums up all the incoming contributions from the dangling nodes (emitted in previous reducer)
        2. Properly updates the pagerank to take into account the damping factor and dangling mass

        :param node_id:
        :param dangling_nodes_contributions: either (<node id>, <dangling mass contribution>), (<node id>, <node structure>)
        :return:
        """
        node = None
        dangling_contribution = 0
        for dangling_nodes_contribution in dangling_nodes_contributions:
            if isinstance(dangling_nodes_contribution, float):
                dangling_contribution += dangling_nodes_contribution
            else:
                # this is the case where we send the actual node JSON structure
                node = dangling_nodes_contribution

        # ids in the range that do not belong to any graph node have no node structure
        if node is not None:
            # set by the previous reducer: the sum of incoming page rank contributions
            incoming_page_rank = node['page_rank']
            updated_page_rank = ((1 - self.options.damping_factor) / self.options.n_nodes) \
                + (self.options.damping_factor * (dangling_contribution / self.options.n_nodes + incoming_page_rank))
            node['page_rank'] = updated_page_rank

            yield node_id, node
        else:
            # these are the 9 missing node id's in the range min_node_id to max_node_id
            pass

    def topN_mapper_init(self):
        """
        Stores all the page ranks received in the mapper

        :return:
        """
        self.values = []

    def topN_mapper(self, node_id, node):
        """
        Reverses the order so that page ranks are keys.
        This is used together with a custom comparator in the shuffle stage before the next reducer

        :param node_id:
        :param node:
        :return:
        """
        self.values.append((node['page_rank'], node_id))

    def topN_mapper_final(self):
        """
        Returns N largest values based on page rank as (page rank, node id)

        :return:
        """
        for pair in heapq.nlargest(n=self.options.top_n, iterable=self.values, key=lambda x: x[0]):
            yield pair

    def topN_reducer_init(self):
        """
        Initialises the list that collects the (page rank, node id) pairs received by the reducer

        :return:
        """
        self.values = []

    def topN_reducer(self, page_rank, node_ids):
        """
        Collects the (page rank, node id) pairs; the top N are emitted in topN_reducer_final

        :param page_rank:
        :param node_ids:
        :return:
        """
        for node_id in node_ids:
            self.values.append((page_rank, node_id))

    def topN_reducer_final(self):
        """
        Returns N largest values based on page rank as (page rank, node id)

        :return:
        """
        for pair in heapq.nlargest(n=self.options.top_n, iterable=self.values, key=lambda x: x[0]):
            yield pair

    def steps(self):
        steps = [MRStep(reducer=self.convert_edge_list_to_adjacency_list_reducer)] \
                + [ MRStep(reducer=self.convert_edge_list_to_adjacency_list_reducer_dangling)] \
                + [
                    MRStep(
                        mapper_init=self.map_page_rank_contribution_init,
                        mapper=self.map_page_rank_contribution,
                        mapper_final=self.map_page_rank_contribution_final,
                        reducer=self.reduce_incoming_page_rank_contributions
                    ),
                    MRStep(
                        reducer=self.complete_page_rank_reducer)
                ] * self.options.n_iterations \
                + [MRStep(mapper_init=self.topN_mapper_init,
                        mapper=self.topN_mapper,
                        mapper_final=self.topN_mapper_final,
                        reducer_init=self.topN_reducer_init,
                        reducer=self.topN_reducer,
                        reducer_final=self.topN_reducer_final,
                        jobconf={
                            'mapred.reduce.tasks': 1,
                            'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
                            'mapred.text.key.comparator.options': '-k1,1nr'
                        })
                 ]

        return steps


if __name__ == '__main__':
    MRPageRank.run()

Top 10 page ranks using the "complete" page rank algorithm and 50 iterations:


| Rank | Node | Page rank |
|------|------|-----------|
| 1 | "18" | 0.004535027430895477 |
| 2 | "737" | 0.003150401858160732 |
| 3 | "118" | 0.0021219790121769088 |
| 4 | "1719" | 0.002078180983269449 |
| 5 | "136" | 0.0019870699856579417 |
| 6 | "790" | 0.0019688909929434454 |
| 7 | "143" | 0.001956844315787098 |
| 8 | "40" | 0.0018248551665789362 |
| 9 | "1619" | 0.0015362428791861633 |
| 10 | "725" | 0.0014960008685869172 |
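
The complete update redistributes the dangling mass evenly and applies the damping factor, which keeps the total page rank at 1. The sketch below applies the same formula as `complete_page_rank_reducer` to a three-node graph in a single process; the function name and the toy graph are hypothetical.

```python
def complete_page_rank(out_links, n_iterations, damping_factor=0.85):
    """out_links maps node -> list of successors; every node must be a key."""
    n_nodes = len(out_links)
    ranks = {node: 1 / n_nodes for node in out_links}
    for _ in range(n_iterations):
        incoming = {node: 0.0 for node in out_links}
        dangling_mass = 0.0
        for node, targets in out_links.items():
            if targets:
                for target in targets:
                    incoming[target] += ranks[node] / len(targets)
            else:
                # Collect the mass of dangling nodes instead of dropping it.
                dangling_mass += ranks[node]
        # Teleportation term plus damped (dangling share + incoming mass).
        ranks = {
            node: (1 - damping_factor) / n_nodes
            + damping_factor * (dangling_mass / n_nodes + incoming[node])
            for node in out_links
        }
    return ranks


# Hypothetical graph: "c" has no outgoing links (dangling node).
graph = {"a": ["b", "c"], "b": ["c"], "c": []}
ranks = complete_page_rank(graph, 50)
print(ranks, sum(ranks.values()))
```

Summing the update over all nodes gives (1 - d) + d * (dangling mass + non-dangling mass) = 1, which is the property *test_pagerank_scores.py* is described as checking.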