Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 62 additions & 43 deletions hazelcast/src/hazelcast/cp/cp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,34 +839,42 @@ namespace hazelcast {
session_id,
thread_id, invocation_uid, permits,
timeout.count());
return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
boost::launch::deferred, [=](boost::future<client::protocol::ClientMessage> f) {
try {
auto acquired = f.get().get_first_fixed_sized_field<bool>();
if (!acquired) {
session_manager_.release_session(group_id_, session_id);
}
// first bool means acquired or not, second bool means if should try again
return std::make_pair(acquired, false);
} catch (client::exception::session_expired &) {
session_manager_.invalidate_session(group_id_, session_id);
if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <=
std::chrono::milliseconds::zero())) {

auto invocation = client::spi::impl::ClientInvocation::create(context_, request, object_name_);
try {
return invocation->invoke().then(
boost::launch::deferred, [=](boost::future<client::protocol::ClientMessage> f) {
try {
auto acquired = f.get().get_first_fixed_sized_field<bool>();
if (!acquired) {
session_manager_.release_session(group_id_, session_id);
}
// first bool means acquired or not, second bool means if should try again
return std::make_pair(acquired, false);
} catch (client::exception::session_expired &) {
session_manager_.invalidate_session(group_id_, session_id);
if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <= std::chrono::milliseconds::zero())) {
return std::make_pair(false, false);
}
return std::make_pair(false, true);
} catch (client::exception::wait_key_cancelled &) {
session_manager_.release_session(group_id_, session_id, permits);
if (!use_timeout) {
BOOST_THROW_EXCEPTION(
client::exception::illegal_state("session_semaphore::try_acquire_for_millis", (boost::format(
"Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
object_name_).str()));
}
return std::make_pair(false, false);
} catch (...) {
session_manager_.release_session(group_id_, session_id, permits);
throw;
}
return std::make_pair(false, true);
} catch (client::exception::wait_key_cancelled &) {
session_manager_.release_session(group_id_, session_id, permits);
if (!use_timeout) {
BOOST_THROW_EXCEPTION(
client::exception::illegal_state(
"session_semaphore::try_acquire_for_millis", (boost::format(
"Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
object_name_).str()));
}
return std::make_pair(false, false);
}
});
});
} catch (...) {
session_manager_.release_session(group_id_, session_id);
throw;
}
});

return do_try_acquire_once().then(boost::launch::deferred, [=] (boost::future<std::pair<bool, bool>> f) {
Expand Down Expand Up @@ -920,24 +928,28 @@ namespace hazelcast {
boost::future<int32_t> session_semaphore::drain_permits() {
auto thread_id = get_thread_id();
auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();

auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
auto request = std::make_shared<client::protocol::ClientMessage>(client::protocol::codec::semaphore_drain_encode(group_id_, object_name_,
session_id,
thread_id, invocation_uid));
auto inv = client::spi::impl::ClientInvocation::create(context_, std::move(request), object_name_);
auto do_drain_once = ([=] () {
auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_,
session_id,
thread_id, invocation_uid);
return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
boost::launch::deferred, [=](boost::future<client::protocol::ClientMessage> f) {
try {
auto count = f.get().get_first_fixed_sized_field<int32_t>();
session_manager_.release_session(group_id_, session_id,
DRAIN_SESSION_ACQ_COUNT - count);
return count;
} catch (client::exception::session_expired &) {
session_manager_.invalidate_session(group_id_, session_id);
return -1;
}
});
try {
return inv->invoke().then(
boost::launch::deferred, [=](boost::future<client::protocol::ClientMessage> f) {
try {
auto count = f.get().get_first_fixed_sized_field<int32_t>();
session_manager_.release_session(group_id_, session_id,DRAIN_SESSION_ACQ_COUNT - count);
return count;
} catch (client::exception::session_expired &) {
session_manager_.invalidate_session(group_id_, session_id);
return -1;
}
});
} catch (...) {
session_manager_.release_session(group_id_, session_id,DRAIN_SESSION_ACQ_COUNT);
throw;
}
});

return do_drain_once().then(boost::launch::deferred, [=] (boost::future<int32_t> f) {
Expand All @@ -947,6 +959,13 @@ namespace hazelcast {
}
while((count = do_drain_once().get()) == -1) {}
return count;
}).then(boost::launch::deferred,[=] (boost::future<int32_t> f) {
try {
return f.get();
} catch (...) {
session_manager_.release_session(group_id_, session_id,DRAIN_SESSION_ACQ_COUNT);
throw;
}
});
}

Expand Down