Skip to content

Commit

Permalink
rgw/sync: add with_lease() with polymorphic LockClient
Browse files Browse the repository at this point in the history
Signed-off-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
cbodley authored and adamemerson committed May 15, 2023
1 parent 1f1fb07 commit 1398200
Show file tree
Hide file tree
Showing 5 changed files with 1,046 additions and 0 deletions.
35 changes: 35 additions & 0 deletions src/rgw/sync/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#pragma once

#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <boost/system/error_code.hpp>

namespace rgw::sync {

using error_code = boost::system::error_code;

namespace asio = boost::asio;

using default_executor = asio::strand<asio::io_context::executor_type>;

template <typename T>
using awaitable = asio::awaitable<T, default_executor>;

using use_awaitable_t = asio::use_awaitable_t<default_executor>;
static constexpr use_awaitable_t use_awaitable{};

} // namespace rgw::sync
175 changes: 175 additions & 0 deletions src/rgw/sync/detail/lease_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#pragma once

#include <optional>
#include <utility>
#include <boost/asio/async_result.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include "common/async/co_waiter.h"
#include "common/async/service.h"
#include "common/ceph_time.h"
#include "sync/common.h"

namespace rgw::sync::detail {

using lease_clock = ceph::coarse_mono_clock;
using lease_timer = asio::basic_waitable_timer<lease_clock,
asio::wait_traits<lease_clock>, default_executor>;

// base class for the lease completion state. this contains everything that
// doesn't depend on the coroutine's return type
class lease_state : public ceph::async::service_list_base_hook {
ceph::async::service<lease_state>* svc = nullptr;
lease_timer timer;
asio::cancellation_signal signal;
std::exception_ptr eptr;
ceph::async::co_waiter<void, default_executor> waiter;

public:
lease_state(default_executor ex) : timer(ex) {}
~lease_state()
{
if (svc) {
svc->remove(*this);
}
}

lease_timer& get_timer() { return timer; }

asio::cancellation_slot get_cancellation_slot() { return signal.slot(); }

awaitable<void> wait()
{
if (!svc) {
// register for service_shutdown() notifications
svc = &asio::use_service<ceph::async::service<lease_state>>(
asio::query(co_await asio::this_coro::executor,
asio::execution::context));
svc->add(*this);
}
co_await waiter.get();
}

void complete()
{
if (waiter.waiting()) {
waiter.complete(nullptr);
} else {
timer.cancel(); // wake the renewal loop
}
}

bool aborted() const { return !!eptr; }

void abort(std::exception_ptr e)
{
if (!eptr) { // only the first exception is reported
eptr = e;
cancel();
}
}

void rethrow()
{
if (eptr) {
std::rethrow_exception(eptr);
}
}

void cancel()
{
signal.emit(asio::cancellation_type::terminal);
timer.cancel();
}

void service_shutdown()
{
waiter.shutdown();
}
};

// capture and return the arguments to cr's completion handler
template <typename T>
class lease_completion_state : public lease_state,
public boost::intrusive_ref_counter<lease_completion_state<T>,
boost::thread_unsafe_counter>
{
using result_type = std::pair<std::exception_ptr, T>;
std::optional<result_type> result;
public:
lease_completion_state(default_executor ex) : lease_state(ex) {}

bool completed() const { return result.has_value(); }

auto completion_handler()
{
return asio::bind_cancellation_slot(get_cancellation_slot(),
[self = boost::intrusive_ptr{this}] (std::exception_ptr eptr, T val) {
self->result.emplace(eptr, std::move(val));
self->complete();
});
}

T get() // precondition: completed()
{
rethrow(); // rethrow exceptions from renewal
if (auto eptr = std::get<0>(*result); eptr) {
std::rethrow_exception(eptr);
}
return std::get<1>(std::move(*result));
}
};

// specialization for awaitable<void>
template<>
class lease_completion_state<void> : public lease_state,
public boost::intrusive_ref_counter<lease_completion_state<void>,
boost::thread_unsafe_counter>
{
using result_type = std::exception_ptr;
std::optional<result_type> result;
public:
lease_completion_state(default_executor ex) : lease_state(ex) {}

bool completed() const { return result.has_value(); }

auto completion_handler()
{
return asio::bind_cancellation_slot(get_cancellation_slot(),
[self = boost::intrusive_ptr{this}] (std::exception_ptr eptr) {
self->result = eptr;
self->complete();
});
}

void get() // precondition: completed()
{
rethrow(); // rethrow exceptions from renewal
if (*result) {
std::rethrow_exception(*result);
}
}
};

template <typename T>
auto make_lease_completion_state(default_executor ex)
{
return boost::intrusive_ptr{new lease_completion_state<T>(ex)};
}

} // namespace rgw::sync::detail
125 changes: 125 additions & 0 deletions src/rgw/sync/lease.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#pragma once

