Skip to content

Commit

Permalink
Change RemoteInvoker::results_map to RemoveInvoker::results_vector
Browse files Browse the repository at this point in the history
Now results_vector is initialized up front. Its size is hardwired to 4096. 'p2p_send' is going to reset the PendingResult slot before sending an RPC.
  • Loading branch information
songweijia committed May 7, 2019
1 parent 530aa22 commit 6b7c1ac
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 17 deletions.
49 changes: 32 additions & 17 deletions include/derecho/core/detail/remote_invocable.hpp
Expand Up @@ -53,10 +53,19 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
return nullptr;
}

//Maps invocation-instance IDs to results sets
std::map<std::size_t, PendingResults<Ret>> results_map;
/* Maps invocation-instance IDs to results sets
Weijia: Although the maximum pending RPC is controlled by the window
size option in the configuration, the user application may not retrieve
the results on time. Therefore, the results may piled up and we need more
slots for the results than window size. We hardwired this to 4K here.
TODO: Obviously, this is not space efficient. Let's find a better RPC
return value mechanism.
*/

#define MAX_CONCURRENT_RPCS_PER_INVOKER (4096)
PendingResults<Ret> results_vector[MAX_CONCURRENT_RPCS_PER_INVOKER];
std::atomic<unsigned short> invocation_id_sequencer;
std::mutex map_lock;
// std::mutex map_lock; - we don't need a lock on the result map anymore.
using lock_t = std::unique_lock<std::mutex>;

/* use this from within a derived class to retrieve precisely this RemoteInvoker
Expand Down Expand Up @@ -106,6 +115,7 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
const std::decay_t<Args>&... remote_args) {
// auto invocation_id = mutils::long_rand();
std::size_t invocation_id = invocation_id_sequencer++;
invocation_id %= MAX_CONCURRENT_RPCS_PER_INVOKER;
std::size_t size = mutils::bytes_size(invocation_id);
{
auto t = {std::size_t{0}, std::size_t{0}, mutils::bytes_size(remote_args)...};
Expand All @@ -118,12 +128,12 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
assert_always(check_size == size);
}

lock_t l{map_lock};
// default-initialize the maps
results_map.erase(invocation_id); // TODO:release it as soon as possible
PendingResults<Ret>& pending_results = results_map[invocation_id];
// lock_t l{map_lock};
dbg_default_info("About to prepare results_vector entry for RPC call message with invocation ID {}", invocation_id);
results_vector[invocation_id].reset();
PendingResults<Ret>& pending_results = results_vector[invocation_id];

dbg_default_trace("Ready to send an RPC call message with invocation ID {}", invocation_id);
dbg_default_info("Ready to send an RPC call message with invocation ID {}", invocation_id);
return send_return{size, serialized_args, pending_results.get_future(),
pending_results};
}
Expand All @@ -143,8 +153,7 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
//note: find where this exception is set on the sending side!
bool is_exception = response[0];
long int invocation_id = ((long int*)(response + 1))[0];
lock_t l{map_lock};
assert(results_map.count(invocation_id) != 0);
// lock_t l{map_lock};
// we unlock the map here to avoid the deadlock:
// The p2p handler thread, on receiving an RPC REPLY may get this lock
// before sst_detect thread finish handling the corresponding ordered
Expand All @@ -163,13 +172,19 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
// because we have 64K slots and we assume after 64K messages, this
// corresponding ordered_send has been finished already.
// TODO: make a better plan along with garbage collection.
l.unlock();
// l.unlock();
// TODO: garbage collection for the responses.
// More on May-7th (Weijia):
// We don't need to lock the results container (now results_vector)
// anymore. The vector is initialized at the beginning. The p2p rpc
// reuse the slot by reseting it. No deleting, insertion, or
// replacement operations at all. But we still need a better garbage
// collection mechanism.
if(is_exception) {
results_map.at(invocation_id).set_exception(nid, std::make_exception_ptr(remote_exception_occurred{nid}));
results_vector[invocation_id].set_exception(nid, std::make_exception_ptr(remote_exception_occurred{nid}));
} else {
dbg_default_trace("Received an RPC response for invocation ID {} from node {}", invocation_id, nid);
results_map.at(invocation_id).set_value(nid, *mutils::from_bytes<Ret>(dsm, response + 1 + sizeof(invocation_id)));
dbg_default_info("Received an RPC response for invocation ID {} from node {}", invocation_id, nid);
results_vector[invocation_id].set_value(nid, *mutils::from_bytes<Ret>(dsm, response + 1 + sizeof(invocation_id)));
}
return recv_ret{Opcode(), 0, nullptr, nullptr};
}
Expand Down Expand Up @@ -211,10 +226,10 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
* @param invocation_id The ID referring to a particular invocation of the function
* @param who The list of nodes that will service this RPC call
*/
inline void fulfill_pending_results_map(long int invocation_id, const node_list_t& who) {
inline void fulfill_pending_results_vector(long int invocation_id, const node_list_t& who) {
// I think this function is never called
assert_always(false);
results_map.at(invocation_id).fulfill_map(who);
results_vector[invocation_id].fulfill_map(who);
}

/**
Expand Down Expand Up @@ -285,7 +300,7 @@ struct RemoteInvocable<Tag, std::function<Ret(Args...)>> {
out[0] = false;
((long int*)(out + 1))[0] = invocation_id;
mutils::to_bytes(result, out + sizeof(invocation_id) + 1);
dbg_default_trace("Ready to send an RPC reply for invocation ID {} to node {}", invocation_id, caller);
dbg_default_info("Ready to send an RPC reply for invocation ID {} to node {}", invocation_id, caller);
return recv_ret{reply_opcode, result_size, out, nullptr};
} catch(...) {
char* out = out_alloc(sizeof(long int) + 1);
Expand Down
20 changes: 20 additions & 0 deletions include/derecho/core/detail/rpc_utils.hpp
Expand Up @@ -337,6 +337,7 @@ class PendingBase {
virtual void set_exception_for_removed_node(const node_id_t&) = 0;
virtual void set_exception_for_caller_removed() = 0;
virtual bool all_responded() const = 0;
virtual void reset() = 0;
virtual ~PendingBase() {}
};

Expand Down Expand Up @@ -475,6 +476,20 @@ class PendingResults : public PendingBase {
bool all_responded() const {
return map_fulfilled && (responded_nodes == dest_nodes);
}

/**
* reset this object.
*/
void reset() {
promise_for_pending_map = std::promise<std::unique_ptr<reply_map<Ret>>>();
promise_for_reply_promises = std::promise<std::map<node_id_t, std::promise<Ret>>>();
reply_promises_are_ready = promise_for_reply_promises.get_future();
// reply_promises_are_ready_mutex
reply_promises.clear();
map_fulfilled = false;
dest_nodes.clear();
responded_nodes.clear();
}
};

/**
Expand Down Expand Up @@ -515,6 +530,11 @@ class PendingResults<void> : public PendingBase {
bool all_responded() const {
return map_fulfilled;
}

void reset() {
promise_for_pending_map = std::promise<std::unique_ptr<std::set<node_id_t>>>();
map_fulfilled = false;
}
};

/**
Expand Down

0 comments on commit 6b7c1ac

Please sign in to comment.