Skip to content
Permalink
Browse files

[C++] Future: Use std::future internally

- Use std::future internally in joynr Future class.
- Also remove the specialization for unique_ptr.  If we need it later,
  we need a generalized form for non-copyable result types.  It has also
  a different semantic: the get() method, when called more than once,
  resulted in a different, invalid (moved-out) response.
- Earlier it was also possible to call onSuccess/onError multiple times,
  resulting in overwriting the previously set results.  Now a
  std::future_error will be thrown, caught and logged.
- Note: This commit changes the ABI.
  • Loading branch information
Mátyás Forstner thomasreuterdebertrandtcom
Mátyás Forstner authored and thomasreuterdebertrandtcom committed Nov 22, 2019
1 parent 590ae6d commit 1e0b35df813e7349d28b4d570432442155ba9315
@@ -20,13 +20,16 @@
#define FUTURE_H

#include <cstdint>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <tuple>
#include <utility>

#include "joynr/Logger.h"
#include "joynr/Semaphore.h"
#include "joynr/PrivateCopyAssign.h"
#include "joynr/StatusCode.h"
#include "joynr/Util.h"
#include "joynr/exceptions/JoynrException.h"
@@ -38,6 +41,13 @@ namespace joynr
class FutureBase
{
public:
virtual ~FutureBase()
{
JOYNR_LOG_TRACE(logger(), "destructor has been invoked");
// Ensure onSuccess/onError can finish before deleting this object.
const std::lock_guard<std::mutex> lock{_statusMutex};
}

/**
* @brief This is a blocking call which waits until the request finishes/an error
* occurs/or times out.
@@ -49,11 +59,14 @@ class FutureBase
*/
void wait(std::int64_t timeOut)
{
if (_resultReceived.waitFor(std::chrono::milliseconds(timeOut))) {
_resultReceived.notify();
} else {
_status = StatusCodeEnum::WAIT_TIMED_OUT;
throw exceptions::JoynrTimeOutException("Request did not finish in time");
JOYNR_LOG_TRACE(logger(), "wait for future {}ms...", timeOut);
if (waitForFuture(timeOut) != std::future_status::ready) {
const std::lock_guard<std::mutex> lock{_statusMutex};
// Now we have the lock, recheck the status. Avoid owning the lock too long.
if (waitForFuture(0) != std::future_status::ready) {
_status = StatusCodeEnum::WAIT_TIMED_OUT;
throw exceptions::JoynrTimeOutException("Request did not finish in time");
}
}
}

@@ -63,9 +76,8 @@ class FutureBase
*/
void wait()
{
JOYNR_LOG_TRACE(logger(), "resultReceived.getStatus():{}", _resultReceived.getStatus());
_resultReceived.wait();
_resultReceived.notify();
JOYNR_LOG_TRACE(logger(), "wait for future...");
waitForFuture();
}

/**
@@ -74,6 +86,8 @@ class FutureBase
*/
StatusCodeEnum getStatus() const
{
JOYNR_LOG_TRACE(logger(), "getStatus has been invoked");
const std::lock_guard<std::mutex> lock{_statusMutex};
return _status;
}

@@ -83,7 +97,7 @@ class FutureBase
*/
bool isOk() const
{
return _status == StatusCodeEnum::SUCCESS;
return getStatus() == StatusCodeEnum::SUCCESS;
}

/**
@@ -93,29 +107,45 @@ class FutureBase
void onError(std::shared_ptr<exceptions::JoynrException> error)
{
JOYNR_LOG_TRACE(logger(), "onError has been invoked");
this->_error = std::move(error);
_status = StatusCodeEnum::ERROR;
_resultReceived.notify();
const std::lock_guard<std::mutex> lock{_statusMutex};
try {
exceptions::JoynrExceptionUtil::throwJoynrException(*error);
JOYNR_LOG_WARN(logger(), "onError: no exception was thrown");
} catch (const exceptions::JoynrException&) {
try {
storeException(std::current_exception());
_status = StatusCodeEnum::ERROR;
} catch (const std::future_error& e) {
JOYNR_LOG_ERROR(logger(),
"While calling onError: future_error caught: {}"
" [_status = {}]",
e.what(),
_status);
}
}
}

protected:
FutureBase() : _error(nullptr), _status(StatusCodeEnum::IN_PROGRESS), _resultReceived(0)
FutureBase() : _status(StatusCodeEnum::IN_PROGRESS)
{
}

void checkOk() const
{
if (!isOk()) {
exceptions::JoynrExceptionUtil::throwJoynrException(*_error);
}
}
virtual void storeException(std::exception_ptr eptr) = 0;

virtual std::future_status waitForFuture(std::int64_t timeOut) = 0;

virtual void waitForFuture() = 0;

std::shared_ptr<exceptions::JoynrException> _error;
StatusCodeEnum _status;
Semaphore _resultReceived;
ADD_LOGGER(FutureBase)
mutable std::mutex _statusMutex;
StatusCodeEnum _status;

private:
DISALLOW_COPY_AND_ASSIGN(FutureBase);
};

// -------------------------------------------------------------------------------------------------

template <class... Ts>
/**
* @brief Class for monitoring the status of a request by applications.
@@ -129,29 +159,12 @@ template <class... Ts>
*/
class Future : public FutureBase
{

public:
/**
* @brief Constructor
*/
Future() = default;

template <typename T>
void copyResultsImpl(T& dest, const T& value) const
{
dest = value;
}

template <std::size_t... Indices>
void copyResults(const std::tuple<Ts&...>& destination, std::index_sequence<Indices...>) const
{
auto l = {
0,
(void(copyResultsImpl(std::get<Indices>(destination), std::get<Indices>(_results))),
0)...};
std::ignore = l;
}

/**
* @brief This is a blocking call which waits until the request finishes/an error
* occurs/or times out. If the request finishes successfully, it retrieves the return value for
@@ -162,10 +175,8 @@ class Future : public FutureBase
*/
void get(Ts&... values)
{
this->wait();
this->checkOk();

copyResults(std::tie(values...), std::index_sequence_for<Ts...>{});
JOYNR_LOG_TRACE(logger(), "get has been invoked");
std::tie(values...) = _resultFuture.get();
}

/**
@@ -179,10 +190,8 @@ class Future : public FutureBase
*/
void get(std::int64_t timeOut, Ts&... values)
{
this->wait(timeOut);
this->checkOk();

copyResults(std::tie(values...), std::index_sequence_for<Ts...>{});
wait(timeOut);
get(values...);
}

/**
@@ -191,24 +200,49 @@ class Future : public FutureBase
*/
void onSuccess(Ts... results)
{
JOYNR_LOG_TRACE(this->logger(), "onSuccess has been invoked");
this->_status = StatusCodeEnum::SUCCESS;
// transform variadic templates into a std::tuple
this->_results = std::make_tuple(std::move(results)...);
this->_resultReceived.notify();
JOYNR_LOG_TRACE(logger(), "onSuccess has been invoked");
const std::lock_guard<std::mutex> lock{_statusMutex};
try {
_resultPromise.set_value(std::make_tuple(std::move(results)...));
_status = StatusCodeEnum::SUCCESS;
} catch (const std::future_error& e) {
JOYNR_LOG_ERROR(logger(),
"While calling onError: future_error caught: {} [_status = {}]",
e.what(),
_status);
}
}

private:
std::tuple<Ts...> _results;
void storeException(std::exception_ptr eptr) override
{
_resultPromise.set_exception(std::move(eptr));
}

std::future_status waitForFuture(std::int64_t timeOut) override
{
return _resultFuture.wait_for(std::chrono::milliseconds(timeOut));
}

void waitForFuture() override
{
_resultFuture.wait();
}

using ResultTuple = std::tuple<Ts...>;

std::promise<ResultTuple> _resultPromise;
std::shared_future<ResultTuple> _resultFuture{_resultPromise.get_future()};
};

// -------------------------------------------------------------------------------------------------

template <>
/**
* @brief Class specialization of the void Future class.
*/
class Future<void> : public FutureBase
{

public:
Future() = default;

@@ -220,8 +254,8 @@ class Future<void> : public FutureBase
*/
void get()
{
this->wait();
this->checkOk();
JOYNR_LOG_TRACE(logger(), "get has been invoked");
_resultFuture.get();
}

/**
@@ -234,50 +268,48 @@ class Future<void> : public FutureBase
*/
void get(std::int64_t timeOut)
{
this->wait(timeOut);
this->checkOk();
wait(timeOut);
get();
}

/**
* @brief Callback which indicates the operation has finished and is successful.
*/
void onSuccess()
{
this->_status = StatusCodeEnum::SUCCESS;
this->_resultReceived.notify();
JOYNR_LOG_TRACE(logger(), "onSuccess has been invoked");
const std::lock_guard<std::mutex> lock{_statusMutex};
try {
_resultPromise.set_value();
_status = StatusCodeEnum::SUCCESS;
} catch (const std::future_error& e) {
JOYNR_LOG_ERROR(logger(),
"While calling onError: future_error caught: {} [_status = {}]",
e.what(),
_status);
}
}
};

template <typename T>
class Future<std::unique_ptr<T>> : public FutureBase
{
public:
void get(std::unique_ptr<T>& value)
private:
void storeException(std::exception_ptr eptr) override
{
this->wait();
this->checkOk();

value = std::move(_result);
_resultPromise.set_exception(std::move(eptr));
}

void get(std::int64_t timeOut, std::unique_ptr<T>& value)
std::future_status waitForFuture(std::int64_t timeOut) override
{
this->wait(timeOut);
this->checkOk();

value = std::move(_result);
return _resultFuture.wait_for(std::chrono::milliseconds(timeOut));
}

void onSuccess(std::unique_ptr<T> value)
void waitForFuture() override
{
_result = std::move(value);
this->_status = StatusCodeEnum::SUCCESS;
this->_resultReceived.notify();
_resultFuture.wait();
}

private:
std::unique_ptr<T> _result;
std::promise<void> _resultPromise;
std::shared_future<void> _resultFuture{_resultPromise.get_future()};
};

} // namespace joynr

#endif // FUTURE_H
@@ -660,17 +660,14 @@ void LocalCapabilitiesDirectory::lookup(const std::string& participantId,
}
};

