Skip to content

Commit

Permalink
[ABSL] Purge common/mutex.h. (#1369)
Browse files Browse the repository at this point in the history
  • Loading branch information
pifon2a committed Aug 7, 2018
1 parent 54f7f4f commit 8d5bf2a
Show file tree
Hide file tree
Showing 23 changed files with 439 additions and 449 deletions.
14 changes: 7 additions & 7 deletions cartographer/cloud/internal/map_builder_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void MapBuilderServer::OnLocalSlamResult(
->EnqueueSensorData(std::move(sensor_data));
}

common::MutexLocker locker(&subscriptions_lock_);
absl::MutexLock locker(&subscriptions_lock_);
for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
auto copy_of_insertion_result =
insertion_result
Expand All @@ -224,7 +224,7 @@ void MapBuilderServer::OnLocalSlamResult(
void MapBuilderServer::OnGlobalSlamOptimizations(
const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
common::MutexLocker locker(&subscriptions_lock_);
absl::MutexLock locker(&subscriptions_lock_);
for (auto& entry : global_slam_subscriptions_) {
if (!entry.second(last_optimized_submap_ids, last_optimized_node_ids)) {
LOG(INFO) << "Removing subscription with index: " << entry.first;
Expand All @@ -237,7 +237,7 @@ MapBuilderContextInterface::LocalSlamSubscriptionId
MapBuilderServer::SubscribeLocalSlamResults(
int trajectory_id,
MapBuilderContextInterface::LocalSlamSubscriptionCallback callback) {
common::MutexLocker locker(&subscriptions_lock_);
absl::MutexLock locker(&subscriptions_lock_);
local_slam_subscriptions_[trajectory_id].emplace(current_subscription_index_,
callback);
return MapBuilderContextInterface::LocalSlamSubscriptionId{
Expand All @@ -247,27 +247,27 @@ MapBuilderServer::SubscribeLocalSlamResults(
void MapBuilderServer::UnsubscribeLocalSlamResults(
const MapBuilderContextInterface::LocalSlamSubscriptionId&
subscription_id) {
common::MutexLocker locker(&subscriptions_lock_);
absl::MutexLock locker(&subscriptions_lock_);
CHECK_EQ(local_slam_subscriptions_[subscription_id.trajectory_id].erase(
subscription_id.subscription_index),
1u);
}

int MapBuilderServer::SubscribeGlobalSlamOptimizations(
MapBuilderContextInterface::GlobalSlamOptimizationCallback callback) {
common::MutexLocker locker(&subscriptions_lock_);
absl::MutexLock locker(&subscriptions_lock_);
global_slam_subscriptions_.emplace(current_subscription_index_, callback);
return current_subscription_index_++;
}

void MapBuilderServer::UnsubscribeGlobalSlamOptimizations(
int subscription_index) {
common::MutexLocker locker(&subscriptions_lock_);
absl::MutexLock locker(&subscriptions_lock_);
CHECK_EQ(global_slam_subscriptions_.erase(subscription_index), 1u);
}

void MapBuilderServer::NotifyFinishTrajectory(int trajectory_id) {
common::MutexLocker locker(&subscriptions_lock_);
absl::MutexLock locker(&subscriptions_lock_);
for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
MapBuilderContextInterface::LocalSlamSubscriptionCallback callback =
entry.second;
Expand Down
2 changes: 1 addition & 1 deletion cartographer/cloud/internal/map_builder_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class MapBuilderServer : public MapBuilderServerInterface {
std::unique_ptr<mapping::MapBuilderInterface> map_builder_;
common::BlockingQueue<std::unique_ptr<MapBuilderContextInterface::Data>>
incoming_data_queue_;
common::Mutex subscriptions_lock_;
absl::Mutex subscriptions_lock_;
int current_subscription_index_ = 0;
std::map<int /* trajectory ID */, LocalSlamResultHandlerSubscriptions>
local_slam_subscriptions_ GUARDED_BY(subscriptions_lock_);
Expand Down
65 changes: 41 additions & 24 deletions cartographer/common/blocking_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <deque>
#include <memory>

#include "cartographer/common/mutex.h"
#include "absl/synchronization/mutex.h"
#include "cartographer/common/port.h"
#include "cartographer/common/time.h"
#include "glog/logging.h"
Expand All @@ -47,17 +47,22 @@ class BlockingQueue {

// Pushes a value onto the queue. Blocks if the queue is full.
void Push(T t) {
MutexLocker lock(&mutex_);
lock.Await([this]() REQUIRES(mutex_) { return QueueNotFullCondition(); });
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return QueueNotFullCondition();
};
absl::MutexLock lock(&mutex_);
mutex_.Await(absl::Condition(&predicate));
deque_.push_back(std::move(t));
}

// Like push, but returns false if 'timeout' is reached.
bool PushWithTimeout(T t, const common::Duration timeout) {
MutexLocker lock(&mutex_);
if (!lock.AwaitWithTimeout(
[this]() REQUIRES(mutex_) { return QueueNotFullCondition(); },
timeout)) {
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return QueueNotFullCondition();
};
absl::MutexLock lock(&mutex_);
if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(timeout))) {
return false;
}
deque_.push_back(std::move(t));
Expand All @@ -66,8 +71,11 @@ class BlockingQueue {

// Pops the next value from the queue. Blocks until a value is available.
T Pop() {
MutexLocker lock(&mutex_);
lock.Await([this]() REQUIRES(mutex_) { return !QueueEmptyCondition(); });
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return !QueueEmptyCondition();
};
absl::MutexLock lock(&mutex_);
mutex_.Await(absl::Condition(&predicate));

T t = std::move(deque_.front());
deque_.pop_front();
Expand All @@ -76,10 +84,12 @@ class BlockingQueue {

// Like Pop, but can timeout. Returns nullptr in this case.
T PopWithTimeout(const common::Duration timeout) {
MutexLocker lock(&mutex_);
if (!lock.AwaitWithTimeout(
[this]() REQUIRES(mutex_) { return !QueueEmptyCondition(); },
timeout)) {
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return !QueueEmptyCondition();
};
absl::MutexLock lock(&mutex_);
if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(timeout))) {
return nullptr;
}
T t = std::move(deque_.front());
Expand All @@ -90,10 +100,12 @@ class BlockingQueue {
// Like Peek, but can timeout. Returns nullptr in this case.
template <typename R>
R* PeekWithTimeout(const common::Duration timeout) {
MutexLocker lock(&mutex_);
if (!lock.AwaitWithTimeout(
[this]() REQUIRES(mutex_) { return !QueueEmptyCondition(); },
timeout)) {
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return !QueueEmptyCondition();
};
absl::MutexLock lock(&mutex_);
if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(timeout))) {
return nullptr;
}
return deque_.front().get();
Expand All @@ -104,7 +116,7 @@ class BlockingQueue {
// a pointer to the given type R.
template <typename R>
const R* Peek() {
MutexLocker lock(&mutex_);
absl::MutexLock lock(&mutex_);
if (deque_.empty()) {
return nullptr;
}
Expand All @@ -113,26 +125,31 @@ class BlockingQueue {

// Returns the number of items currently in the queue.
size_t Size() {
MutexLocker lock(&mutex_);
absl::MutexLock lock(&mutex_);
return deque_.size();
}

// Blocks until the queue is empty.
void WaitUntilEmpty() {
MutexLocker lock(&mutex_);
lock.Await([this]() REQUIRES(mutex_) { return QueueEmptyCondition(); });
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return QueueEmptyCondition();
};
absl::MutexLock lock(&mutex_);
mutex_.Await(absl::Condition(&predicate));
}

private:
// Returns true iff the queue is empty.
bool QueueEmptyCondition() REQUIRES(mutex_) { return deque_.empty(); }
bool QueueEmptyCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return deque_.empty();
}

// Returns true iff the queue is not full.
bool QueueNotFullCondition() REQUIRES(mutex_) {
bool QueueNotFullCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return queue_size_ == kInfiniteQueueSize || deque_.size() < queue_size_;
}

Mutex mutex_;
absl::Mutex mutex_;
const size_t queue_size_ GUARDED_BY(mutex_);
std::deque<T> deque_ GUARDED_BY(mutex_);
};
Expand Down
25 changes: 14 additions & 11 deletions cartographer/common/internal/testing/thread_pool_for_testing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ThreadPoolForTesting::ThreadPoolForTesting()

ThreadPoolForTesting::~ThreadPoolForTesting() {
{
MutexLocker locker(&mutex_);
absl::MutexLock locker(&mutex_);
CHECK(running_);
running_ = false;
CHECK_EQ(task_queue_.size(), 0);
Expand All @@ -45,7 +45,7 @@ ThreadPoolForTesting::~ThreadPoolForTesting() {
}

void ThreadPoolForTesting::NotifyDependenciesCompleted(Task* task) {
MutexLocker locker(&mutex_);
absl::MutexLock locker(&mutex_);
CHECK(running_);
auto it = tasks_not_ready_.find(task);
CHECK(it != tasks_not_ready_.end());
Expand All @@ -56,7 +56,7 @@ void ThreadPoolForTesting::NotifyDependenciesCompleted(Task* task) {
std::weak_ptr<Task> ThreadPoolForTesting::Schedule(std::unique_ptr<Task> task) {
std::shared_ptr<Task> shared_task;
{
MutexLocker locker(&mutex_);
absl::MutexLock locker(&mutex_);
idle_ = false;
CHECK(running_);
auto insert_result =
Expand All @@ -69,26 +69,29 @@ std::weak_ptr<Task> ThreadPoolForTesting::Schedule(std::unique_ptr<Task> task) {
}

void ThreadPoolForTesting::WaitUntilIdle() {
const auto predicate = [this]()
EXCLUSIVE_LOCKS_REQUIRED(mutex_) { return idle_; };
for (;;) {
{
common::MutexLocker locker(&mutex_);
if (locker.AwaitWithTimeout([this]() REQUIRES(mutex_) { return idle_; },
common::FromSeconds(0.1))) {
absl::MutexLock locker(&mutex_);
if (mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(0.1)))) {
return;
}
}
}
}

void ThreadPoolForTesting::DoWork() {
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return !task_queue_.empty() || !running_;
};
for (;;) {
std::shared_ptr<Task> task;
{
MutexLocker locker(&mutex_);
locker.AwaitWithTimeout(
[this]()
REQUIRES(mutex_) { return !task_queue_.empty() || !running_; },
common::FromSeconds(0.1));
absl::MutexLock locker(&mutex_);
mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(0.1)));
if (!task_queue_.empty()) {
task = task_queue_.front();
task_queue_.pop_front();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include <map>
#include <thread>

#include "cartographer/common/mutex.h"
#include "absl/synchronization/mutex.h"
#include "cartographer/common/thread_pool.h"

namespace cartographer {
Expand All @@ -35,7 +35,7 @@ class ThreadPoolForTesting : public ThreadPoolInterface {
~ThreadPoolForTesting();

std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task)
EXCLUDES(mutex_) override;
LOCKS_EXCLUDED(mutex_) override;

void WaitUntilIdle();

Expand All @@ -44,9 +44,9 @@ class ThreadPoolForTesting : public ThreadPoolInterface {

void DoWork();

void NotifyDependenciesCompleted(Task* task) EXCLUDES(mutex_) override;
void NotifyDependenciesCompleted(Task* task) LOCKS_EXCLUDED(mutex_) override;

Mutex mutex_;
absl::Mutex mutex_;
bool running_ GUARDED_BY(mutex_) = true;
bool idle_ GUARDED_BY(mutex_) = true;
std::deque<std::shared_ptr<Task>> task_queue_ GUARDED_BY(mutex_);
Expand Down
67 changes: 0 additions & 67 deletions cartographer/common/mutex.h

This file was deleted.

0 comments on commit 8d5bf2a

Please sign in to comment.