Pregel
---

Pregel is ["the system that changed graph processing"](https://www.coursera.org/learn/graph-analytics/lecture/eVChr/pregel-the-system-that-changed-graph-processing). It's a *parallel-processing* system suitable for processing large graphs. It was described by several Google authors in a highly-cited 2010 paper ([Malewicz et al, 2010](http://dl.acm.org/citation.cfm?id=1807184)). The name *Pregel* is the name of the river that flows through Konigsberg (now Kaliningrad), the setting of the seminal graph theory problem, the Seven Bridges of Konigsberg.

Motivation
---

A lot of graph processing problems require using large matrix computations, such as matrix factorisation and matrix powers. For large graphs (large $n$ nodes), the size of a matrix is $n^2$, so this is a problem. Some graphs are also too large to represent on a single machine, regardless of what format we store them or what algorithms we want to run. Therefore, to keep scaling to larger graphs, we need to think about how to divide the work between multiple machines.

Batch synchronous parallel
---

The BSP paradigm (Valiant, 1990) is one approach to this, and *Pregel* is one implementation of BSP, with some extra ideas, and some extra engineering (eg fault tolerance). The basic idea in both BSP and Pregel, then, is *think like a vertex*. We have to rewrite our algorithm as if it was being executed by each vertex. Every vertex executes the same algorithm.

The model requires two main concepts: time-steps and messages. At every time-step (called *superstep*), the framework calls an *update* method on each vertex. The vertex reads any incoming messages from other vertices, runs its algorithm, and sends out messages. Messages can only go along the (directed) edges from a vertex to its neighbours. The framework is responsible for book-keeping the messages, the time-steps, and some other stuff. 

Given a lot of machines, we then allocate our graph's vertices approximately evenly between the machines. So, eg, we might have 1 million vertices per machine. We try to allocate in such a way that most of the edges are between vertices on the same machine -- so most messages go between vertices on the same machine -- so minimising inter-machine communication, which is much slower.




Threads and processes
---

Most modern computers have a CPU with multiple processors or *cores*. For example, my laptop is dual-core. The cores are in some ways independent. We get the most out of our machines when we write software to use all the cores at once: we say then that several tasks are running *in parallel*. Some types of software lend themselves naturally to this, and others don't. Some algorithms are *easy to parallelize*. (Another way to get the most out of the machine is run multiple programs at the same time.)

To be concrete: if we write (say) a for loop inside a for loop inside a for loop, with something like a multiplication in the inner loop, and each loop is repeating 1000 times, then this will chug away for a long time on *one processor*. The other processor will lie idle (actually probably doing some background operating system stuff, but not working on our for loop). 

To actually use more than one processor, we have to use a software framework designed for that. There are two main software concepts available to us for parallelisation: *threads* and *processes*. We won't go into the differences between them. The central concept is similar: a program starts up on a single processor, then calls some special functions to start up extra threads (or processes), and the operating system can then run them on other processors. The extra threads (or processes) can communicate with each other and the original in various ways. Then can also quit, and new ones can be started.

In Pregel, each machine runs multiple threads. In `pregel.py`, the single machine runs multiple threads. Each thread is responsible for running some vertices. The "main" program is responsible for book-keeping -- passing messages, etc.

`pregel.py`
---

A toy version of Pregel was implemented by Nielsen (https://github.com/mnielsen/Pregel). I have made a few small additions, and my version is available at https://github.com/jmmcd/Pregel. (This document lives there.) I'll describe the code as it exists in my repo, but be aware that most of it was written by Nielsen.

By "toy version", we mean that it illustrates (most of) the concepts, but it doesn't actually implement processing on multiple machines. It does implement parallel processing, using *threads*.

`pregel.py` provides two main things: a `Pregel` class, and a `Vertex` class. `Pregel` is the "main" piece of code. It does some initialisation, then it iterates: at each superstep, it passes messages, and then calls `Vertex.update` for each vertex. `Vertex` is a base class, which doesn't do much: the idea is that we write a subclass, inheriting from `Vertex`, specialising to whatever algorithm we actually want to run. In particular, we have to write the `update` method.

`pagerank.py`
---

The Pregel paper describes a few applications. The first one provided in the code is PageRank. Recall that mathematically, PageRank is described as a generalised eigenvector problem, which can be solved using power iteration or other methods. The initial idea is that a vertex $i$ has PageRank $r_i$ described by the sum of the PageRanks of vertices that point to $i$, but each one divided by its number of outgoing links:

$$r_i = \sum_{\mathrm{edges} \ ji} {r_j \over d_j}$$

The "damping" factor $\alpha$ changes it slightly:

$$r_i = {1-\alpha \over n} + \alpha \sum_{\mathrm{edges} \ ji} {r_j \over d_j}$$


Here, we solve it using Pregel. The part we need to understand is `PageRankVertex.update`. Remember, we have to "think like a vertex". At each superstep, "we" will receive a message from each vertex that links to us. Each message is of the format `(vertex, 1, pagerank)`. Here, `vertex` is a vertex id, and `1` is an edge weight (in PageRank, all edges are unweighted, so we'll use `1`. `pagerank` is that vertex's estimation of its own pagerank, *divided by* that vertex's number of outgoing edges. In other words, it is a current estimate of ${r_j \over d_j}$.

We sum over these incoming pagerank values and plug in to the formula to get an updated estimate for our own pagerank, which we store as `self.value` -- this variable is often used in `pregel.py` to store the current "state" of a vertex. 

We then send out messages along our outgoing edges, in the same format as before: `(vertex, 1, pagerank)`, where `vertex` is our id, `1` is the edge weight, and `pagerank` is our current estimate, divided by our number of outgoing edges.

All this happens for 50 supersteps. After that time, a node can (in Pregel terminology) "vote to halt", implemented as `self.active = False`.

We should observe that each vertex has only interacted with a small subset of the other vertices. No vertex is required to store all edges. There will be a lot of overhead in the communication here, but for many problems it will be a worthwhile trade-off.

`pagerank.py` runs Pregel/PageRank on a small randomly-generated graph. It also computes PageRank on the same graph using a linear algebra method, as a test. The result is pretty good, demonstrated when we see that the error between the vectors resulting from the `pregel.py` and the linear algebra implementations is small.


Exercises
---

1. Note that `pagerank.py` will divide by zero if any vertex has no outgoing edges. Think of a suitable fix for this.

2. Change the size of the random graph and/or the number of edges, to see whether it changes the error.

3. Try plugging in a NetworkX random graph generator (Erdos-Renyi, Watts-Strogatz, or Barabasi-Albert), instead of the simple graph generator used in `create_graph`, and again observing the effect on the error.

4. Extract $\alpha$ as a parameter `alpha = 0.85`, and remove the "hard-coded" constants 0.15 and 0.85. 

5. Then change $\alpha$ to different values (including extreme values 0 and 1) to see the effect on PageRank.

6. For (1) and (4), send me a pull request on Github.


Information flow
---

In this section we will simulate *information flow* in a network. Information flow refers to how a piece of information travels between actors (here, *actors* means people -- not movie stars -- but it could refer to companies, animals, or other agents). We are interested in questions like: how does the size of the network and its structure affect information flow?

The simulation should, of course, seek to model the behaviour of the real-world network we are interested in studying. We do the modelling at the level of nodes. That means we don't write a global, top-down model. Instead we try to express, in code, the behaviour of a single node. 

Our initial scenario is very simple: there is some piece of news, initially known to only one actor. At the first time step, it passes it on to all its friends. At each time step, anyone who hears the news passes it on immediately. We will implement this first, then think about extensions later.

We represent this in Pregel as follows. Each node has a state representing whether or not it has heard the news. We could use a Boolean for this. But we will wish to see, during and after the simulation, what's happening in the simulation. So we will represent the state as an integer. -1 will indicate the node does not yet know the news, and any non-negative integer will represent the time-step at which the node heard the news.

Note: In Python 2, `print` is a statement, ie we write `print s`. If we write `print(s)`, which looks like a function call, Python 2 will interpret that as `print (s)`, ie `(s)` is just `s` in brackets, which have no effect. If we write `print(s, t)`, that will be interpreted as `print (s, t)`, and the `(s, t)` will be interpreted as a *tuple*. 

This changes in Python 3. In Python 3, `print` must be written as a function, ie `print(s)`, not a statement. `print s` is a SyntaxError.

Python 2 provides a strange-looking import from `__future__`, as you can see below. It changes Python 2 to behave like Python 3, in this respect. If we run the same import in Python 3, it just does nothing.

I sometimes write Python 2 and sometimes Python 3, but I'm mostly moving to 3. In order to force myself to write  code which will run in either 2 or 3, I use this import. 

In [1]:
from __future__ import print_function
from pregel import Vertex, Pregel
import random
import networkx as nx

Let's create a network of 100 nodes with 7 out-edges each. Pregel uses a directed graph, so to model a symmetric relationship (*A* is a friend of *B* implies *B* is a friend of *A*) we create two directed edges for every relationship.

In [10]:
n = 100 # number of nodes
m = 7 # number of edges per node


def create_graph():
    G = nx.random_graphs.connected_watts_strogatz_graph(n, m, 0.1)
    vertices = [InformationFlowVertex(j, -1, [], []) 
                for j in range(n)]
    for u, v in G.edges():
        vertices[u].out_vertices.append(vertices[v])
        vertices[v].out_vertices.append(vertices[u])
    return vertices


    

Now let's think like a vertex. How should we behave? If we get an incoming message, that means we now know the knowledge, so we can change our state. We can also send it out to all our friends. We can then vote to halt, which also means we'll go to sleep.

We need a little more though. We have to initialise

In [11]:
class InformationFlowVertex(Vertex):
    def update(self):
        
        if self.superstep == 0 and self.id == 0:
            # special case: we are the first node to know
            self.value = 0
            self.outgoing_messages = [(v,1,True) for v in self.out_vertices]
                
        else:
            # typical case
            if self.value < 0 and len(self.incoming_messages):
                self.value = self.superstep # we learned the information at this time-step
                # if we've just heard, then tell our friends, and
                # then stop processing
                self.outgoing_messages = [(v,1,True) for v in self.out_vertices]

        # regardless of what has happened, we have no more work to do for now 
        self.active = False

In [12]:
def information_flow_pregel(vertices):
    """Simulates information flow dynamics using Pregel."""
    p = Pregel(vertices, stats_fn=stats)
    p.run()

In [13]:
nsteps = n # how many time-steps to run: this is a max

What information would we like to see, while running, or afterward? Let's just record the time-step, the proportion of nodes which know the news, and the proportion which are active. These two numbers will sum to 1, but conceptually they are distinct (in later models they might not sum to 1) so we'll calculate them explicitly.

In [14]:
def stats(vertices):
    superstep = vertices[0].superstep
    n = len(vertices)
    # what proportion of vertices know the piece of news?
    pknow = sum(v.value >= 0 for v in vertices) / float(n)
    pactive = sum(v.active for v in vertices) / float(n)
    return superstep, pknow, pactive

In [None]:
def main():
    vertices = create_graph()
    information_flow_pregel(vertices)
    # at the end, print out each node's id and the time-step at which it heard
    for v in vertices:
        print(v.id, v.value)
    

In [16]:
main()

(0, 0.01, 0.0)
(1, 0.07, 0.0)
(2, 0.19, 0.0)
(3, 0.44, 0.0)
(4, 0.81, 0.0)
(5, 1.0, 0.0)
(6, 1.0, 0.0)
0 0
1 1
2 1
3 1
4 2
5 2
6 2
7 3
8 3
9 3
10 4
11 4
12 3
13 4
14 4
15 3
16 3
17 3
18 2
19 3
20 3
21 3
22 2
23 4
24 2
25 3
26 3
27 2
28 3
29 3
30 2
31 4
32 3
33 3
34 4
35 4
36 4
37 4
38 3
39 4
40 4
41 2
42 3
43 3
44 4
45 4
46 4
47 4
48 5
49 4
50 3
51 4
52 4
53 4
54 4
55 5
56 5
57 5
58 5
59 5
60 4
61 4
62 5
63 5
64 5
65 5
66 4
67 4
68 4
69 3
70 4
71 5
72 4
73 5
74 4
75 5
76 5
77 5
78 4
79 4
80 4
81 3
82 4
83 5
84 5
85 5
86 4
87 5
88 4
89 4
90 4
91 3
92 3
93 3
94 2
95 2
96 2
97 1
98 1
99 1


Exercises
---

1. The above simple model effectively duplicates the shortest path computation in an unweighted, undirected network. So, it's not too interesting. What about if there is instead a *probability* of passing a piece of information on to each neighbour at each time-step? We would decide whether to pass it on or not. We wouldn't just go immediately to sleep, unless we were sure we had passed it on to all of our neighbours.

Epidemic models
---

We have mentioned that epidemic models are one useful real-world application of social network analytics. Here's a simple example of epidemic modelling.

It is similar to the information-flow model. Each node has a state, which is
0 for normal nodes, or >0 if infected, with the integer value giving the
time since infection, or -1 for dead nodes. A node dies after an
incubation period $t_i$. An infected node, while alive, has a probability $p$ of infecting each of its neighbours. We model this as an outgoing message. 

For 1000 vertices, p=0.01, i=1, ti=10, I find that somewhere between
m=10 and m=20 there is a tipping point. Below, we get a small
die-off. Above, almost everyone usually dies.


In [23]:
# try other values of n and m
n = 1000 # number of nodes
m = 20 # number of edges per node
p = 0.01 # probability of acquiring infection from a single neighbour
i = 1 # number of nodes initially infected
ti = 10 # incubation time: t time-steps after infection, the individual dies
nsteps = num_vertices # how many time-steps to run

def epidemic_main():
    vertices = [EpidemicVertex(j, [], [], [])
                for j in range(n)]
    create_edges(vertices)
    epidemic_pregel(vertices)

def create_edges(vertices):
    """Generates some randomly chosen in- and out-going edges from each
    vertex in vertices with random weights.
    """
    for v in vertices:
        v.out_vertices = random.sample(vertices, m)

def epidemic_stats(vertices):
    superstep = vertices[0].superstep
    n = len(vertices)
    palive = sum(v.value >= 0 for v in vertices) / float(n)
    pinfected = sum(v.value > 0 for v in vertices) / float(n)
    return superstep, palive, pinfected

def epidemic_pregel(vertices):
    """Simulates an epidemic using Pregel."""
    p = Pregel(vertices,stats_fn=epidemic_stats)
    p.run()

class EpidemicVertex(Vertex):
    def update(self):
        if self.superstep == 0:
            # initialise
            if self.id == 0:
                # "patient zero"
                self.value = 1
            else:
                # healthy
                self.value = 0

        elif self.superstep < nsteps:
            if self.value == -1:
                # stop stop he's already dead
                self.active = False

            elif self.value > 0:
                # we are infected. no need to read incoming messages
                if self.value < ti:
                    # stay infected
                    self.value += 1
                    # emit infection
                    self.outgoing_messages = [(v,1,1) for v in self.out_vertices]

                else:
                    # die
                    self.value = -1
                    self.active = False

            else:
                # we are not infected, so check incoming
                for v, w, msg in self.incoming_messages:
                    # every incoming message has a probability p of infecting this vertex
                    if random.random() < p:
                        self.value = 1

        else:
            self.active = False

In [24]:
epidemic_main()

(0, 1.0, 0.001)
(1, 1.0, 0.001)
(2, 1.0, 0.002)
(3, 1.0, 0.002)
(4, 1.0, 0.002)
(5, 1.0, 0.002)
(6, 1.0, 0.002)
(7, 1.0, 0.002)
(8, 1.0, 0.003)
(9, 1.0, 0.003)
(10, 0.999, 0.003)
(11, 0.999, 0.004)
(12, 0.998, 0.006)
(13, 0.998, 0.006)
(14, 0.998, 0.006)
(15, 0.998, 0.007)
(16, 0.998, 0.007)
(17, 0.998, 0.007)
(18, 0.997, 0.006)
(19, 0.997, 0.008)
(20, 0.996, 0.01)
(21, 0.995, 0.01)
(22, 0.992, 0.008)
(23, 0.992, 0.008)
(24, 0.992, 0.012)
(25, 0.991, 0.012)
(26, 0.991, 0.014)
(27, 0.991, 0.015)
(28, 0.991, 0.016)
(29, 0.989, 0.015)
(30, 0.986, 0.014)
(31, 0.985, 0.018)
(32, 0.984, 0.018)
(33, 0.984, 0.024)
(34, 0.98, 0.024)
(35, 0.979, 0.028)
(36, 0.977, 0.031)
(37, 0.976, 0.035)
(38, 0.975, 0.038)
(39, 0.974, 0.04)
(40, 0.972, 0.051)
(41, 0.967, 0.053)
(42, 0.966, 0.063)
(43, 0.96, 0.065)
(44, 0.956, 0.07)
(45, 0.951, 0.074)
(46, 0.946, 0.084)
(47, 0.941, 0.086)
(48, 0.937, 0.097)
(49, 0.934, 0.114)
(50, 0.921, 0.111)
(51, 0.914, 0.119)
(52, 0.903, 0.128)
(53, 0.895, 0.139)
(54, 0.886

Exercise
---

1. Implement another algorithm from the Pregel paper: semi-clustering (difficult).

Next time, we'll look at a model of how social networks like Facebook grow or shrink over time in competition with each other.