diff --git a/hazelcast/src/hazelcast/cp/cp.cpp b/hazelcast/src/hazelcast/cp/cp.cpp index 86d256a1ab..581b7c500f 100644 --- a/hazelcast/src/hazelcast/cp/cp.cpp +++ b/hazelcast/src/hazelcast/cp/cp.cpp @@ -839,34 +839,42 @@ namespace hazelcast { session_id, thread_id, invocation_uid, permits, timeout.count()); - return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then( - boost::launch::deferred, [=](boost::future f) { - try { - auto acquired = f.get().get_first_fixed_sized_field(); - if (!acquired) { - session_manager_.release_session(group_id_, session_id); - } - // first bool means acquired or not, second bool means if should try again - return std::make_pair(acquired, false); - } catch (client::exception::session_expired &) { - session_manager_.invalidate_session(group_id_, session_id); - if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <= - std::chrono::milliseconds::zero())) { + + auto invocation = client::spi::impl::ClientInvocation::create(context_, request, object_name_); + try { + return invocation->invoke().then( + boost::launch::deferred, [=](boost::future f) { + try { + auto acquired = f.get().get_first_fixed_sized_field(); + if (!acquired) { + session_manager_.release_session(group_id_, session_id); + } + // first bool means acquired or not, second bool means if should try again + return std::make_pair(acquired, false); + } catch (client::exception::session_expired &) { + session_manager_.invalidate_session(group_id_, session_id); + if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <= std::chrono::milliseconds::zero())) { + return std::make_pair(false, false); + } + return std::make_pair(false, true); + } catch (client::exception::wait_key_cancelled &) { + session_manager_.release_session(group_id_, session_id, permits); + if (!use_timeout) { + BOOST_THROW_EXCEPTION( + client::exception::illegal_state("session_semaphore::try_acquire_for_millis", (boost::format( + "Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") % + object_name_).str())); + } return std::make_pair(false, false); + } catch (...) { + session_manager_.release_session(group_id_, session_id, permits); + throw; } - return std::make_pair(false, true); - } catch (client::exception::wait_key_cancelled &) { - session_manager_.release_session(group_id_, session_id, permits); - if (!use_timeout) { - BOOST_THROW_EXCEPTION( - client::exception::illegal_state( - "session_semaphore::try_acquire_for_millis", (boost::format( - "Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") % - object_name_).str())); - } - return std::make_pair(false, false); - } - }); + }); + } catch (...) { + session_manager_.release_session(group_id_, session_id); + throw; + } }); return do_try_acquire_once().then(boost::launch::deferred, [=] (boost::future> f) { @@ -920,24 +928,28 @@ namespace hazelcast { boost::future session_semaphore::drain_permits() { auto thread_id = get_thread_id(); auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid(); - + auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT); + auto request = std::make_shared(client::protocol::codec::semaphore_drain_encode(group_id_, object_name_, + session_id, + thread_id, invocation_uid)); + auto inv = client::spi::impl::ClientInvocation::create(context_, std::move(request), object_name_); auto do_drain_once = ([=] () { - auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT); - auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_, - session_id, - thread_id, invocation_uid); - return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then( - boost::launch::deferred, [=](boost::future f) { - try { - auto count = f.get().get_first_fixed_sized_field(); - session_manager_.release_session(group_id_, session_id, - DRAIN_SESSION_ACQ_COUNT - count); - return count; - } catch (client::exception::session_expired &) { - session_manager_.invalidate_session(group_id_, session_id); - return -1; - } - }); + try { + return inv->invoke().then( + boost::launch::deferred, [=](boost::future f) { + try { + auto count = f.get().get_first_fixed_sized_field(); + session_manager_.release_session(group_id_, session_id,DRAIN_SESSION_ACQ_COUNT - count); + return count; + } catch (client::exception::session_expired &) { + session_manager_.invalidate_session(group_id_, session_id); + return -1; + } + }); + } catch (...) { + session_manager_.release_session(group_id_, session_id,DRAIN_SESSION_ACQ_COUNT); + throw; + } }); return do_drain_once().then(boost::launch::deferred, [=] (boost::future f) { @@ -947,6 +959,13 @@ namespace hazelcast { } while((count = do_drain_once().get()) == -1) {} return count; + }).then(boost::launch::deferred,[=] (boost::future f) { + try { + return f.get(); + } catch (...) { + session_manager_.release_session(group_id_, session_id,DRAIN_SESSION_ACQ_COUNT); + throw; + } }); }