Skip to content

Commit

Permalink
Merge pull request #690 from dsaveliev/fix-node-stop-rpc-call
Browse files Browse the repository at this point in the history
Ignore disconnects when stopping node - fixes #367
This is a port of bitcoin/bitcoin#14670

This problem is related to the "stop" RPC call.
It turns out that sometimes the node stops before it sends the response.
That causes the client to retry the call and lead to "ConnectionRefusedError".

I found out that there is a fix in bitcoin repo (in master branch, actually), related to this bug.
In the nutshell, this changes removes forced exit of libevent loop, which allows
to process all the requests gracefully.

Test feature_shutdown.py wasn't ported because it introduces a bug bitcoin/bitcoin#14670 (comment)
The "improvement" bitcoin/bitcoin#14958 requires RPC method "getrpcinfo" which isn't ported from the bitcoin codebase yet.

There are other floating tests with a similar error (ConnectionRefusedError):
fixes #373
fixes #484
fixes #544

Signed-off-by: Dmitry Saveliev <dima@thirdhash.com>
  • Loading branch information
scravy committed Feb 28, 2019
2 parents 65d2dcc + 5f973f8 commit 6753dac
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 29 deletions.
32 changes: 11 additions & 21 deletions src/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <utilstrencodings.h>
#include <netbase.h>
#include <rpc/protocol.h> // For HTTP status codes
#include <init.h>
#include <sync.h>
#include <ui_interface.h>

Expand All @@ -21,7 +22,6 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <signal.h>
#include <future>

#include <event2/thread.h>
#include <event2/buffer.h>
Expand Down Expand Up @@ -420,17 +420,14 @@ bool UpdateHTTPServerLogging(bool enable) {
}

std::thread threadHTTP;
std::future<bool> threadResult;
static std::vector<std::thread> g_thread_http_workers;

bool StartHTTPServer()
{
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
std::packaged_task<bool(event_base*, evhttp*)> task(ThreadHTTP);
threadResult = task.get_future();
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);
threadHTTP = std::thread(ThreadHTTP, eventBase, eventHTTP);

for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);
Expand All @@ -442,10 +439,6 @@ void InterruptHTTPServer()
{
LogPrint(BCLog::HTTP, "Interrupting HTTP server\n");
if (eventHTTP) {
// Unlisten sockets
for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
// Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
}
Expand All @@ -465,20 +458,14 @@ void StopHTTPServer()
delete workQueue;
workQueue = nullptr;
}
// Unlisten sockets, these are what make the event loop running, which means
// that after this and all connections are closed the event loop will quit.
for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
boundSockets.clear();
if (eventBase) {
LogPrint(BCLog::HTTP, "Waiting for HTTP event thread to exit\n");
// Exit the event loop as soon as there are no active events.
event_base_loopexit(eventBase, nullptr);
// Give event loop a few seconds to exit (to send back last RPC responses), then break it
// Before this was solved with event_base_loopexit, but that didn't work as expected in
// at least libevent 2.0.21 and always introduced a delay. In libevent
// master that appears to be solved, so in the future that solution
// could be used again (if desirable).
// (see discussion in https://github.com/unite/unite/pull/6990)
if (threadResult.valid() && threadResult.wait_for(std::chrono::milliseconds(2000)) == std::future_status::timeout) {
LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n");
event_base_loopbreak(eventBase);
}
threadHTTP.join();
}
if (eventHTTP) {
Expand Down Expand Up @@ -583,6 +570,9 @@ void HTTPRequest::WriteHeader(const std::string& hdr, const std::string& value)
void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
{
assert(!replySent && req);
if (ShutdownRequested()) {
WriteHeader("Connection", "close");
}
// Send event to main http thread to send reply message
struct evbuffer* evb = evhttp_request_get_output_buffer(req);
assert(evb);
Expand Down
2 changes: 2 additions & 0 deletions src/rpc/parameter_conversion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ static const CRPCConvertParam vRPCConvertParams[] =

{ "stakeat", 0, "recipient" },
{ "stakeat", 2, "coincontrol" },

{ "stop", 0, "wait" },
};

class CRPCConvertTable
Expand Down
9 changes: 7 additions & 2 deletions src/rpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,19 @@ UniValue help(const JSONRPCRequest& jsonRequest)

UniValue stop(const JSONRPCRequest& jsonRequest)
{
// Accept the deprecated and ignored 'detach' boolean argument
// Accept the hidden 'wait' integer argument (milliseconds)
// For instance, 'stop 1000' makes the call wait 1 second before returning
// to the client (intended for testing)
if (jsonRequest.fHelp || jsonRequest.params.size() > 1)
throw std::runtime_error(
"stop\n"
"\nStop UnitE server.");
// Event loop will exit after current HTTP requests have been handled, so
// this reply will get back to the client.
StartShutdown();
if (jsonRequest.params[0].isNum()) {
MilliSleep(jsonRequest.params[0].get_int());
}
return "UnitE server stopping";
}

Expand Down Expand Up @@ -264,7 +269,7 @@ static const CRPCCommand vRPCCommands[] =
// --------------------- ------------------------ ----------------------- ----------
/* Overall control/query calls */
{ "control", "help", &help, {"command"} },
{ "control", "stop", &stop, {} },
{ "control", "stop", &stop, {"wait"} },
{ "control", "uptime", &uptime, {} },
};

Expand Down
8 changes: 4 additions & 4 deletions test/functional/test_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,16 @@ def start_nodes(self, extra_args=None, *args, **kwargs):
for node in self.nodes:
coverage.write_all_rpc_commands(self.options.coveragedir, node.rpc)

def stop_node(self, i):
def stop_node(self, i, expected_stderr='', wait=0):
"""Stop a united test node"""
self.nodes[i].stop_node()
self.nodes[i].stop_node(expected_stderr, wait=wait)
self.nodes[i].wait_until_stopped()

def stop_nodes(self):
def stop_nodes(self, wait=0):
"""Stop multiple united test nodes"""
for node in self.nodes:
# Issue RPC to stop nodes
node.stop_node()
node.stop_node(wait=wait)

for node in self.nodes:
# Wait for nodes to stop
Expand Down
4 changes: 2 additions & 2 deletions test/functional/test_framework/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,14 @@ def drain_main_signal_callbacks_pending(self):
else:
queue_size = left

def stop_node(self):
def stop_node(self, expected_stderr='', wait=0):
"""Stop the node."""
if not self.running:
return
self.log.debug("Stopping node")
try:
self.drain_main_signal_callbacks_pending()
self.stop()
self.stop(wait=wait)
except http.client.CannotSendRequest:
self.log.exception("Unable to stop node.")
del self.p2ps[:]
Expand Down

0 comments on commit 6753dac

Please sign in to comment.