Skip to content

Commit

Permalink
Fix a race condtion bug of http::Server::Stop. See #27
Browse files Browse the repository at this point in the history
  • Loading branch information
zieckey committed Apr 10, 2017
1 parent a421752 commit 55b331f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
24 changes: 20 additions & 4 deletions evpp/http/http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bool Server::Init(int listen_port) {
bool Server::Init(const std::vector<int>& listen_ports) {
bool rc = true;
for (auto lp : listen_ports) {
rc &= Init(lp);
rc = rc && Init(lp);
}
return rc;
}
Expand Down Expand Up @@ -68,10 +68,15 @@ bool Server::AfterFork() {
}

bool Server::Start() {
std::shared_ptr<std::atomic<int>> exited_listen_thread_count(new std::atomic<int>(0));
bool rc = tpool_->Start(true);
for (auto& lt : listen_threads_) {
auto http_close_fn = std::bind(&Service::Stop, lt.hserver);
rc &= lt.thread->Start(true,
auto http_close_fn = [=]() {
lt.hserver->Stop();
LOG_INFO << "http service at 0.0.0.0:" << lt.hserver->port() << " has stopped.";
OnListenThreadExited(exited_listen_thread_count->fetch_add(1) + 1);
};
rc = rc && lt.thread->Start(true,
EventLoopThread::Functor(),
http_close_fn);
assert(lt.thread->IsRunning());
Expand Down Expand Up @@ -102,12 +107,15 @@ bool Server::Start() {

void Server::Stop(bool wait_thread_exit /*= false*/) {
LOG_INFO << "this=" << this << " http server is stopping";

// First we stop all the listening threads
// And then after listening threads have stopped,
// Server::OnListenThreadExited will be invoked, in which we will stop the working thread pool.
for (auto& lt : listen_threads_) {
// 1. Service::Stop will be called automatically when listen_thread_ is existing
// 2. EventLoopThread::Stop will be called to terminate the thread
lt.thread->Stop();
}
tpool_->Stop();

if (!wait_thread_exit) {
return;
Expand Down Expand Up @@ -261,6 +269,14 @@ EventLoop* Server::GetNextLoop(EventLoop* default_loop, const ContextPtr& ctx) {
#endif
}

void Server::OnListenThreadExited(int exited_listen_thread_count) {
LOG_INFO << "this=" << this << " OnListenThreadExited exited_listen_thread_count=" << exited_listen_thread_count << " listen_threads_.size=" << listen_threads_.size();
if (exited_listen_thread_count == int(listen_threads_.size())) {
LOG_INFO << "this=" << this << " stop the working thread pool.";
tpool_->Stop();
}
}

Service* Server::service(int index) const {
if (index < int(listen_threads_.size())) {
return listen_threads_[index].hserver.get();
Expand Down
4 changes: 3 additions & 1 deletion evpp/http/http_server.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <atomic>

#include "service.h"
#include "evpp/thread_dispatch_policy.h"

Expand Down Expand Up @@ -57,7 +59,7 @@ class EVPP_EXPORT Server : public ThreadDispatchPolicy {
const HTTPRequestCallback& user_callback);

EventLoop* GetNextLoop(EventLoop* default_loop, const ContextPtr& ctx);

void OnListenThreadExited(int exited_listen_thread_count);
private:
struct ListenThread {
// The listening main thread
Expand Down
17 changes: 12 additions & 5 deletions evpp/http/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ Service::~Service() {
assert(!evhttp_bound_socket_);
}

bool Service::Listen(int port) {
bool Service::Listen(int listen_port) {
assert(evhttp_);
assert(listen_loop_->IsInLoopThread());
port_ = listen_port;

#if LIBEVENT_VERSION_NUMBER >= 0x02001500
evhttp_bound_socket_ = evhttp_bind_socket_with_handle(evhttp_, "0.0.0.0", port);
evhttp_bound_socket_ = evhttp_bind_socket_with_handle(evhttp_, "0.0.0.0", listen_port);
if (!evhttp_bound_socket_) {
return false;
}
#else
if (evhttp_bind_socket(evhttp_, "0.0.0.0", port) != 0) {
if (evhttp_bind_socket(evhttp_, "0.0.0.0", listen_port) != 0) {
return false;
}
#endif
Expand Down Expand Up @@ -162,7 +163,7 @@ void Service::SendReply(struct evhttp_request* req, const std::string& response_
auto f = [this, response]() {
// In the main HTTP listening thread
assert(listen_loop_->IsInLoopThread());
LOG_TRACE << "this=" << this << " send reply in listenning thread";
LOG_TRACE << "this=" << this << " send reply in listening thread";

if (!response->buffer) {
evhttp_send_reply(response->req, HTTP_NOTFOUND, "Not Found", nullptr);
Expand All @@ -173,7 +174,13 @@ void Service::SendReply(struct evhttp_request* req, const std::string& response_
};

// Forward this response sending task to HTTP listening thread
listen_loop_->RunInLoop(f);
if (listen_loop_->IsRunning()) {
LOG_INFO << "this=" << this << " dispatch this SendReply to listening thread";
listen_loop_->RunInLoop(f);
} else {
LOG_WARN << "this=" << this << " listening thread is going to stop. we discards this request.";
// TODO do we need do some resource recycling about the evhttp_request?
}
}
}
}
5 changes: 5 additions & 0 deletions evpp/http/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ class EVPP_EXPORT Service {
EventLoop* event_loop() const {
return listen_loop_;
}

int port() const {
return port_;
}
private:
static void GenericCallback(struct evhttp_request* req, void* arg);
void HandleRequest(struct evhttp_request* req);
void DefaultHandleRequest(const ContextPtr& ctx);
void SendReply(struct evhttp_request* req, const std::string& response);
private:
int port_ = 0;
struct evhttp* evhttp_;
struct evhttp_bound_socket* evhttp_bound_socket_;
EventLoop* listen_loop_;
Expand Down

0 comments on commit 55b331f

Please sign in to comment.