Skip to content

Commit

Permalink
Added BlockingQueue for inter-thread communication.
Browse files Browse the repository at this point in the history
  • Loading branch information
cypof committed Apr 28, 2015
1 parent 28e8f9e commit a0a36f8
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
47 changes: 47 additions & 0 deletions include/caffe/util/blocking_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef CAFFE_UTIL_BLOCKING_QUEUE_HPP_
#define CAFFE_UTIL_BLOCKING_QUEUE_HPP_

#include <queue>
#include <string>

#include "caffe/common.hpp"

namespace caffe {

template<typename T>
class BlockingQueue {
public:
explicit BlockingQueue();

void push(const T& t);

bool try_pop(T* t);

// This logs a message if the threads needs to be blocked
// useful for detecting e.g. when data feeding is too slow
T pop(const string& log_on_wait = "");

bool try_peek(T* t);

// Return element without removing it
T peek();

size_t size() const;

protected:
/**
Move synchronization fields out instead of including boost/thread.hpp
to avoid a boost/NVCC issues (#1009, #1010) on OSX. Also fails on
Linux CUDA 7.0.18.
*/
class sync;

std::queue<T> queue_;
shared_ptr<sync> sync_;

DISABLE_COPY_AND_ASSIGN(BlockingQueue);
};

} // namespace caffe

#endif
86 changes: 86 additions & 0 deletions src/caffe/util/blocking_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include <boost/thread.hpp>
#include <string>

#include "caffe/util/blocking_queue.hpp"

namespace caffe {

template<typename T>
class BlockingQueue<T>::sync {
public:
mutable boost::mutex mutex_;
boost::condition_variable condition_;
};

template<typename T>
BlockingQueue<T>::BlockingQueue()
: sync_(new sync()) {
}

template<typename T>
void BlockingQueue<T>::push(const T& t) {
boost::mutex::scoped_lock lock(sync_->mutex_);
queue_.push(t);
lock.unlock();
sync_->condition_.notify_one();
}

template<typename T>
bool BlockingQueue<T>::try_pop(T* t) {
boost::mutex::scoped_lock lock(sync_->mutex_);

if (queue_.empty()) {
return false;
}

*t = queue_.front();
queue_.pop();
return true;
}

template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait) {
boost::mutex::scoped_lock lock(sync_->mutex_);

while (queue_.empty()) {
if (!log_on_wait.empty()) {
LOG(INFO)<< log_on_wait;
}
sync_->condition_.wait(lock);
}

T t = queue_.front();
queue_.pop();
return t;
}

template<typename T>
bool BlockingQueue<T>::try_peek(T* t) {
boost::mutex::scoped_lock lock(sync_->mutex_);

if (queue_.empty()) {
return false;
}

*t = queue_.front();
return true;
}

template<typename T>
T BlockingQueue<T>::peek() {
boost::mutex::scoped_lock lock(sync_->mutex_);

while (queue_.empty()) {
sync_->condition_.wait(lock);
}

return queue_.front();
}

template<typename T>
size_t BlockingQueue<T>::size() const {
boost::mutex::scoped_lock lock(sync_->mutex_);
return queue_.size();
}

} // namespace caffe

0 comments on commit a0a36f8

Please sign in to comment.