Multi-GPU Data Parallelism (with Parallel Data Layers) #2903

Merged
merged 11 commits into from Aug 13, 2015
@@ -50,6 +50,13 @@ For a full example of fine-tuning, see examples/finetuning_on_flickr_style, but
# query the first device
caffe device_query -gpu 0
+**Parallelism**: the `-gpu` flag to the `caffe` tool can take a comma separated list of IDs to run on multiple GPUs. A solver and net will be instantiated for each GPU so the batch size is effectively multiplied by the number of GPUs. To reproduce single GPU training, reduce the batch size in the network definition accordingly.
+
+ # train on GPUs 0 & 1 (doubling the batch size)
+ caffe train -solver examples/mnist/lenet_solver.prototxt -gpu 0,1
+ # train on all GPUs (multiplying batch size by number of devices)
+ caffe train -solver examples/mnist/lenet_solver.prototxt -gpu all
+
## Python
The Python interface -- pycaffe -- is the `caffe` module and its scripts in caffe/python. `import caffe` to load models, do forward and backward, handle IO, visualize networks, and even instrument model solving. All model data, derivatives, and parameters are exposed for reading and writing.
View
@@ -10,6 +10,7 @@
#include "caffe/layer.hpp"
#include "caffe/layer_factory.hpp"
#include "caffe/net.hpp"
+#include "caffe/parallel.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/solver.hpp"
#include "caffe/util/benchmark.hpp"
View
@@ -98,12 +98,12 @@ void GlobalInit(int* pargc, char*** pargv);
class Caffe {
public:
~Caffe();
- inline static Caffe& Get() {
- if (!singleton_.get()) {
- singleton_.reset(new Caffe());
- }
- return *singleton_;
- }
+
+ // Thread local context for Caffe. Moved to common.cpp instead of
+ // including boost/thread.hpp to avoid a boost/NVCC issues (#1009, #1010)
+ // on OSX. Also fails on Linux with CUDA 7.0.18.
+ static Caffe& Get();
+
enum Brew { CPU, GPU };
// This random number generator facade hides boost and CUDA rng
@@ -149,6 +149,11 @@ class Caffe {
static void SetDevice(const int device_id);
// Prints the current GPU status.
static void DeviceQuery();
+ // Parallel training info
+ inline static int solver_count() { return Get().solver_count_; }
+ inline static void set_solver_count(int val) { Get().solver_count_ = val; }
+ inline static bool root_solver() { return Get().root_solver_; }
+ inline static void set_root_solver(bool val) { Get().root_solver_ = val; }
protected:
#ifndef CPU_ONLY
@@ -158,7 +163,8 @@ class Caffe {
shared_ptr<RNG> random_generator_;
Brew mode_;
- static shared_ptr<Caffe> singleton_;
+ int solver_count_;
+ bool root_solver_;
private:
// The private constructor to avoid duplicate instantiation.
@@ -5,16 +5,17 @@
#include <utility>
#include <vector>
-#include "boost/scoped_ptr.hpp"
#include "hdf5.h"
#include "caffe/blob.hpp"
#include "caffe/common.hpp"
+#include "caffe/data_reader.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/filler.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/proto/caffe.pb.h"
+#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"
namespace caffe {
@@ -33,6 +34,8 @@ class BaseDataLayer : public Layer<Dtype> {
// This method may not be overridden except by the BasePrefetchingDataLayer.
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
// Data layers have no bottoms, so reshaping is trivial.
@@ -51,11 +54,16 @@ class BaseDataLayer : public Layer<Dtype> {
};
template <typename Dtype>
+class Batch {
+ public:
+ Blob<Dtype> data_, label_;
+};
+
+template <typename Dtype>
class BasePrefetchingDataLayer :
public BaseDataLayer<Dtype>, public InternalThread {
public:
- explicit BasePrefetchingDataLayer(const LayerParameter& param)
- : BaseDataLayer<Dtype>(param) {}
+ explicit BasePrefetchingDataLayer(const LayerParameter& param);
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
// This method may not be overridden.
@@ -67,36 +75,38 @@ class BasePrefetchingDataLayer :
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
- virtual void CreatePrefetchThread();
- virtual void JoinPrefetchThread();
- // The thread's function
- virtual void InternalThreadEntry() {}
+ // Prefetches batches (asynchronously if to GPU memory)
+ static const int PREFETCH_COUNT = 3;
protected:
- Blob<Dtype> prefetch_data_;
- Blob<Dtype> prefetch_label_;
+ virtual void InternalThreadEntry();
+ virtual void load_batch(Batch<Dtype>* batch) = 0;
+
+ Batch<Dtype> prefetch_[PREFETCH_COUNT];
+ BlockingQueue<Batch<Dtype>*> prefetch_free_;
+ BlockingQueue<Batch<Dtype>*> prefetch_full_;
+
Blob<Dtype> transformed_data_;
};
template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
public:
- explicit DataLayer(const LayerParameter& param)
- : BasePrefetchingDataLayer<Dtype>(param) {}
+ explicit DataLayer(const LayerParameter& param);
virtual ~DataLayer();
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
-
+ // DataLayer uses DataReader instead for sharing for parallelism
+ virtual inline bool ShareInParallel() const { return false; }
virtual inline const char* type() const { return "Data"; }
virtual inline int ExactNumBottomBlobs() const { return 0; }
virtual inline int MinTopBlobs() const { return 1; }
virtual inline int MaxTopBlobs() const { return 2; }
protected:
- virtual void InternalThreadEntry();
+ virtual void load_batch(Batch<Dtype>* batch);
- shared_ptr<db::DB> db_;
- shared_ptr<db::Cursor> cursor_;
+ DataReader reader_;
};
/**
@@ -111,6 +121,8 @@ class DummyDataLayer : public Layer<Dtype> {
: Layer<Dtype>(param) {}
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -144,6 +156,8 @@ class HDF5DataLayer : public Layer<Dtype> {
virtual ~HDF5DataLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -185,6 +199,8 @@ class HDF5OutputLayer : public Layer<Dtype> {
virtual ~HDF5OutputLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -235,7 +251,7 @@ class ImageDataLayer : public BasePrefetchingDataLayer<Dtype> {
protected:
shared_ptr<Caffe::RNG> prefetch_rng_;
virtual void ShuffleImages();
- virtual void InternalThreadEntry();
+ virtual void load_batch(Batch<Dtype>* batch);
vector<std::pair<std::string, int> > lines_;
int lines_id_;
@@ -307,7 +323,7 @@ class WindowDataLayer : public BasePrefetchingDataLayer<Dtype> {
protected:
virtual unsigned int PrefetchRand();
- virtual void InternalThreadEntry();
+ virtual void load_batch(Batch<Dtype>* batch);
shared_ptr<Caffe::RNG> prefetch_rng_;
vector<std::pair<std::string, vector<int> > > image_database_;
@@ -0,0 +1,82 @@
+#ifndef CAFFE_DATA_READER_HPP_
+#define CAFFE_DATA_READER_HPP_
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "caffe/common.hpp"
+#include "caffe/internal_thread.hpp"
+#include "caffe/util/blocking_queue.hpp"
+#include "caffe/util/db.hpp"
+
+namespace caffe {
+
+/**
+ * @brief Reads data from a source to queues available to data layers.
+ * A single reading thread is created per source, even if multiple solvers
+ * are running in parallel, e.g. for multi-GPU training. This makes sure
+ * databases are read sequentially, and that each solver accesses a different
+ * subset of the database. Data is distributed to solvers in a round-robin
+ * way to keep parallel training deterministic.
+ */
+class DataReader {
+ public:
+ explicit DataReader(const LayerParameter& param);
+ ~DataReader();
+
+ inline BlockingQueue<Datum*>& free() const {
+ return queue_pair_->free_;
+ }
+ inline BlockingQueue<Datum*>& full() const {
+ return queue_pair_->full_;
+ }
+
+ protected:
+ // Queue pairs are shared between a body and its readers
+ class QueuePair {
+ public:
+ explicit QueuePair(int size);
+ ~QueuePair();
+
+ BlockingQueue<Datum*> free_;
+ BlockingQueue<Datum*> full_;
+
+ DISABLE_COPY_AND_ASSIGN(QueuePair);
+ };
+
+ // A single body is created per source
+ class Body : public InternalThread {
+ public:
+ explicit Body(const LayerParameter& param);
+ virtual ~Body();
+
+ protected:
+ void InternalThreadEntry();
+ void read_one(db::Cursor* cursor, QueuePair* qp);
+
+ const LayerParameter param_;
+ BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;
+
+ friend class DataReader;
+
+ DISABLE_COPY_AND_ASSIGN(Body);
+ };
+
+ // A source is uniquely identified by its layer name + path, in case
+ // the same database is read from two different locations in the net.
+ static inline string source_key(const LayerParameter& param) {
+ return param.name() + ":" + param.data_param().source();
+ }
+
+ const shared_ptr<QueuePair> queue_pair_;
+ shared_ptr<Body> body_;
+
+ static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
+
+DISABLE_COPY_AND_ASSIGN(DataReader);
+};
+
+} // namespace caffe
+
+#endif // CAFFE_DATA_READER_HPP_
@@ -14,18 +14,22 @@ namespace caffe {
/**
* Virtual class encapsulate boost::thread for use in base class
* The child class will acquire the ability to run a single thread,
- * by reimplementing the virutal function InternalThreadEntry.
+ * by reimplementing the virtual function InternalThreadEntry.
*/
class InternalThread {
public:
InternalThread() : thread_() {}
virtual ~InternalThread();
- /** Returns true if the thread was successfully started. **/
- bool StartInternalThread();
+ /**
+ * Caffe's thread local state will be initialized using the current
+ * thread values, e.g. device id, solver index etc. The random seed
+ * is initialized using caffe_rng_rand.
+ */
+ void StartInternalThread();
/** Will not return until the internal thread has exited. */
- bool WaitForInternalThreadToExit();
+ void StopInternalThread();
bool is_started() const;
@@ -34,6 +38,13 @@ class InternalThread {
with the code you want your thread to run. */
virtual void InternalThreadEntry() {}
+ /* Should be tested when running loops to exit when requested. */
+ bool must_stop();
+
+ private:
+ void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count,
+ bool root_solver);
+
shared_ptr<boost::thread> thread_;
};
Oops, something went wrong.