No description, website, or topics provided.
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Failed to load latest commit information.
Assignment problem
TSP Problem

Case studies of Pregel algorithms

This repository collects examples for Pregel-like graph processing frameworks.


Given a weighted undirected graph with integer capacity b(v) assigned to each vertex v, the Maximum B-matching Problem is to select a subgraph of maximum weight such that the degree of each vertex in the subgraph does not exceed its capacity. A greedy algorithm described in Social Content Matching in MapReduce (PVLDB'11) provides a 1/2-approximation guarantee for this problem.

The Okapi library implements a parallel version of the above algorithm in Giraph. The computation works as follows. Each vertex v maintains a capacity c(v), which is b(v) in the beginning. At each superstep, each vertex v i) proposes its c(v) edges with maximum weight to its neighbors, and ii) computes the intersection between its own proposals and the proposals of its neighbors from the last superstep. The size of this intersection is subtracted from the capacity c(v). If c(v) becomes zero, vertex v is removed from the graph.

Assignment Problem

The assignment problem is concerned with finding a maximum matching in a weighted bipartite graph. This project implements Bertsekas' auction algorithm for the assignment problem on top of Giraph.

Connected Components

The appendix of Pregel Algorithms for Graph Connectivity Problems with Performance Guarantees (VLDB'14) describes a Pregel implementation of the Shiloach-Vishkin algorithm that finds connected components in a graph.

Graph Partitioning

The Okapi library implements the Spinner graph partitioning algorithm, which computes an edge-based balanced k-way partitioning of a graph. Spinner splits the vertices across k partitions, trying to maximize locality and balancing. The former means that vertices that are connected through an edge tend to be assigned to the same partition. The latter means that a similar number of edges is assigned to each partition. The partitioning produced by Spinner can be used to minimize network communication and maximize load balance for distributed computation frameworks.

Spinner is based on iterative vertex migrations, with decisions taken independently based on per-vertex local information. In the beginning, vertices are assigned randomly to partitions. After initialization, vertices communicate their partition to their neighbors through messages, and they try to migrate to the partition where most of their neighbors occur. This pattern is also known as label propagation. Spinner supports a number of interesting features such as incremental computation and ability to adapt a partitioning to changes to the graph (e.g. vertices and edges are added or removed) and to the partitions (e.g. partitions are added or removed). These features allow Spinner to partition massive graphs and keep the partitioning up-to-date with minimal re-computations.

The computation of the Spinner algorithm works as follows. The algorithm first assigns vertices to partitions according to the following heuristics:

  • If we are computing a new partitioning, assign the vertex to a random partition.
  • If we are adapting the partitioning to changes of graph:
    a) If the vertex was partitioned before, assign it to the previous partition.
    b) If the vertex is new, assign it to a random partition.
  • If we are adapting the graph to changes of partitioning:
    a) If we are going to add partitions, assign the vertex to the new partitions with probability p (see paper).
    b) If we are going to remove partitions, assign vertices belonging to the removed partitions to the other partitions uniformly at random.

After the vertices are initialized, they communicate their labels to their neighbors, and update the partition loads according to their assignments.

  1. Each vertex computes the score for each label based on loads and the labels from incoming neighbors. If a new partition has higher score, the vertex will try to migrate to the partition in next superstep. Otherwise, it does nothing.
  2. Interested vertices try to migrate according to the ratio of the vertices trying to migrate and the remaining capacity of the target partition. Vertices that succeed in the migration update the partition loads and communicate their labels to their neighbors.

The algorithm keeps alternating the above two steps until it converges. Convergence is reached when the global score of the partitioning does not change for a number of times over a certain threshold.


