diff --git a/include/derecho/core/detail/remote_invocable.hpp b/include/derecho/core/detail/remote_invocable.hpp
index 4029ac15..cc61fea2 100644
--- a/include/derecho/core/detail/remote_invocable.hpp
+++ b/include/derecho/core/detail/remote_invocable.hpp
@@ -53,10 +53,19 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
         return nullptr;
     }

-    //Maps invocation-instance IDs to results sets
-    std::map<std::size_t, PendingResults<Ret>> results_map;
+    /* Maps invocation-instance IDs to results sets.
+       Weijia: Although the maximum number of pending RPCs is controlled by
+       the window size option in the configuration, the user application may
+       not retrieve the results in time, so results can pile up and we need
+       more slots for them than the window size. We hardwired this to 4K here.
+       TODO: Obviously, this is not space efficient. Let's find a better RPC
+       return value mechanism.
+     */
+
+#define MAX_CONCURRENT_RPCS_PER_INVOKER (4096)
+    PendingResults<Ret> results_vector[MAX_CONCURRENT_RPCS_PER_INVOKER];
     std::atomic<std::size_t> invocation_id_sequencer;
-    std::mutex map_lock;
+    // std::mutex map_lock; - we don't need a lock on the results container anymore.
     using lock_t = std::unique_lock<std::mutex>;

     /* use this from within a derived class to retrieve precisely this RemoteInvoker
@@ -106,6 +115,7 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
                      const std::decay_t<Args>&... remote_args) {
         // auto invocation_id = mutils::long_rand();
         std::size_t invocation_id = invocation_id_sequencer++;
+        invocation_id %= MAX_CONCURRENT_RPCS_PER_INVOKER;
         std::size_t size = mutils::bytes_size(invocation_id);
         {
             auto t = {std::size_t{0}, std::size_t{0}, mutils::bytes_size(remote_args)...};
@@ -118,12 +128,12 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
             assert_always(check_size == size);
         }

-        lock_t l{map_lock};
-        // default-initialize the maps
-        results_map.erase(invocation_id);  // TODO:release it as soon as possible
-        PendingResults<Ret>& pending_results = results_map[invocation_id];
+        // lock_t l{map_lock};
+        dbg_default_info("About to prepare results_vector entry for RPC call message with invocation ID {}", invocation_id);
+        results_vector[invocation_id].reset();
+        PendingResults<Ret>& pending_results = results_vector[invocation_id];

-        dbg_default_trace("Ready to send an RPC call message with invocation ID {}", invocation_id);
+        dbg_default_info("Ready to send an RPC call message with invocation ID {}", invocation_id);
         return send_return{size, serialized_args, pending_results.get_future(),
                            pending_results};
     }
@@ -143,8 +153,7 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
         //note: find where this exception is set on the sending side!
         bool is_exception = response[0];
         long int invocation_id = ((long int*)(response + 1))[0];
-        lock_t l{map_lock};
-        assert(results_map.count(invocation_id) != 0);
+        // lock_t l{map_lock};
         // we unlock the map here to avoid the deadlock:
         // The p2p handler thread, on receiving an RPC REPLY may get this lock
         // before sst_detect thread finish handling the corresponding ordered
@@ -163,13 +172,19 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
         // because we have 64K slots and we assume after 64K messages, this
         // corresponding ordered_send has been finished already.
         // TODO: make a better plan along with garbage collection.
-        l.unlock();
+        // l.unlock();
         // TODO: garbage collection for the responses.
+        // More on May-7th (Weijia):
+        // We no longer need to lock the results container (now
+        // results_vector). The vector is initialized at the beginning, and
+        // the p2p RPC path reuses each slot by resetting it: no deletion,
+        // insertion, or replacement operations at all. But we still need a
+        // better garbage collection mechanism.
         if(is_exception) {
-            results_map.at(invocation_id).set_exception(nid, std::make_exception_ptr(remote_exception_occurred{nid}));
+            results_vector[invocation_id].set_exception(nid, std::make_exception_ptr(remote_exception_occurred{nid}));
         } else {
-            dbg_default_trace("Received an RPC response for invocation ID {} from node {}", invocation_id, nid);
-            results_map.at(invocation_id).set_value(nid, *mutils::from_bytes<Ret>(dsm, response + 1 + sizeof(invocation_id)));
+            dbg_default_info("Received an RPC response for invocation ID {} from node {}", invocation_id, nid);
+            results_vector[invocation_id].set_value(nid, *mutils::from_bytes<Ret>(dsm, response + 1 + sizeof(invocation_id)));
         }
         return recv_ret{Opcode(), 0, nullptr, nullptr};
     }
@@ -211,10 +226,10 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
      * @param invocation_id The ID referring to a particular invocation of the function
      * @param who The list of nodes that will service this RPC call
      */
-    inline void fulfill_pending_results_map(long int invocation_id, const node_list_t& who) {
+    inline void fulfill_pending_results_vector(long int invocation_id, const node_list_t& who) {
         // I think this function is never called
         assert_always(false);
-        results_map.at(invocation_id).fulfill_map(who);
+        results_vector[invocation_id].fulfill_map(who);
     }

     /**
@@ -285,7 +300,7 @@ struct RemoteInvocable<Tag, std::function<Ret(Args...)>> {
             out[0] = false;
             ((long int*)(out + 1))[0] = invocation_id;
             mutils::to_bytes(result, out + sizeof(invocation_id) + 1);
-            dbg_default_trace("Ready to send an RPC reply for invocation ID {} to node {}", invocation_id, caller);
+            dbg_default_info("Ready to send an RPC reply for invocation ID {} to node {}", invocation_id, caller);
             return recv_ret{reply_opcode, result_size, out, nullptr};
         } catch(...) {
             char* out = out_alloc(sizeof(long int) + 1);
diff --git a/include/derecho/core/detail/rpc_utils.hpp b/include/derecho/core/detail/rpc_utils.hpp
index 9f426d08..dc9a776f 100644
--- a/include/derecho/core/detail/rpc_utils.hpp
+++ b/include/derecho/core/detail/rpc_utils.hpp
@@ -337,6 +337,7 @@ class PendingBase {
     virtual void set_exception_for_removed_node(const node_id_t&) = 0;
     virtual void set_exception_for_caller_removed() = 0;
     virtual bool all_responded() const = 0;
+    virtual void reset() = 0;
     virtual ~PendingBase() {}
 };

@@ -475,6 +476,20 @@ class PendingResults : public PendingBase {
     bool all_responded() const {
         return map_fulfilled && (responded_nodes == dest_nodes);
     }
+
+    /**
+     * Reset this object so it can be reused for a new RPC invocation.
+     */
+    void reset() {
+        promise_for_pending_map = std::promise<std::unique_ptr<reply_map<Ret>>>();
+        promise_for_reply_promises = std::promise<std::map<node_id_t, std::promise<Ret>>>();
+        reply_promises_are_ready = promise_for_reply_promises.get_future();
+        // reply_promises_are_ready_mutex
+        reply_promises.clear();
+        map_fulfilled = false;
+        dest_nodes.clear();
+        responded_nodes.clear();
+    }
 };

 /**
@@ -515,6 +530,11 @@ class PendingResults<void> : public PendingBase {
     bool all_responded() const {
         return map_fulfilled;
     }
+
+    void reset() {
+        promise_for_pending_map = std::promise<std::unique_ptr<std::set<node_id_t>>>();
+        map_fulfilled = false;
+    }
 };

 /**
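
Reviewer's note on the pattern: the patch swaps a mutex-guarded std::map of pending results for a fixed-size array of reusable slots indexed by a wrapping atomic sequence number. The stand-alone C++17 sketch below illustrates that slot-reuse idea under stated assumptions; it is not Derecho code, and the names SLOT_COUNT, Slot, SlotRing, claim(), and deliver() are hypothetical, with an int payload standing in for PendingResults<Ret>.

#include <array>
#include <atomic>
#include <cstddef>
#include <future>
#include <memory>
#include <utility>

constexpr std::size_t SLOT_COUNT = 4096;  // plays the role of MAX_CONCURRENT_RPCS_PER_INVOKER

struct Slot {
    std::promise<int> result;                       // stands in for PendingResults<Ret>
    void reset() { result = std::promise<int>(); }  // re-arm the slot for reuse
};

class SlotRing {
    std::array<Slot, SLOT_COUNT> slots;    // fixed size: no insert/erase, hence no lock
    std::atomic<std::size_t> sequencer{0};

public:
    // Sender side: claim the next slot, returning its ID (sent on the wire)
    // and a future for the eventual reply. Like the patch, this is only safe
    // while at most SLOT_COUNT invocations are outstanding; beyond that a
    // live slot gets recycled, which is the limitation the TODO acknowledges.
    std::pair<std::size_t, std::future<int>> claim() {
        const std::size_t id = sequencer++ % SLOT_COUNT;
        slots[id].reset();
        return {id, slots[id].result.get_future()};
    }

    // Receiver side: deliver a reply into the slot named by the wire ID.
    void deliver(std::size_t id, int value) {
        slots[id % SLOT_COUNT].result.set_value(value);
    }
};

int main() {
    auto ring = std::make_unique<SlotRing>();  // heap-allocate the 4096 slots
    auto [id, fut] = ring->claim();            // like send() reserving an invocation ID
    ring->deliver(id, 42);                     // like receive_response() fulfilling it
    return fut.get() == 42 ? 0 : 1;
}

The tradeoff mirrors what the patch's comments describe: memory is committed up front and never reshaped (so no lock is needed), at the cost of wasted space and the risk of recycling a slot whose result was never retrieved.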
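
A second note, on the reset() methods added to PendingResults: they lean on the fact that assigning a freshly constructed std::promise abandons the old shared state and re-arms the object in place. A minimal demonstration of just that reuse property, using plain std::promise and nothing Derecho-specific:

#include <future>
#include <iostream>

int main() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    p.set_value(1);
    std::cout << f.get() << '\n';  // 1: the promise is now consumed

    // Calling p.get_future() again here would throw std::future_error
    // (future_already_retrieved). Assigning a fresh promise re-arms p,
    // which is what PendingResults::reset() does member by member.
    p = std::promise<int>();
    std::future<int> f2 = p.get_future();
    p.set_value(2);
    std::cout << f2.get() << '\n';  // 2: same object, new shared state
}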