Merge pull request #2903 from ronghanghu/multi_gpu
Multi-GPU Data Parallelism
ronghanghu committed Aug 13, 2015
2 parents 8181870 + 6b50ed6 commit bb0a90e
Showing 35 changed files with 1,769 additions and 312 deletions.
7 changes: 7 additions & 0 deletions docs/tutorial/interfaces.md
@@ -50,6 +50,13 @@ For a full example of fine-tuning, see examples/finetuning_on_flickr_style, but
# query the first device
caffe device_query -gpu 0

**Parallelism**: the `-gpu` flag to the `caffe` tool can take a comma-separated list of IDs to run on multiple GPUs. A solver and net will be instantiated for each GPU, so the effective batch size is multiplied by the number of GPUs. To reproduce single-GPU training, reduce the batch size in the network definition accordingly.

# train on GPUs 0 & 1 (doubling the batch size)
caffe train -solver examples/mnist/lenet_solver.prototxt -gpu 0,1
# train on all GPUs (multiplying batch size by number of devices)
caffe train -solver examples/mnist/lenet_solver.prototxt -gpu all
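
Under the hood, the `caffe` tool has to expand `-gpu all` (or a comma-separated list) into a set of device IDs before instantiating one solver per device. A hedged sketch of that parsing, for illustration only: the helper name `get_gpus` and passing the raw flag value as a parameter are assumptions, since the tool's source is not part of this excerpt.

    #include <string>
    #include <vector>
    #include "boost/algorithm/string.hpp"
    #include "boost/lexical_cast.hpp"
    #include "caffe/common.hpp"  // for CUDA_CHECK

    // Sketch: expand "all" or "0,1" into a list of device ids.
    // `flag` stands in for the tool's -gpu flag value (a gflags string).
    static void get_gpus(std::vector<int>* gpus, const std::string& flag) {
      if (flag == "all") {
        int count = 0;
        CUDA_CHECK(cudaGetDeviceCount(&count));  // one solver per visible GPU
        for (int i = 0; i < count; ++i) {
          gpus->push_back(i);
        }
      } else if (!flag.empty()) {
        std::vector<std::string> ids;
        boost::split(ids, flag, boost::is_any_of(","));
        for (size_t i = 0; i < ids.size(); ++i) {
          gpus->push_back(boost::lexical_cast<int>(ids[i]));
        }
      }
    }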

## Python

The Python interface -- pycaffe -- is the `caffe` module and its scripts in caffe/python. `import caffe` to load models, do forward and backward, handle IO, visualize networks, and even instrument model solving. All model data, derivatives, and parameters are exposed for reading and writing.
1 change: 1 addition & 0 deletions include/caffe/caffe.hpp
@@ -10,6 +10,7 @@
#include "caffe/layer.hpp"
#include "caffe/layer_factory.hpp"
#include "caffe/net.hpp"
#include "caffe/parallel.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/solver.hpp"
#include "caffe/util/benchmark.hpp"
20 changes: 13 additions & 7 deletions include/caffe/common.hpp
@@ -98,12 +98,12 @@ void GlobalInit(int* pargc, char*** pargv);
class Caffe {
public:
~Caffe();
inline static Caffe& Get() {
if (!singleton_.get()) {
singleton_.reset(new Caffe());
}
return *singleton_;
}

// Thread local context for Caffe. Moved to common.cpp instead of
// including boost/thread.hpp to avoid a boost/NVCC issue (#1009, #1010)
// on OSX. Also fails on Linux with CUDA 7.0.18.
static Caffe& Get();

enum Brew { CPU, GPU };

// This random number generator facade hides boost and CUDA rng
@@ -149,6 +149,11 @@ class Caffe {
static void SetDevice(const int device_id);
// Prints the current GPU status.
static void DeviceQuery();
// Parallel training info
inline static int solver_count() { return Get().solver_count_; }
inline static void set_solver_count(int val) { Get().solver_count_ = val; }
inline static bool root_solver() { return Get().root_solver_; }
inline static void set_root_solver(bool val) { Get().root_solver_ = val; }

protected:
#ifndef CPU_ONLY
@@ -158,7 +163,8 @@
shared_ptr<RNG> random_generator_;

Brew mode_;
static shared_ptr<Caffe> singleton_;
int solver_count_;
bool root_solver_;

private:
// The private constructor to avoid duplicate instantiation.
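
The diff above replaces the process-wide `singleton_` with a declaration-only `Get()`, which the comment says moves to common.cpp. A minimal sketch of what that thread-local definition might look like, assuming a `boost::thread_specific_ptr` holds one `Caffe` instance per thread (the variable name `thread_instance_` is illustrative):

    #include "boost/thread/tss.hpp"
    #include "caffe/common.hpp"

    namespace caffe {

    // Illustrative name: one Caffe context per thread, so each solver
    // thread can carry its own device id, mode, and solver flags.
    static boost::thread_specific_ptr<Caffe> thread_instance_;

    Caffe& Caffe::Get() {
      if (!thread_instance_.get()) {  // first call from this thread
        thread_instance_.reset(new Caffe());
      }
      return *(thread_instance_.get());
    }

    }  // namespace caffe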
50 changes: 33 additions & 17 deletions include/caffe/data_layers.hpp
@@ -5,16 +5,17 @@
#include <utility>
#include <vector>

#include "boost/scoped_ptr.hpp"
#include "hdf5.h"

#include "caffe/blob.hpp"
#include "caffe/common.hpp"
#include "caffe/data_reader.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/filler.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"

namespace caffe {
@@ -33,6 +34,8 @@ class BaseDataLayer : public Layer<Dtype> {
// This method may not be overridden except by the BasePrefetchingDataLayer.
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
// Data layers should be shared by multiple solvers in parallel
virtual inline bool ShareInParallel() const { return true; }
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
// Data layers have no bottoms, so reshaping is trivial.
@@ -50,12 +53,17 @@ class BaseDataLayer : public Layer<Dtype> {
bool output_labels_;
};

template <typename Dtype>
class Batch {
public:
Blob<Dtype> data_, label_;
};

template <typename Dtype>
class BasePrefetchingDataLayer :
public BaseDataLayer<Dtype>, public InternalThread {
public:
explicit BasePrefetchingDataLayer(const LayerParameter& param)
: BaseDataLayer<Dtype>(param) {}
explicit BasePrefetchingDataLayer(const LayerParameter& param);
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
// This method may not be overridden.
@@ -67,36 +75,38 @@ class BasePrefetchingDataLayer :
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);

virtual void CreatePrefetchThread();
virtual void JoinPrefetchThread();
// The thread's function
virtual void InternalThreadEntry() {}
// Prefetches batches (asynchronously if to GPU memory)
static const int PREFETCH_COUNT = 3;

protected:
Blob<Dtype> prefetch_data_;
Blob<Dtype> prefetch_label_;
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch) = 0;

Batch<Dtype> prefetch_[PREFETCH_COUNT];
BlockingQueue<Batch<Dtype>*> prefetch_free_;
BlockingQueue<Batch<Dtype>*> prefetch_full_;

Blob<Dtype> transformed_data_;
};

template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
public:
explicit DataLayer(const LayerParameter& param)
: BasePrefetchingDataLayer<Dtype>(param) {}
explicit DataLayer(const LayerParameter& param);
virtual ~DataLayer();
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);

// DataLayer uses DataReader instead for sharing for parallelism
virtual inline bool ShareInParallel() const { return false; }
virtual inline const char* type() const { return "Data"; }
virtual inline int ExactNumBottomBlobs() const { return 0; }
virtual inline int MinTopBlobs() const { return 1; }
virtual inline int MaxTopBlobs() const { return 2; }

protected:
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

shared_ptr<db::DB> db_;
shared_ptr<db::Cursor> cursor_;
DataReader reader_;
};

/**
@@ -111,6 +121,8 @@ class DummyDataLayer : public Layer<Dtype> {
: Layer<Dtype>(param) {}
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
// Data layers should be shared by multiple solvers in parallel
virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -144,6 +156,8 @@ class HDF5DataLayer : public Layer<Dtype> {
virtual ~HDF5DataLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
// Data layers should be shared by multiple solvers in parallel
virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -185,6 +199,8 @@ class HDF5OutputLayer : public Layer<Dtype> {
virtual ~HDF5OutputLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
// Data layers should be shared by multiple solvers in parallel
virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -235,7 +251,7 @@ class ImageDataLayer : public BasePrefetchingDataLayer<Dtype> {
protected:
shared_ptr<Caffe::RNG> prefetch_rng_;
virtual void ShuffleImages();
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

vector<std::pair<std::string, int> > lines_;
int lines_id_;
@@ -307,7 +323,7 @@ class WindowDataLayer : public BasePrefetchingDataLayer<Dtype> {

protected:
virtual unsigned int PrefetchRand();
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

shared_ptr<Caffe::RNG> prefetch_rng_;
vector<std::pair<std::string, vector<int> > > image_database_;
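
The `Batch` ring above (`PREFETCH_COUNT` buffers cycling between `prefetch_free_` and `prefetch_full_`) implies a classic producer/consumer pattern. A simplified sketch of both sides, assuming the prefetch thread fills free buffers via `load_batch` and `Forward_cpu` drains them (the GPU path and any stream synchronization are omitted here):

    // Producer: runs on the internal thread started by the layer.
    template <typename Dtype>
    void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
      try {
        while (!must_stop()) {
          Batch<Dtype>* batch = prefetch_free_.pop();  // blocks if all 3 in flight
          load_batch(batch);                           // subclass fills data/label
          prefetch_full_.push(batch);                  // hand off to Forward
        }
      } catch (boost::thread_interrupted&) {
        // StopInternalThread() interrupts the blocking pop; exit quietly.
      }
    }

    // Consumer: Forward pops a loaded batch, copies it out, recycles it.
    template <typename Dtype>
    void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
        const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
      Batch<Dtype>* batch = prefetch_full_.pop();      // wait for a ready batch
      top[0]->ReshapeLike(batch->data_);
      caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
                 top[0]->mutable_cpu_data());
      if (this->output_labels_) {
        top[1]->ReshapeLike(batch->label_);
        caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
                   top[1]->mutable_cpu_data());
      }
      prefetch_free_.push(batch);                      // return buffer for reuse
    }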
82 changes: 82 additions & 0 deletions include/caffe/data_reader.hpp
@@ -0,0 +1,82 @@
#ifndef CAFFE_DATA_READER_HPP_
#define CAFFE_DATA_READER_HPP_

#include <map>
#include <string>
#include <vector>

#include "caffe/common.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"

namespace caffe {

/**
* @brief Reads data from a source to queues available to data layers.
* A single reading thread is created per source, even if multiple solvers
* are running in parallel, e.g. for multi-GPU training. This makes sure
* databases are read sequentially, and that each solver accesses a different
* subset of the database. Data is distributed to solvers in a round-robin
* way to keep parallel training deterministic.
*/
class DataReader {
public:
explicit DataReader(const LayerParameter& param);
~DataReader();

inline BlockingQueue<Datum*>& free() const {
return queue_pair_->free_;
}
inline BlockingQueue<Datum*>& full() const {
return queue_pair_->full_;
}

protected:
// Queue pairs are shared between a body and its readers
class QueuePair {
public:
explicit QueuePair(int size);
~QueuePair();

BlockingQueue<Datum*> free_;
BlockingQueue<Datum*> full_;

DISABLE_COPY_AND_ASSIGN(QueuePair);
};

// A single body is created per source
class Body : public InternalThread {
public:
explicit Body(const LayerParameter& param);
virtual ~Body();

protected:
void InternalThreadEntry();
void read_one(db::Cursor* cursor, QueuePair* qp);

const LayerParameter param_;
BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;

friend class DataReader;

DISABLE_COPY_AND_ASSIGN(Body);
};

// A source is uniquely identified by its layer name + path, in case
// the same database is read from two different locations in the net.
static inline string source_key(const LayerParameter& param) {
return param.name() + ":" + param.data_param().source();
}

const shared_ptr<QueuePair> queue_pair_;
shared_ptr<Body> body_;

static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;

DISABLE_COPY_AND_ASSIGN(DataReader);
};

} // namespace caffe

#endif // CAFFE_DATA_READER_HPP_
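
A sketch of how `Body`'s reading loop might realize the round-robin distribution promised in the class comment: one `QueuePair` is collected per solver before reading starts, so each pass of the loop gives every solver exactly one datum. The bodies below are assumptions consistent with the declarations above, not the committed implementation:

    void DataReader::Body::InternalThreadEntry() {
      shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
      db->Open(param_.data_param().source(), db::READ);
      shared_ptr<db::Cursor> cursor(db->NewCursor());

      // Wait for every solver's queue pair before reading anything, so
      // datum 0 goes to solver 0, datum 1 to solver 1, and so on.
      vector<shared_ptr<QueuePair> > qps;
      int solver_count = Caffe::solver_count();  // effectively 1 at test time
      for (int i = 0; i < solver_count; ++i) {
        qps.push_back(new_queue_pairs_.pop());
      }
      while (!must_stop()) {
        for (int i = 0; i < solver_count; ++i) {
          read_one(cursor.get(), qps[i].get());
        }
      }
    }

    void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
      Datum* datum = qp->free_.pop();         // reuse a recycled buffer
      datum->ParseFromString(cursor->value());
      qp->full_.push(datum);                  // hand the datum to its solver
      cursor->Next();                         // advance; wrap at end of DB
      if (!cursor->valid()) {
        cursor->SeekToFirst();
      }
    }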
19 changes: 15 additions & 4 deletions include/caffe/internal_thread.hpp
@@ -14,18 +14,22 @@ namespace caffe {
/**
* Virtual class encapsulating boost::thread for use in a base class.
* The child class will acquire the ability to run a single thread,
* by reimplementing the virutal function InternalThreadEntry.
* by reimplementing the virtual function InternalThreadEntry.
*/
class InternalThread {
public:
InternalThread() : thread_() {}
virtual ~InternalThread();

/** Returns true if the thread was successfully started. **/
bool StartInternalThread();
/**
* Caffe's thread local state will be initialized using the current
* thread values, e.g. device id, solver index etc. The random seed
* is initialized using caffe_rng_rand.
*/
void StartInternalThread();

/** Will not return until the internal thread has exited. */
bool WaitForInternalThreadToExit();
void StopInternalThread();

bool is_started() const;

@@ -34,6 +38,13 @@ class InternalThread {
with the code you want your thread to run. */
virtual void InternalThreadEntry() {}

/* Should be tested when running loops to exit when requested. */
bool must_stop();

private:
void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count,
bool root_solver);

shared_ptr<boost::thread> thread_;
};

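
Per the new comment on `StartInternalThread`, the spawning thread's Caffe context (device, mode, random seed, solver count, root flag) is captured and replayed inside the child thread. Only the signature of the private `entry` helper appears in this diff; a plausible sketch of its body:

    // Sketch: entry() runs first on the new thread and mirrors the parent
    // thread's Caffe state before invoking the subclass's entry point.
    void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed,
        int solver_count, bool root_solver) {
    #ifndef CPU_ONLY
      CUDA_CHECK(cudaSetDevice(device));   // same GPU as the spawning thread
    #endif
      Caffe::set_mode(mode);
      Caffe::set_random_seed(rand_seed);   // seeded via caffe_rng_rand upstream
      Caffe::set_solver_count(solver_count);
      Caffe::set_root_solver(root_solver);
      InternalThreadEntry();               // subclass work loop
    }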