auto flag = std::make_shared<std::once_flag>();
auto onRuntimeError = [callback, participantId, flag](
const exceptions::JoynrRuntimeException& exception) {
auto onRuntimeError =
[callback, participantId](const exceptions::JoynrRuntimeException& exception) {
JOYNR_LOG_DEBUG(logger(),
"Global lookup for participantId {} failed with exception: {} ({})",
participantId,
exception.getMessage(),
exception.TYPE_NAME());
std::call_once(*flag, [callback]() {
callback->onError(types::DiscoveryError::INTERNAL_ERROR);
});
callback->onError(types::DiscoveryError::INTERNAL_ERROR);
};

_globalCapabilitiesDirectoryClient->lookup(participantId,
@@ -1740,15 +1737,15 @@ LocalCapabilitiesCallback::LocalCapabilitiesCallback(

void LocalCapabilitiesCallback::onError(const types::DiscoveryError::Enum& error)
{
_onErrorCallback(error);
std::call_once(_onceFlag, _onErrorCallback, error);
//"Unable to collect capabilities from global capabilities directory. Error: " +
// error.getMessage()));
}

void LocalCapabilitiesCallback::capabilitiesReceived(
const std::vector<types::DiscoveryEntryWithMetaInfo>& capabilities)
{
_onSuccess(capabilities);
std::call_once(_onceFlag, _onSuccess, capabilities);
}

} // namespace joynr
@@ -373,6 +373,7 @@ class LocalCapabilitiesCallback : public ILocalCapabilitiesCallback

private:
DISALLOW_COPY_AND_ASSIGN(LocalCapabilitiesCallback);
std::once_flag _onceFlag;
std::function<void(const std::vector<types::DiscoveryEntryWithMetaInfo>&)> _onSuccess;
std::function<void(const types::DiscoveryError::Enum&)> _onErrorCallback;
};

0 comments on commit 1e0b35d

Please sign in to comment.
You can’t perform that action at this time.