Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,13 @@ struct Waiter
Waiter() = default;

template <typename Fn>
void post(Fn&& fn)
bool post(Fn&& fn)
{
const Lock lock(m_mutex);
assert(!m_fn);
if (m_fn) return false;
m_fn = std::forward<Fn>(fn);
m_cv.notify_all();
return true;
}

template <class Predicate>
Expand Down
7 changes: 6 additions & 1 deletion include/mp/type-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.loop->log()
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
server.m_context.loop->log()
<< "IPC server error request #" << req
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
throw std::runtime_error("thread busy");
}
} else {
server.m_context.loop->log()
<< "IPC server error request #" << req << ", missing thread to execute request";
Expand Down
73 changes: 73 additions & 0 deletions test/mp/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@
#include <mp/test/foo.capnp.h>
#include <mp/test/foo.capnp.proxy.h>

#include <atomic>
#include <capnp/capability.h>
#include <capnp/rpc.h>
#include <condition_variable>
#include <cstring>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <kj/async.h>
#include <kj/async-io.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/exception.h>
#include <kj/memory.h>
#include <kj/string.h>
#include <kj/test.h>
#include <memory>
#include <mp/proxy.h>
#include "mp/proxy.capnp.h"
#include <mp/proxy-io.h>
#include "mp/util.h"
#include <optional>
#include <set>
#include <stdexcept>
Expand Down Expand Up @@ -297,5 +304,71 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
signal.set_value();
}

KJ_TEST("Make simultaneous IPC callbacks with same request_thread and callback_thread")
{
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
std::promise<void> signal;

foo->initThreadMap();
// Use callFnAsync() to get the client to setup the request_thread
// that will be used for the test.
setup.server->m_impl->m_fn = [&] {};
foo->callFnAsync();
ThreadContext& tc{g_thread_context};
std::optional<Thread::Client> callback_thread, request_thread;
{
Lock lock(tc.waiter->m_mutex);
callback_thread = tc.callback_threads.at(foo->m_context.connection)->m_client;
request_thread = tc.request_threads.at(foo->m_context.connection)->m_client;
}

setup.server->m_impl->m_fn = [&] {
try
{
signal.get_future().get();
}
catch(const std::exception& e)
{
KJ_EXPECT(e.what() == std::string("Future already retrieved"));
}
};

auto client{foo->m_client};
bool caught_thread_busy = false;
// NOTE: '3' was choosen because it was the lowest number
// of simultaneous calls required to reliably catch a "thread busy" error
std::atomic<size_t> running{3};
foo->m_context.loop->sync([&]
{
for (size_t i = 0; i < running; i++)
{
auto request{client.callFnAsyncRequest()};
auto context{request.initContext()};
context.setCallbackThread(*callback_thread);
context.setThread(*request_thread);
foo->m_context.loop->m_task_set->add(request.send().then(
[&](auto&& results) {
running -= 1;
tc.waiter->m_cv.notify_all();
},
[&](kj::Exception&& e) {
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
"remote exception: std::exception: thread busy");
caught_thread_busy = true;
running -= 1;
signal.set_value();
tc.waiter->m_cv.notify_all();
}
));
}
});
{
Lock lock(tc.waiter->m_mutex);
tc.waiter->wait(lock, [&running] { return running == 0; });
}
KJ_EXPECT(caught_thread_busy);
}

} // namespace test
} // namespace mp
Loading