Skip to content

Commit

Permalink
ARROW-7890: [C++] Add Future implementation
Browse files Browse the repository at this point in the history
Closes #6467 from pitrou/ARROW-7890-cpp-promise and squashes the following commits:

c9b8f1b <Antoine Pitrou> Add docstrings
2e96561 <Antoine Pitrou> Add default constructor for Future that constructs an invalid Future
bb273f5 <Antoine Pitrou> ARROW-7890:  Add Future implementation

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pitrou committed Mar 3, 2020
1 parent de786b7 commit c057bff
Show file tree
Hide file tree
Showing 15 changed files with 1,578 additions and 111 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ set(ARROW_SRCS
util/decimal.cc
util/delimiting.cc
util/formatting.cc
util/future.cc
util/int_util.cc
util/io_util.cc
util/iterator.cc
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/flight_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,15 @@ Status RunPerformanceTest(FlightClient* client, bool test_put) {
// }

ARROW_ASSIGN_OR_RAISE(auto pool, ThreadPool::Make(FLAGS_num_threads));
std::vector<std::future<Status>> tasks;
std::vector<Future<Status>> tasks;
for (const auto& endpoint : plan->endpoints()) {
ARROW_ASSIGN_OR_RAISE(auto task, pool->Submit(ConsumeStream, endpoint));
tasks.push_back(std::move(task));
}

// Wait for tasks to finish
for (auto&& task : tasks) {
RETURN_NOT_OK(task.get());
RETURN_NOT_OK(task.status());
}

// Elapsed time in seconds
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,19 @@ void AssertSortedEquals(std::vector<T> u, std::vector<T> v) {
ARROW_EXPORT
void SleepFor(double seconds);

// Drain `iterator` and return every value it produced, in visiting order.
// A non-OK status from Visit() is reported through ARROW_EXPECT_OK; the
// values collected so far are still returned.
template <typename T>
std::vector<T> IteratorToVector(Iterator<T> iterator) {
  std::vector<T> collected;

  // Visitor that moves each yielded value to the back of `collected`.
  auto append = [&collected](T item) -> Status {
    collected.emplace_back(std::move(item));
    return Status::OK();
  };

  ARROW_EXPECT_OK(iterator.Visit(append));
  return collected;
}

// A RAII-style object that switches to a new locale, and switches back
// to the old locale when going out of scope. Doesn't do anything if the
// new locale doesn't exist on the local machine.
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ add_arrow_test(utility-test
uri_test.cc
utf8_util_test.cc)

add_arrow_test(threading-utility-test SOURCES task_group_test thread_pool_test)
add_arrow_test(threading-utility-test
SOURCES
future_test
task_group_test
thread_pool_test)

add_arrow_benchmark(bit_util_benchmark)
add_arrow_benchmark(compression_benchmark)
Expand Down
298 changes: 298 additions & 0 deletions cpp/src/arrow/util/future.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/future.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <numeric>

#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"

namespace arrow {

using internal::checked_cast;

// Single mutex shared by all FutureWaiter instances.
// Using one global mutex instead of a per-waiter mutex keeps lock management
// simple. Whenever both locks are needed, the locking order is: this global
// waiter mutex first, then the per-future mutex.
//
// It is unlikely that many waiter instances are alive at once, so sharing
// one mutex is not expected to limit scalability in practice.
static std::mutex global_waiter_mutex;

// Concrete FutureWaiter: records which of a fixed set of futures have
// finished and wakes up a waiting thread once the ending condition selected
// by `kind` is satisfied.
//
// The bookkeeping members are accessed with global_waiter_mutex held;
// `signalled_` is additionally atomic so that Wait() can test it without
// taking the lock.
class FutureWaiterImpl : public FutureWaiter {
 public:
  // Register this waiter on each of `futures` and snapshot their current
  // state. `futures[i]` is identified by the index `i` in all later
  // notifications and results.
  FutureWaiterImpl(Kind kind, std::vector<FutureImpl*> futures)
      : signalled_(false),
        kind_(kind),
        futures_(std::move(futures)),
        one_failed_(-1),
        fetch_pos_(0) {
    finished_futures_.reserve(futures_.size());

    // Observe the current state of futures and add waiters to receive future
    // state changes, atomically per future.
    // We need to lock ourselves, because as soon as SetWaiter() is called,
    // a FutureImpl may call MarkFutureFinished() from another thread
    // before this constructor finishes.
    std::unique_lock<std::mutex> lock(global_waiter_mutex);

    for (int i = 0; i < static_cast<int>(futures_.size()); ++i) {
      const auto state = futures_[i]->SetWaiter(this, i);
      if (IsFutureFinished(state)) {
        finished_futures_.push_back(i);
        // Only a future that has already finished can count as failed.
        // A still-pending future must not be recorded here, otherwise
        // ALL_OR_FIRST_FAILED would be satisfied immediately.
        if (state != FutureState::SUCCESS) {
          one_failed_ = i;
        }
      }
    }

    // Maybe signal the waiter, if the ending condition is already satisfied
    if (ShouldSignal()) {
      // No need to notify non-existent Wait() calls
      signalled_ = true;
    }
  }

  // Unregister this waiter from every future it was attached to.
  ~FutureWaiterImpl() {
    for (auto future : futures_) {
      future->RemoveWaiter(this);
    }
  }

  // Is the ending condition satisfied?
  // Reads the bookkeeping members, so callers hold global_waiter_mutex.
  bool ShouldSignal() {
    // Defaults to false so the result is well-defined even if `kind_` holds
    // a value not listed below.
    bool do_signal = false;
    switch (kind_) {
      case ANY:
        do_signal = (finished_futures_.size() > 0);
        break;
      case ALL:
        do_signal = (finished_futures_.size() == futures_.size());
        break;
      case ALL_OR_FIRST_FAILED:
        do_signal = (finished_futures_.size() == futures_.size()) || one_failed_ >= 0;
        break;
      case ITERATE:
        // There is at least one finished future not yet fetched
        do_signal = (finished_futures_.size() > static_cast<size_t>(fetch_pos_));
        break;
    }
    return do_signal;
  }

  // Mark the waiter as signalled and wake up one thread blocked on `cv_`.
  void Signal() {
    signalled_ = true;
    cv_.notify_one();
  }

  // Block until signalled. `lock` must hold global_waiter_mutex; it is
  // released while waiting and re-acquired before returning.
  void DoWaitUnlocked(std::unique_lock<std::mutex>* lock) {
    cv_.wait(*lock, [this] { return signalled_.load(); });
  }

  // Block without timeout until signalled. Always returns true.
  bool DoWait() {
    // Fast path: already signalled, no need to take the lock
    if (signalled_) {
      return true;
    }
    std::unique_lock<std::mutex> lock(global_waiter_mutex);
    DoWaitUnlocked(&lock);
    return true;
  }

  // Block until signalled or until `duration` has elapsed.
  // Returns whether the waiter was signalled.
  template <class Rep, class Period>
  bool DoWait(const std::chrono::duration<Rep, Period>& duration) {
    // Fast path: already signalled, no need to take the lock
    if (signalled_) {
      return true;
    }
    std::unique_lock<std::mutex> lock(global_waiter_mutex);
    cv_.wait_for(lock, duration, [this] { return signalled_.load(); });
    return signalled_.load();
  }

  // Record that the future registered under `future_num` finished with
  // `state`, and signal if that satisfies the ending condition.
  // "Unlocked" means this method does not lock by itself: the caller
  // already holds global_waiter_mutex.
  void DoMarkFutureFinishedUnlocked(int future_num, FutureState state) {
    finished_futures_.push_back(future_num);
    if (state != FutureState::SUCCESS) {
      one_failed_ = future_num;
    }
    if (!signalled_ && ShouldSignal()) {
      Signal();
    }
  }

  // Wait until a not-yet-fetched future has finished and return its index.
  // Only meaningful for the ITERATE kind.
  int DoWaitAndFetchOne() {
    std::unique_lock<std::mutex> lock(global_waiter_mutex);

    DCHECK_EQ(kind_, ITERATE);
    DoWaitUnlocked(&lock);
    DCHECK_LT(static_cast<size_t>(fetch_pos_), finished_futures_.size());
    if (static_cast<size_t>(fetch_pos_) == finished_futures_.size() - 1) {
      // Fetching the last available entry: the next call has to wait again
      signalled_ = false;
    }
    return finished_futures_[fetch_pos_++];
  }

  // Hand the list of finished future indices over to the caller.
  std::vector<int> DoMoveFinishedFutures() {
    std::unique_lock<std::mutex> lock(global_waiter_mutex);

    // std::move is required here: `finished_futures_` is a member, not a local
    return std::move(finished_futures_);
  }

 protected:
  std::condition_variable cv_;
  std::atomic<bool> signalled_;

  Kind kind_;
  std::vector<FutureImpl*> futures_;
  // Indices (into `futures_`) of finished futures, in order of observation
  std::vector<int> finished_futures_;
  // Index of a future that finished without success, or -1 if none did
  int one_failed_;
  // Position in `finished_futures_` of the next entry to hand out (ITERATE)
  int fetch_pos_;
};

namespace {

// Downcast a FutureWaiter to the implementation class defined in this file.
FutureWaiterImpl* GetConcreteWaiter(FutureWaiter* waiter) {
  auto* concrete = checked_cast<FutureWaiterImpl*>(waiter);
  return concrete;
}

}  // namespace

// Nothing to set up or tear down at the interface level; the state lives in
// the implementation class.
FutureWaiter::FutureWaiter() = default;

FutureWaiter::~FutureWaiter() = default;

// Factory: build a waiter of the given `kind` observing `futures`.
std::unique_ptr<FutureWaiter> FutureWaiter::Make(Kind kind,
                                                 std::vector<FutureImpl*> futures) {
  std::unique_ptr<FutureWaiter> waiter(new FutureWaiterImpl(kind, std::move(futures)));
  return waiter;
}

void FutureWaiter::MarkFutureFinishedUnlocked(int future_num, FutureState state) {
// Called by FutureImpl on state changes
GetConcreteWaiter(this)->DoMarkFutureFinishedUnlocked(future_num, state);
}

bool FutureWaiter::Wait(double seconds) {
if (seconds == kInfinity) {
return GetConcreteWaiter(this)->DoWait();
} else {
return GetConcreteWaiter(this)->DoWait(std::chrono::duration<double>(seconds));
}
}

int FutureWaiter::WaitAndFetchOne() {
return GetConcreteWaiter(this)->DoWaitAndFetchOne();
}

std::vector<int> FutureWaiter::MoveFinishedFutures() {
return GetConcreteWaiter(this)->DoMoveFinishedFutures();
}

class ConcreteFutureImpl : public FutureImpl {
public:
FutureState DoSetWaiter(FutureWaiter* w, int future_num) {
std::unique_lock<std::mutex> lock(mutex_);

// Atomically load state at the time of adding the waiter, to avoid
// missed or duplicate events in the caller
ARROW_CHECK_EQ(waiter_, nullptr)
<< "Only one Waiter allowed per Future at any given time";
waiter_ = w;
waiter_arg_ = future_num;
return state_.load();
}

void DoRemoveWaiter(FutureWaiter* w) {
std::unique_lock<std::mutex> lock(mutex_);

ARROW_CHECK_EQ(waiter_, w);
waiter_ = nullptr;
}

void DoMarkFinished() { DoMarkFinishedOrFailed(FutureState::SUCCESS); }

void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }

void DoMarkFinishedOrFailed(FutureState state) {
{
// Lock the hypothetical waiter first, and the future after.
// This matches the locking order done in FutureWaiter constructor.
std::unique_lock<std::mutex> waiter_lock(global_waiter_mutex);
std::unique_lock<std::mutex> lock(mutex_);

DCHECK(!IsFutureFinished(state_)) << "Future already marked finished";
state_ = state;
if (waiter_ != nullptr) {
waiter_->MarkFutureFinishedUnlocked(waiter_arg_, state);
}
}
cv_.notify_all();
}

void DoWait() {
std::unique_lock<std::mutex> lock(mutex_);

cv_.wait(lock, [this] { return IsFutureFinished(state_); });
}

bool DoWait(double seconds) {
std::unique_lock<std::mutex> lock(mutex_);

cv_.wait_for(lock, std::chrono::duration<double>(seconds),
[this] { return IsFutureFinished(state_); });
return IsFutureFinished(state_);
}

std::mutex mutex_;
std::condition_variable cv_;
FutureWaiter* waiter_ = nullptr;
int waiter_arg_ = -1;
};

namespace {

// Downcast a FutureImpl to the implementation class defined in this file.
ConcreteFutureImpl* GetConcreteFuture(FutureImpl* future) {
  auto* concrete = checked_cast<ConcreteFutureImpl*>(future);
  return concrete;
}

}  // namespace

// Factory: allocate a new future implementation.
std::unique_ptr<FutureImpl> FutureImpl::Make() {
  std::unique_ptr<FutureImpl> impl(new ConcreteFutureImpl());
  return impl;
}

// A newly constructed future starts out in the PENDING state.
FutureImpl::FutureImpl()
    : state_(FutureState::PENDING) {
}

// Attach waiter `w` to this future, identified by `future_num`.
// Forwards to the implementation class and returns the state it reports.
FutureState FutureImpl::SetWaiter(FutureWaiter* w, int future_num) {
  ConcreteFutureImpl* impl = GetConcreteFuture(this);
  return impl->DoSetWaiter(w, future_num);
}

// Detach waiter `w` from this future. Forwards to the implementation class.
void FutureImpl::RemoveWaiter(FutureWaiter* w) {
  ConcreteFutureImpl* impl = GetConcreteFuture(this);
  impl->DoRemoveWaiter(w);
}

void FutureImpl::Wait() { GetConcreteFuture(this)->DoWait(); }

bool FutureImpl::Wait(double seconds) { return GetConcreteFuture(this)->DoWait(seconds); }

void FutureImpl::MarkFinished() { GetConcreteFuture(this)->DoMarkFinished(); }

void FutureImpl::MarkFailed() { GetConcreteFuture(this)->DoMarkFailed(); }

} // namespace arrow
Loading

0 comments on commit c057bff

Please sign in to comment.