The k-core of a graph is the subgraph in which all vertices have degree of at least k. The Okapi library implements a k-core finding algorithm described in Using Pregel-like Large Scale Graph Processing Frameworks for Social Network Analysis (ASONAM '12). The algorithm iteratively removes vertices with degrees less than k, and stops when there is no vertex to remove. At the end of the execution, the remaining graph represents the k-core. It is possible that the result is an empty graph.

Social Capital Value

This project provides a Giraph implementation of the algorithm presented in Social Capital: The Power of Influencers in Networks (AAMAS'13) to compute the social capital values for nodes in social networks.

Frequent Pattern Mining

This project provides a Giraph implementation of the APriori algorithm for frequent pattern mining. The Spark MLlib implements some frequent pattern mining algorithms without using GraphX.

Facility Location

This project provides a Giraph implementation of the Facility Location Algorithm described in Scalable Facility Location for Massive Graphs on Pregel-like Systems (CIKM'15).

Graph Drawing

This project implements a distributed force-directed algorithm in Giraph. The algorithm is described in A Distributed Force-Directed Algorithm on Giraph.


A cluster is formed by vertices that have stronger connections with each other compared to other vertices. A semi-cluster differs from a typical cluster in that it allows a vertex to exist in more than one cluster. The Okapi library implements a semi-clustering algorithm described in the Pregel paper.

The logic behind the algorithm lies in the calculation of a score for each semi-cluster and the dismissal of the semi-clusters with the lowest scores. Each vertex maintains a list of semi-clusters to which it belongs. The list is bounded in length and sorted by score. The score of a semi-cluster is computed by S = (I-f*B)/(V*(V-1)/2), where I is the sum of weights of all internal edges, B is the sum of weights of all boundary edges, V is the number of vertices in the semi-cluster, and f is a user-specified boundary edge score factor with a value between 0 and 1. In each superstep, every vertex

  1. Receives messages which are semi-clusters created in previous supersteps.
  2. For each message, it checks whether it is included in the message/clusters. If not, it adds itself to the semi-clusters and calculates the scores for the new clusters. In any moment, a vertex belongs to a bounded number of semi-clusters. Hence, if a new semi-cluster has a higher score than the last semi-cluster in the list, then the latter gets discarded. Note that a vertex cannot add itself to a semi-cluster if the size of the semi-cluster reaches its maximum, which is defined by the user.
  3. It sends a message with its updated list of semi-clusters.

For the first superstep, each vertex creates an empty cluster and adds itself to it. Thus the first set of messages sent in the network contain the vertices themselves. The algorithm finishes when the semi-cluster lists don't change or after a maximum number of iterations.

BTG Computation

This project provides a Giraph implementation of the algorithm described in BIIIG: Enabling Business Intelligence with Integrated Instance Graphs, which extracts Business Transactions Graphs (BTGs) from an Integrated Instance Graph (IIG). An IIG contains nodes belonging to two different classes: master data and transactional data. A BTG is a sub-graph of an IIG which has only master data nodes as boundary nodes and transactional data nodes as inner nodes. In the business domain, a BTG describes a specific case inside a set of business cases involving master data like Employees, Customers and Products and transactional data like SalesOrders, ProductOffers or Purchases.

The algorithm finds all BTGs inside a given IIG. The idea is to find connected components by communicating the minimum vertex id inside a connected sub-graph and storing it. The minimum id inside a sub-graph is the BTG id. Only transactional data nodes are allowed to send ids, so the master data nodes work as a communication barrier between BTGs. The master data nodes receive messages from transactional data nodes out of BTGs in which they are involved. They store the minimum incoming BTG id by vertex id in a map and when the algorithm terminates, the set of unique values inside this map is the set of BTG ids the master data node is involved in.

Label Propagation

Label Propagation is used to detect communities inside a graph by propagating vertex labels (communities). Each vertex stores a unique id, a value (its label) and its outgoing edges. The label represents the community that the vertex belongs to. Vertices migrate to the community represented by the majority of labels sent by its neighbors.

The Giraph implementation of the algorithm can be sketched as follows. In superstep 0, each vertex propagates its initial value to its neighbors. In the remaining supersteps, each vertex will adopt the value of the majority of their neighbors or the smallest one if there is only one neighbor. If a vertex adopts a new value, it will propagate the new value to its neighbors. The computation will terminate if no new values are assigned or the maximum number of iterations is reached. The implementation adds a stabilization mechanism and avoids oscillating states.

Adaptive Repartitoning

This project implements the Adaptive Repartitioning (ARP) algorithm as described in Adaptive Partitioning of Large-Scale Dynamic Graphs (ICDCS'14) to partition a graph using label propagation. The implementation exploits Giraph aggregators to share global knowledge about the partition load and demand. For each of the k partitions, it uses two aggregators: The first aggregator stores the capacity (CA_i) of partition i, the second stores the demand (DA_i) for that partition (= number of vertices that want to migrate in the next superstep).

Phase 0 : Initialization Phase

If the input graph is an unpartitioned graph, the algorithm will at first initialize each vertex with a partition-id i (hash based). This is skipped if the graph is already partitioned. After that, each vertex will notify CA_i that it is currently a member of the partition and propagates the partition-id to its neighbors.

The main computation is divided in two phases:

Phase 1 : Demand Phase (odd-number supersteps)

Based on the information about their neighbors, each vertex calculates its desired partition, which is the most frequent one among the neighbors (label propagation). If the desired partition and the actual partition are not equal the vertex will notify the DA of the desired partition that the vertex want to migrate in.

Phase 2 : Migration Phase (even-number supersteps)

Based on the information of the first phase, the algorithm will calculate which vertices are allowed to migrate in their desired partition. If a vertex migrates to another partition it notifies the new and old CA.

The computation will terminate if no vertex wants to migrate, the maximum number of iterations (configurable) is reached or each vertex reaches the maximum number of partition switches (configurable).

Community Detection

This project implements Girvan-Newman's community detection algorithm in Giraph.

Graph Isomorphism

This project implements a brute-force graph isomorphism algorithm in Giraph.

Travelling Salesman Problem

This project provides a brute-force Giraph algorithm for the Travelling Salesman Problem.

Diffusive Clustering

This project implements the [Distributed Diffusive Clustering Algorithm](http:// in Giraph.

Minimum Spanning Tree

This paper describes a distributed MST algorithm on top of the GPS framework. However, the algorithm needs mater computation, global aggregators and mutation of graph. It is not easy to implement the algorithm in Spark Pregel.

Materials and references

  1. Malewicz, Grzegorz, et al. Pregel: a system for large-scale graph processing. ACM SIGMOD 2010. (digest)
  2. Xin, Reynold S., et al. GraphX: Unifying data-parallel and graph-parallel analytics. arXiv preprint (2014). (digest)
  3. Gonzalez, Joseph E., et al. Graphx: Graph processing in a distributed dataflow framework. OSDI 2014.
  4. Avery Ching. Giraph: Large-scale graph processing infrastructure on hadoop. Proceedings of the Hadoop Summit 2011.
  5. Ching, Sergey E., et al. One Trillion Edges: Graph Processing at Facebook-Scale. PVLDB 2015.
  6. Zhang, Hao, et al. In-memory big data management and processing: A survey. IEEE Transactions on Knowledge and Data Engineering 2015.
  7. Han, Minyang, et al. An experimental comparison of pregel-like graph processing systems. PVLDB 2014.
  8. Salihoglu, Semih, et al. Optimizing graph algorithms on pregel-like systems. PVLDB 2014.
  9. McCune, Robert R., et al. Thinking like a vertex: a survey of vertex-centric frameworks for large-scale distributed graph processing. ACM CSUR 2015.
  10. Yan, Da, et al. Pregel algorithms for graph connectivity problems with performance guarantees. PVLDB 2014.