Skip to content

Commit

Permalink
Fix standalone mcrouter to use virtual event bases with IOThreadPoolE…
Browse files Browse the repository at this point in the history
…xecutor

Summary: Fix breakage in standalone mcrouter from deprecation of CarbonRouterInstance::init API to pass EVBs.

Reviewed By: spalamarchuk

Differential Revision: D24555047

fbshipit-source-id: c94e3215e97267c91f0bd6c902edddae1c5dc78a
  • Loading branch information
stuclar authored and facebook-github-bot committed Oct 27, 2020
1 parent 1e77d94 commit 8dd0851
Showing 1 changed file with 55 additions and 57 deletions.
112 changes: 55 additions & 57 deletions mcrouter/Server-inl.h
Expand Up @@ -162,33 +162,6 @@ inline void startServerShutdown(
}
}

template <class RouterInfo, template <class> class RequestHandler>
void serverLoop(
CarbonRouterInstance<RouterInfo>& router,
size_t threadId,
folly::EventBase& evb,
AsyncMcServerWorker& worker,
const McrouterStandaloneOptions& standaloneOpts,
std::function<void(McServerSession&)>& aclChecker) {
auto routerClient = standaloneOpts.remote_thread
? router.createClient(0 /* maximum_outstanding_requests */)
: router.createSameThreadClient(0 /* maximum_outstanding_requests */);
detail::serverInit<RouterInfo, RequestHandler>(
router,
threadId,
evb,
worker,
standaloneOpts,
aclChecker,
routerClient.get());
/* TODO(libevent): the only reason this is not simply evb.loop() is
because we need to call asox stuff on every loop iteration.
We can clean this up once we convert everything to EventBase */
while (worker.isAlive() || worker.writesPending()) {
evb.loopOnce();
}
}

inline AsyncMcServer::Options createAsyncMcServerOptions(
const McrouterOptions& mcrouterOpts,
const McrouterStandaloneOptions& standaloneOpts,
Expand Down Expand Up @@ -526,29 +499,14 @@ bool runServer(
CHECK_EQ(evbs.size(), mcrouterOpts.num_proxies);
ioThreadPool->removeObserver(executorObserver);

// Create AsyncMcServer instance
AsyncMcServer server(detail::createAsyncMcServerOptions(
mcrouterOpts, standaloneOpts, &evbs));
// Get EVB of main thread
auto localEvb = ioThreadPool->getEventBaseManager()->getEventBase();

server.installShutdownHandler({SIGINT, SIGTERM});
std::shared_ptr<AsyncMcServer> asyncMcServer =
std::make_shared<AsyncMcServer>(detail::createAsyncMcServerOptions(
mcrouterOpts, standaloneOpts, &evbs));

CarbonRouterInstance<RouterInfo>* router = nullptr;

SCOPE_EXIT {
if (router) {
router->shutdown();
}
server.join();

LOG(INFO) << "Shutting down";

freeAllRouters();

if (!opts.unixDomainSockPath.empty()) {
std::remove(opts.unixDomainSockPath.c_str());
}
};

if (standaloneOpts.remote_thread) {
router =
CarbonRouterInstance<RouterInfo>::init("standalone", mcrouterOpts);
Expand All @@ -563,19 +521,59 @@ bool runServer(

setupRouter<RouterInfo>(mcrouterOpts, standaloneOpts, router, preRunCb);

auto shutdownStarted = std::make_shared<std::atomic<bool>>(false);
ShutdownSignalHandler<RouterInfo> shutdownHandler(
localEvb, nullptr, asyncMcServer, shutdownStarted);
shutdownHandler.registerSignalHandler(SIGTERM);
shutdownHandler.registerSignalHandler(SIGINT);

// Create CarbonRouterClients for each worker thread
std::vector<typename CarbonRouterClient<RouterInfo>::Pointer>
carbonRouterClients;
for (size_t i = 0; i < mcrouterOpts.num_proxies; i++) {
// Create CarbonRouterClients
auto routerClient = standaloneOpts.remote_thread
? router->createClient(0 /* maximum_outstanding_requests */)
: router->createSameThreadClient(
0 /* maximum_outstanding_requests */);
carbonRouterClients.push_back(std::move(routerClient));
}
CHECK_EQ(carbonRouterClients.size(), mcrouterOpts.num_proxies);

auto aclChecker = detail::getAclChecker(mcrouterOpts, standaloneOpts);
folly::Baton<> shutdownBaton;
server.spawn(
[router, &standaloneOpts, &aclChecker](
asyncMcServer->startOnVirtualEB(
[&router,
&carbonRouterClients,
&standaloneOpts,
aclChecker = aclChecker](
size_t threadId,
folly::EventBase& evb,
AsyncMcServerWorker& worker) {
detail::serverLoop<RouterInfo, RequestHandler>(
*router, threadId, evb, worker, standaloneOpts, aclChecker);
folly::VirtualEventBase& vevb,
AsyncMcServerWorker& worker) mutable {
detail::serverInit<RouterInfo, RequestHandler>(
*router,
threadId,
vevb.getEventBase(),
worker,
standaloneOpts,
aclChecker,
carbonRouterClients[threadId].get());
},
[&shutdownBaton]() { shutdownBaton.post(); });

shutdownBaton.wait();
[evb = localEvb, &asyncMcServer, &shutdownStarted] {
evb->runInEventBaseThread([&]() {
detail::startServerShutdown<RouterInfo>(
nullptr, asyncMcServer, shutdownStarted);
});
evb->terminateLoopSoon();
});
localEvb->loopForever();
LOG(INFO) << "Started shutdown of CarbonRouterInstance";
router->shutdown();
freeAllRouters();
ioThreadPool.reset();
if (!opts.unixDomainSockPath.empty()) {
std::remove(opts.unixDomainSockPath.c_str());
}
LOG(INFO) << "Completed shutdown";
} catch (const std::exception& e) {
LOG(ERROR) << e.what();
return false;
Expand Down

0 comments on commit 8dd0851

Please sign in to comment.