#include "include/scope_guard.h"
#include "common/ceph_time.h"
#include "detail/lease_state.h"
#include "common.h"

namespace rgw::sync {

/// \brief Client interface for a specific timed distributed exclusive lock.
class LockClient {
public:
virtual ~LockClient() {}

/// Acquire a timed lock for the given duration, or throw on error.
virtual awaitable<void> acquire(ceph::timespan duration) = 0;
/// Renew an acquired lock for the given duration, or throw on error.
virtual awaitable<void> renew(ceph::timespan duration) = 0;
/// Release an acquired lock, or throw on error.
virtual awaitable<void> release() = 0;
};


/// \brief Call a coroutine under the protection of a continuous lease.
///
/// Acquires exclusive access to a timed lock, then spawns the given coroutine
/// \ref cr. The lock is renewed at intervals of \ref duration / 2. The
/// coroutine is canceled if the lock is lost before its completion.
///
/// Exceptions thrown by release() are ignored, but exceptions from acquire()
/// and renew() propagate back to the caller. If renew() is delayed long
/// enough for the lock to expire, a boost::system::system_error exception is
/// thrown with an error code matching boost::system::errc::timed_out.
///
/// Otherwise, the result of \ref cr is returned to the caller, whether by
/// exception or return value.
///
/// \relates LockClient
///
/// \param lock A client that can send lock requests
/// \param duration Duration of the lock
/// \param cr The coroutine to call under lease
///
/// \tparam T The return type of the coroutine \ref cr
template <typename T>
auto with_lease(LockClient& lock,
ceph::timespan duration,
awaitable<T> cr)
-> awaitable<T>
{
auto ex = co_await asio::this_coro::executor;

// acquire the lock. exceptions propagate directly to the caller
co_await lock.acquire(duration);
auto expires_at = detail::lease_clock::now() + duration;

// allocate the lease state with scoped cancellation so that with_lease()'s
// cancellation triggers cancellation of the spawned coroutine
auto state = detail::make_lease_completion_state<T>(ex);
const auto state_guard = make_scope_guard([&state] { state->cancel(); });

// spawn the coroutine with a waitable/cancelable completion handler
asio::co_spawn(ex, std::move(cr), state->completion_handler());

// lock renewal loop
auto& timer = state->get_timer();
const ceph::timespan interval = duration / 2;

while (!state->aborted() && !state->completed()) {
// sleep until the next lock interval
timer.expires_after(interval);
try {
co_await timer.async_wait(use_awaitable);
} catch (const std::exception&) {
break; // timer canceled by cr's completion, or caller canceled
}

// arm a timeout for the renew request
timer.expires_at(expires_at);
timer.async_wait([state] (error_code ec) {
if (!ec) {
state->abort(std::make_exception_ptr(
boost::system::system_error(
ETIMEDOUT, boost::system::system_category())));
}
});

try {
co_await lock.renew(duration);
expires_at = detail::lease_clock::now() + duration;
} catch (const std::exception&) {
state->abort(std::current_exception());
expires_at = detail::lease_clock::zero(); // don't release below
break;
}
}
timer.cancel();

// if cr was canceled, await its completion before releasing the lock
if (!state->completed()) {
co_await state->wait();
}

// release the lock if it hasn't expired
if (detail::lease_clock::now() < expires_at) try {
co_await lock.release();
} catch (const std::exception&) {} // ignore errors

// return the spawned coroutine's result
co_return state->get();
}

} // namespace rgw::sync
7 changes: 7 additions & 0 deletions src/test/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,17 @@ add_executable(unittest_rgw_putobj test_rgw_putobj.cc)
add_ceph_unittest(unittest_rgw_putobj)
target_link_libraries(unittest_rgw_putobj ${rgw_libs} ${UNITTEST_LIBS})


add_executable(unittest_rgw_throttle test_rgw_throttle.cc)
add_ceph_unittest(unittest_rgw_throttle)
target_link_libraries(unittest_rgw_throttle ${rgw_libs} ${UNITTEST_LIBS})

add_executable(unittest_rgw_sync_lease test_rgw_sync_lease.cc)
target_include_directories(unittest_rgw_sync_lease
PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw")
add_ceph_unittest(unittest_rgw_sync_lease)
target_link_libraries(unittest_rgw_sync_lease ceph-common ${UNITTEST_LIBS})

add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc)
add_ceph_unittest(unittest_rgw_iam_policy)
target_link_libraries(unittest_rgw_iam_policy
Expand Down
Loading

0 comments on commit 1398200

Please sign in to comment.