Skip to content

Commit

Permalink
Fix a bug of deadlock when terminating the server.
Browse files Browse the repository at this point in the history
  • Loading branch information
estraier committed Apr 3, 2022
1 parent d010ede commit e9f1c44
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
7 changes: 7 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
2021-11-04 Mikio Hirabayashi <hirarin@gmail.com>

- Release: 0.9.9
- Library version: 1.16.0

- Fix a bug of deadlock when terminating the server.

2021-11-04 Mikio Hirabayashi <hirarin@gmail.com>

- Release: 0.9.8
Expand Down
73 changes: 41 additions & 32 deletions tkrzw_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ std::string_view g_log_level;
std::string_view g_log_date;
int32_t g_log_td = 0;
std::ofstream* g_log_stream = nullptr;
MessageQueue* g_mq = nullptr;
std::atomic<grpc::Server*> g_server(nullptr);
bool g_is_shutdown = false;
SignalBroker g_signal_broker;
std::atomic_bool g_is_shutdown(false);
std::atomic_bool g_is_reconfig(false);

// Configures the logger.
Status ConfigLogger() {
Expand All @@ -106,31 +106,16 @@ Status ConfigLogger() {
return Status(Status::SUCCESS);
}

// Reconfigure the server.
void ReconfigServer(int signum) {
grpc::Server* server = g_server.load();
if (server != nullptr) {
g_logger->LogCat(Logger::LEVEL_INFO, "Reconfiguring by signal: ", signum);
const Status status = ConfigLogger();
if (status != Status::SUCCESS) {
g_logger->LogCat(Logger::LEVEL_ERROR, "ConfigLogger failed: ", status);
}
}
// Handle a reconfiguring signal.
void HandleReconfigSignal(int signum) {
g_is_reconfig.store(true);
g_signal_broker.Send();
}

// Shutdowns the server.
void ShutdownServer(int signum) {
grpc::Server* server = g_server.load();
if (server != nullptr && g_server.compare_exchange_strong(server, nullptr)) {
g_logger->LogCat(Logger::LEVEL_INFO, "Shutting down by signal: ", signum);
const auto deadline = std::chrono::system_clock::now() +
std::chrono::milliseconds(static_cast<int64_t>(g_shutdown_wait * 1000));
g_is_shutdown = true;
if (g_mq != nullptr) {
g_mq->CancelReaders();
}
server->Shutdown(deadline);
}
// Handle a terminating signal.
void HandleTerminatingSignal(int signum) {
g_is_shutdown.store(true);
g_signal_broker.Send();
}

// Makes SSL credentials.
Expand Down Expand Up @@ -278,7 +263,6 @@ static int32_t Process(int32_t argc, const char** args) {
logger.LogCat(Logger::LEVEL_ERROR, "Open failed: ", ulog_prefix);
has_error = true;
}
g_mq = mq.get();
}
std::vector<std::unique_ptr<ParamDBM>> dbms;
dbms.reserve(dbm_exprs.size());
Expand Down Expand Up @@ -356,11 +340,35 @@ static int32_t Process(int32_t argc, const char** args) {
logger.LogCat(Logger::LEVEL_FATAL, "ServerBuilder::BuildAndStart failed: ", address);
has_error = true;
} else {
g_server.store(server.get());
std::signal(SIGHUP, ReconfigServer);
std::signal(SIGINT, ShutdownServer);
std::signal(SIGTERM, ShutdownServer);
std::signal(SIGQUIT, ShutdownServer);
auto signal_handler = [&]() {
SignalBroker::Waiter waiter(&g_signal_broker);
while (true) {
waiter.Wait(5.0);
if (g_is_shutdown.load()) {
logger.Log(Logger::LEVEL_INFO, "Terminating by signal");
const auto deadline = std::chrono::system_clock::now() +
std::chrono::milliseconds(static_cast<int64_t>(g_shutdown_wait * 1000));
if (mq.get() != nullptr) {
mq->CancelReaders();
}
server->Shutdown(deadline);
break;
}
if (g_is_reconfig.load()) {
logger.Log(Logger::LEVEL_INFO, "Reconfiguring by signal");
const Status status = ConfigLogger();
if (status != Status::SUCCESS) {
logger.LogCat(Logger::LEVEL_ERROR, "ConfigLogger failed: ", status);
}
g_is_reconfig.store(false);
}
}
};
std::thread signal_handler_thread(signal_handler);
std::signal(SIGHUP, HandleReconfigSignal);
std::signal(SIGINT, HandleTerminatingSignal);
std::signal(SIGTERM, HandleTerminatingSignal);
std::signal(SIGQUIT, HandleTerminatingSignal);
if (with_async) {
auto* async_service = (DBMAsyncServiceImpl*)service.get();
async_service->StartReplication();
Expand All @@ -386,6 +394,7 @@ static int32_t Process(int32_t argc, const char** args) {
sync_service->StopReplication();
}
logger.Log(Logger::LEVEL_INFO, "The server finished");
signal_handler_thread.join();
}
service.reset(nullptr);
for (auto& dbm : dbms) {
Expand Down
8 changes: 4 additions & 4 deletions tkrzw_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,7 @@ class DBMAsyncServiceImpl : public DBMServiceBase, public tkrzw_rpc::DBMService:
const ReplicationParameters& repl_params = {})
: DBMServiceBase(dbms, logger, server_id, mq, repl_params) {}

void OperateQueue(grpc::ServerCompletionQueue* queue, const bool* is_shutdown);
void OperateQueue(grpc::ServerCompletionQueue* queue, const std::atomic_bool* is_shutdown);
void ShutdownQueue(grpc::ServerCompletionQueue* queue);
};

Expand Down Expand Up @@ -1928,7 +1928,7 @@ class AsyncDBMProcessorReplicate : public AsyncDBMProcessorInterface {
};

inline void DBMAsyncServiceImpl::OperateQueue(
grpc::ServerCompletionQueue* queue, const bool* is_shutdown) {
grpc::ServerCompletionQueue* queue, const std::atomic_bool* is_shutdown) {
logger_->Log(Logger::LEVEL_INFO, "Starting a completion queue");
new AsyncDBMProcessor<tkrzw_rpc::EchoRequest, tkrzw_rpc::EchoResponse>(
this, queue, &DBMAsyncServiceImpl::RequestEcho,
Expand Down Expand Up @@ -2018,9 +2018,9 @@ inline void DBMAsyncServiceImpl::OperateQueue(
}
} else {
if (proc != nullptr) {
proc->Cancel(*is_shutdown);
proc->Cancel(is_shutdown->load());
}
if (*is_shutdown) {
if (is_shutdown->load()) {
break;
}
}
Expand Down

0 comments on commit e9f1c44

Please sign in to comment.