Tutorial of Rabit

This is a tutorial of rabit, a Reliable Allreduce and Broadcast interface. To run the examples locally, you will need to type make to build all the examples.

Please also refer to the API Documentation

List of Topics

  • What is Allreduce
  • Common Use Case
  • Structure of Rabit Program
  • Compile Programs with Rabit
  • Running Rabit Jobs
  • Customize Tracker Script
  • Fault Tolerance

What is Allreduce

The main methods provided by rabit are Allreduce and Broadcast. Allreduce performs reduction across different computation nodes and returns the result to all of them. To understand the behavior of the function, consider the following example in basic.cc.

#include <rabit.h>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
  int a[N];
  rabit::Init(argc, argv);
  for (int i = 0; i < N; ++i) {
    a[i] = rabit::GetRank() + i;
  } 
  printf("@node[%d] before-allreduce: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  // Allreduce takes the max of each element across all processes
  Allreduce<op::Max>(&a[0], N);
  printf("@node[%d] after-allreduce: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  rabit::Finalize();
  return 0;
}

You can run the example using the rabit_demo.py script. The following command starts the rabit program with two worker processes.

../tracker/rabit_demo.py -n 2 basic.rabit

This will start two processes, one with rank 0 and the other with rank 1, both running the same code. The rabit::GetRank() function returns the rank of the current process.

Before the Allreduce call, process 0 contains the array a = {0, 1, 2}, while process 1 has the array a = {1, 2, 3}. After the call to Allreduce, the array contents in all processes are replaced by the reduction result (in this case, the maximum value at each position across all processes), so the result becomes a = {1, 2, 3}. Rabit provides different reduction operators: for example, if you change op::Max to op::Sum, the reduction becomes a summation and the result becomes a = {1, 3, 5}. You can also run the example with a different number of processes by setting -n to other values and observing the resulting output.

Broadcast is another method provided by rabit besides Allreduce; it allows one node to broadcast its local data to all the other nodes. The following code in broadcast.cc broadcasts a string from node 0 to all other nodes.

#include <rabit.h>
using namespace rabit;
int main(int argc, char *argv[]) {
  rabit::Init(argc, argv);
  std::string s;
  if (rabit::GetRank() == 0) s = "hello world";
  printf("@node[%d] before-broadcast: s=\"%s\"\n",
         rabit::GetRank(), s.c_str());
  // broadcast s from node 0 to all other nodes
  rabit::Broadcast(&s, 0);
  printf("@node[%d] after-broadcast: s=\"%s\"\n",
         rabit::GetRank(), s.c_str());
  rabit::Finalize();
  return 0;
}

You can run the program with the following command, using three workers.

../tracker/rabit_demo.py -n 3 broadcast.rabit

Besides strings, rabit also allows broadcasting fixed-size arrays and vectors, as sketched below.
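
The following minimal sketch (not one of the bundled examples) broadcasts both a fixed-size array and a std::vector from node 0. The raw-buffer form taking a pointer, a size in bytes, and the root rank is assumed here; double-check the exact signatures against the API documentation.

#include <rabit.h>
#include <vector>
int main(int argc, char *argv[]) {
  rabit::Init(argc, argv);
  int arr[3] = {0, 0, 0};
  std::vector<float> vec;
  if (rabit::GetRank() == 0) {
    arr[0] = 1; arr[1] = 2; arr[2] = 3;
    vec.push_back(0.5f); vec.push_back(1.5f);
  }
  // broadcast a fixed-size buffer: pointer, size in bytes, root rank
  rabit::Broadcast(&arr[0], sizeof(arr), 0);
  // broadcast a vector: receivers are resized to match the root's vector
  rabit::Broadcast(&vec, 0);
  rabit::Finalize();
  return 0;
}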

Common Use Case

Many distributed machine learning algorithms involve partitioning the data across nodes, computing statistics locally, and aggregating them together. Such a process is usually repeated over many iterations until the algorithm converges. Allreduce naturally fits the needs of such programs; common use cases include:

  • Aggregation of gradient values, as used in optimization methods such as L-BFGS (see the sketch after this list).
  • Aggregation of other statistics, as used in KMeans and Gaussian Mixture Models.
  • Finding the best split candidate and aggregating split statistics, as used in tree-based models.
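
As an illustration of the first use case, a gradient aggregation step could look like the following minimal sketch; the gradient values are dummies, and averaging by the world size is just one common convention.

#include <rabit.h>
#include <vector>
int main(int argc, char *argv[]) {
  rabit::Init(argc, argv);
  const int ndim = 4;
  // each node computes a gradient on its local shard of data (dummy values here)
  std::vector<double> grad(ndim, rabit::GetRank() + 1.0);
  // sum the gradient vectors of all nodes in place
  rabit::Allreduce<rabit::op::Sum>(&grad[0], grad.size());
  // divide by the number of nodes to get the average gradient
  for (size_t i = 0; i < grad.size(); ++i) {
    grad[i] /= rabit::GetWorldSize();
  }
  // the averaged gradient can now drive an optimizer such as L-BFGS
  rabit::Finalize();
  return 0;
}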

The main purpose of rabit is to provide a reliable and portable library for distributed machine learning programs, so that programs can run reliably on different types of platforms.

Structure of Rabit Program

The following code illustrates the common structure of a rabit program. This is an abstract example; you can also refer to kmeans.cc for a working implementation of kmeans.

#include <rabit.h>
int main(int argc, char *argv[]) {
  ...
  rabit::Init(argc, argv);
  // load the latest checkpointed model
  int version = rabit::LoadCheckPoint(&model);
  // initialize the model if it is the first version
  if (version == 0) model.InitModel();
  // the version number marks the iteration to resume
  for (int iter = version; iter < max_iter; ++iter) {
    // at this point, the model contains the state needed to resume
    ...
    // each iteration can contain multiple calls of allreduce/broadcast
    rabit::Allreduce(&data[0], n);
    ...
    // checkpoint model after one iteration finishes
    rabit::CheckPoint(&model);
  }
  rabit::Finalize();
  return 0;
}

Besides the common Allreduce and Broadcast functions, there are two additional functions: LoadCheckPoint and CheckPoint. These two functions serve fault-tolerance purposes. Common machine learning programs involve several iterations; in each iteration, we start from a model, make some calls to Allreduce or Broadcast, and update the model to a new one. The calling sequence does not need to be the same in every iteration.

  • When the nodes start from the beginning, LoadCheckPoint returns 0 and we can initialize the model.
  • CheckPoint saves the model after each iteration (see the model sketch after this list).
    • Efficiency note: the model is only kept in local memory; no disk write is involved in CheckPoint.
  • When a node goes down and restarts, LoadCheckPoint recovers the latest saved model.
  • When a node goes down, the remaining nodes block in their calls to Allreduce/Broadcast and help the failed node recover, until it catches up.
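
For checkpointing to work, the model object passed to LoadCheckPoint and CheckPoint must be serializable. The sketch below assumes rabit exposes a Serializable interface with Load and Save methods over a Stream; treat the exact signatures as something to verify against the API documentation.

#include <rabit.h>
#include <vector>
// a minimal serializable model: rabit checkpoints it via Load/Save
class MyModel : public rabit::Serializable {
 public:
  std::vector<float> weight;
  // initialize the model when starting from scratch (version == 0)
  inline void InitModel(size_t n) {
    weight.assign(n, 0.0f);
  }
  // restore the model from a stream, called during recovery
  virtual void Load(rabit::Stream *fi) {
    fi->Read(&weight);
  }
  // write the model to a stream, called by CheckPoint
  virtual void Save(rabit::Stream *fo) const {
    fo->Write(weight);
  }
};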

Please also see the Fault Tolerance section below to understand the recovery procedure that goes on inside rabit.

Compile Programs with Rabit

Rabit is a portable library; to use it, you only need to include the rabit header file. The steps are listed below, followed by an example compile command.

  • You will need to add the path to ../include to the header search path of the compiler
    • Solution 1: add -I/path/to/rabit/include to the compiler flags in gcc or clang
    • Solution 2: add the path to the environment variable CPLUS_INCLUDE_PATH
  • You will need to add the path to ../lib to the library search path of the compiler
    • Solution 1: add -L/path/to/rabit/lib to the linker flags
    • Solution 2: add the path to the environment variables LIBRARY_PATH and LD_LIBRARY_PATH
  • Link against lib/rabit.a
    • Add -lrabit to the linker flags
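
For example, compiling basic.cc with gcc could look like the following; the include and library paths are placeholders for your rabit checkout, and you may additionally need system libraries such as -lpthread on some platforms.

g++ -I/path/to/rabit/include basic.cc -o basic.rabit -L/path/to/rabit/lib -lrabit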

The procedure above allows you to compile a program with rabit. The following two sections describe additional advanced options for linking against a backend other than the default one.

Link against MPI Allreduce

You can link against rabit_mpi.a instead to use MPI Allreduce; however, the resulting program is then backed by MPI and is no longer fault tolerant.

  • Simply change the linker flag from -lrabit to -lrabit_mpi
  • The final linking needs to be done by the MPI wrapper compiler mpicxx (see the example below)
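
Building the basic example against the MPI backend could look like the following sketch; the paths and output name are placeholders.

mpicxx -I/path/to/rabit/include basic.cc -o basic.mpi -L/path/to/rabit/lib -lrabit_mpi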

Link against Mock Test Rabit Library

If you want to mock test the program to see how the code behaves when some nodes go down, you can link against rabit_mock.a.

  • Simply change linker flag from -lrabit to -lrabit_mock

The resulting rabit program can take additional arguments in the following format:

mock=rank,version,seq,ndeath 

The four integers specify an event that will cause the program to kill itself (exit with code -2):

  • rank specifies the rank of the node
  • version specifies the current version (iteration) of the model
  • seq specifies the sequence number of Allreduce/Broadcast call since last checkpoint
  • ndeath specifies how many times this node died already

For example, consider the following script in the test case

../tracker/rabit_demo.py -n 10 test_model_recover 10000\
                         mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1
  • The first mock will cause node 0 to exit when calling the second Allreduce/Broadcast (seq = 1) in iteration 0
  • The second mock will cause node 1 to exit when calling the second Allreduce/Broadcast (seq = 1) in iteration 1
  • The third mock will cause node 1 to exit again when calling the second Allreduce/Broadcast (seq = 1) in iteration 1
    • Note that ndeath = 1 means this will happen only if node 1 has already died once, which is the case here

Running Rabit Jobs

Rabit is a portable library that can run on multiple platforms.

Running Rabit Locally

You can use ../tracker/rabit_demo.py to start multiple worker processes on the local machine, as shown in the earlier examples.

Running Rabit on Hadoop

  • You can use ../tracker/rabit_hadoop.py to run rabit programs on Hadoop
  • This will start n rabit programs as mappers of a MapReduce job
  • Each program can read its part of the data from stdin
  • YARN is highly recommended, since it allows specifying the number of CPUs and the memory of each mapper
    • This allows multi-threaded programs in each node, which can be more efficient
    • A good practice is hybrid OpenMP-rabit code

Running Rabit on YARN

Running Rabit using MPI

  • You can submit rabit programs to an MPI cluster using ../tracker/rabit_mpi.py.
  • If you linked your code against librabit_mpi.a, you can also directly use mpirun to submit the job (see the example below).
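
A minimal launch example, assuming the MPI-backed binary from the previous section was built as basic.mpi:

mpirun -n 2 ./basic.mpi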

Customize Tracker Script

You can also modify the tracker script to allow rabit to run on other platforms. To do so, refer to the existing tracker scripts, such as ../tracker/rabit_hadoop.py and ../tracker/rabit_mpi.py.

You will need to implement a platform-dependent submission function with the following definition:

def fun_submit(nslave, slave_args):
    """Customized submission function that submits nslave jobs,
    each of which must take slave_args as additional arguments.
    Note that this can be a lambda closure.

    Parameters
    ----------
    nslave: number of slave processes to start
    slave_args: tracker information that must be passed as arguments
        to each slave process; this usually includes parameters
        such as master_uri and port
    """

The submission function should start nslave processes on the target platform and append slave_args to the end of the other program arguments. We can then simply call tracker.submit with fun_submit to submit jobs to the target platform.

Note that the current rabit tracker does not restart a worker when it dies; fail-restart therefore has to be handled by the platform itself, or we need to implement the fail-restart logic in the customized script.

  • Fail-restart is usually provided by most platforms.
  • For example, MapReduce will restart a mapper when it fails.

Fault Tolerance

This section introduces how fault tolerance works in rabit. The following scenario shows how rabit deals with failures.

The scenario is as follows:

  • Node 1 fails between the first and second calls of Allreduce after the latest checkpoint.
  • The other nodes wait in the call of the second Allreduce to help node 1 recover.
  • When node 1 restarts, it calls LoadCheckPoint and gets the latest checkpoint from one of the existing nodes.
  • Node 1 then starts from the latest checkpoint and continues running.
  • When node 1 calls the first Allreduce again, since the other nodes already know the result, node 1 can simply fetch it from one of them.
  • When node 1 reaches the second Allreduce, the other nodes find that node 1 has caught up, and they can all continue the program normally.

This fault tolerance model is based on a key property of Allreduce and Broadcast: all nodes get the same result from a given Allreduce/Broadcast call. Because of this property, some nodes can record the history of results, and when a node recovers, the results can simply be forwarded to it.

Checkpoints are introduced so that the history before the latest checkpoint can be discarded, which makes the iterative program more efficient. The strategy of rabit differs from the fail-restart strategy, in which all nodes restart from a checkpoint when any node fails: in rabit, the surviving nodes only block in the Allreduce call to help the recovery, and the checkpoint is saved locally without touching the disk. This makes rabit programs both reliable and efficient.

This is a conceptual introduction to the fault tolerance model of rabit. The actual implementation is more sophisticated and can deal with more complicated cases, such as multiple node failures and node failure during the recovery phase.