Skip to content

Commit

Permalink
Add AfterFork for TCPServer and UDPServer. Fix AfterFork problem of E…
Browse files Browse the repository at this point in the history
…ventLoop, see issue #56
  • Loading branch information
zieckey committed Jul 13, 2017
1 parent 64764ec commit 9fc8601
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 11 deletions.
24 changes: 21 additions & 3 deletions evpp/event_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,19 @@ void EventLoop::Init() {

tid_ = std::this_thread::get_id(); // The default thread id

// Initialized task queue watcher
InitNotifyPipeWatcher();

status_.store(kInitialized);
}

void EventLoop::InitNotifyPipeWatcher() {
// Initialized task queue notify pipe watcher
watcher_.reset(new PipeEventWatcher(this, std::bind(&EventLoop::DoPendingFunctors, this)));
int rc = watcher_->Init();
assert(rc);
if (!rc) {
LOG_FATAL << "PipeEventWatcher init failed.";
}

status_.store(kInitialized);
}

void EventLoop::Run() {
Expand Down Expand Up @@ -149,6 +153,20 @@ void EventLoop::AfterFork() {
LOG_FATAL << "event_reinit failed!";
abort();
}

// We create EventLoopThread and initialize it in father process,
// but we use it in child process.
// If we have only one child process, everything goes well.
//
// But if we have multi child processes, something goes wrong.
// Because EventLoop::watcher_ is created and initialized in father process
// all children processes inherited father's pipe.
//
// When we use the pipe to do a notification in one child process
// the notification may be received by another child process randomly.
//
// So we need to reinitialize the watcher_
InitNotifyPipeWatcher();
}

InvokeTimerPtr EventLoop::RunAfter(double delay_ms, const Functor& f) {
Expand Down
3 changes: 2 additions & 1 deletion evpp/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class EVPP_EXPORT EventLoop : public ServerStatus {
// @brief Stop the event loop
void Stop();

// @brief Reinitialize the event_base object after a fork
// @brief Reinitialize some data fields after a fork
void AfterFork();

InvokeTimerPtr RunAfter(double delay_ms, const Functor& f);
Expand Down Expand Up @@ -105,6 +105,7 @@ class EVPP_EXPORT EventLoop : public ServerStatus {
}
private:
void Init();
void InitNotifyPipeWatcher();
void StopInLoop();
void DoPendingFunctors();
size_t GetPendingQueueSize();
Expand Down
4 changes: 4 additions & 0 deletions evpp/tcp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ bool TCPServer::Init() {
return true;
}

void TCPServer::AfterFork() {
// Nothing to do right now.
}

bool TCPServer::Start() {
DLOG_TRACE;
assert(status_ == kInitialized);
Expand Down
3 changes: 3 additions & 0 deletions evpp/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy, public ServerStatus {
// the TCP server is totally stopped
void Stop(DoneCallback cb = DoneCallback());

// @brief Reinitialize some data fields after a fork
void AfterFork();

public:
// Set a connection event relative callback when the TCPServer
// receives a new connection or an exist connection breaks down.
Expand Down
4 changes: 4 additions & 0 deletions evpp/udp/udp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ bool Server::Init(const std::string& listen_ports/*like "53,5353,1053"*/) {
return Init(v);
}

void Server::AfterFork() {
// Nothing to do right now.
}

bool Server::Start() {
if (!message_handler_) {
LOG_ERROR << "MessageHandler DO NOT set!";
Expand Down
3 changes: 3 additions & 0 deletions evpp/udp/udp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class EVPP_EXPORT Server : public ThreadDispatchPolicy {
void Pause();
void Continue();

// @brief Reinitialize some data fields after a fork
void AfterFork();

bool IsRunning() const;
bool IsStopped() const;

Expand Down
22 changes: 15 additions & 7 deletions test/event_loop_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static void PeriodicFunc() {
}
}

TEST_UNIT(testEventLoop) {
TEST_UNIT(TestEventLoop1) {
using namespace evloop;
std::thread th(MyEventThread);
usleep(delay.Microseconds());
Expand All @@ -50,15 +50,13 @@ TEST_UNIT(testEventLoop) {
H_TEST_ASSERT(evpp::GetActiveEventCount() == 0);
}


namespace {
void OnTimer(evpp::EventLoop* loop) {

}
}


TEST_UNIT(testEventLoop2) {
TEST_UNIT(TestEventLoop2) {
evpp::EventLoop loop;
auto timer = [&loop]() {
auto close = [&loop]() {
Expand All @@ -72,7 +70,7 @@ TEST_UNIT(testEventLoop2) {
}

// Test std::move of C++11
TEST_UNIT(testEventLoop3) {
TEST_UNIT(TestEventLoop3) {
evpp::EventLoop loop;
auto timer = [&loop]() {
auto close = [&loop]() {
Expand Down Expand Up @@ -108,7 +106,7 @@ void NewEventLoop(struct event_base* base) {
}

// Test creating EventLoop from a exist event_base
TEST_UNIT(testEventLoop4) {
TEST_UNIT(TestEventLoop4) {
struct event_base* base = event_base_new();
auto timer = std::make_shared<evpp::TimerEventWatcher>(base, std::bind(&NewEventLoop, base), evpp::Duration(1.0));
timer->Init();
Expand All @@ -125,7 +123,7 @@ TEST_UNIT(testEventLoop4) {


// Test EventLoop::QueueInLoop() before EventLoop::Run()
TEST_UNIT(testEventLoop5) {
TEST_UNIT(TestEventLoop5) {
evpp::EventLoop loop;
auto fn = [&loop]() {
LOG_INFO << "Entering fn";
Expand All @@ -141,6 +139,16 @@ TEST_UNIT(testEventLoop5) {
}


// Test EventLoop's constructor and destructor
TEST_UNIT(TestEventLoop6) {
evpp::EventLoop* loop = new evpp::EventLoop;
LOG_INFO << "loop=" << loop;
delete loop;
}







Expand Down

0 comments on commit 9fc8601

Please sign in to comment.