Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Add signal handling to stream manager #2950

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 56 additions & 3 deletions heron/common/src/cpp/network/event_loop_impl.cpp
Expand Up @@ -37,6 +37,12 @@ void EventLoopImpl::eventLoopImplWriteCallback(sp_int32 fd, sp_int16 event, void
el->handleWriteCallback(fd, event);
}

// 'C' style callback for libevent on signal events
void EventLoopImpl::eventLoopImplSignalCallback(sp_int32 sig, sp_int16 event, void* arg) {
auto* el = reinterpret_cast<EventLoopImpl*>(arg);
el->handleSignalCallback(sig, event);
}

// 'C' style callback for libevent on timer events
void EventLoopImpl::eventLoopImplTimerCallback(sp_int32, sp_int16 event, void* arg) {
// TODO(vikasr): this needs to change to VCallback
Expand Down Expand Up @@ -83,6 +89,40 @@ void EventLoopImpl::loop() {

int EventLoopImpl::loopExit() { return event_base_loopbreak(mDispatcher); }

int EventLoopImpl::registerSignal(int sig, VCallback<EventLoop::Status> cb) {
// Create the appropriate structures and init them.
auto* event = new SS_RegisteredEvent<sp_int32>(sig, false, std::move(cb), -1);
evsignal_set(event->event(), sig, &EventLoopImpl::eventLoopImplSignalCallback, this);
if (event_base_set(mDispatcher, event->event()) < 0) {
delete event;
throw heron::error::Error_Exception(errno);
}

// Now add it to the list of signals monitored by the mDispatcher
if (event_add(event->event(), NULL) < 0) {
delete event;
throw heron::error::Error_Exception(errno);
}
mSignalEvents[sig] = event;
return 0;
}

int EventLoopImpl::unRegisterSignal(int sig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this function hooked up?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is not actually used yet. I included it, because other events also have a unregister function and it might be useful sometime in the future.

I can also remove it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kk. It is ok to leave it there to me. Let's see if @srkukarni has any input.

if (mSignalEvents.find(sig) == mSignalEvents.end()) {
// This signal wasn't registed. Hence we can't unregister it.
return -1;
}

// Delete the underlying event in libevent
if (event_del(mSignalEvents[sig]->event()) != 0) {
throw heron::error::Error_Exception(errno);
}
auto event = mSignalEvents[sig];
mSignalEvents.erase(sig);
delete event;
return 0;
}

int EventLoopImpl::registerForRead(int fd, VCallback<EventLoop::Status> cb, bool persistent) {
return registerForRead(fd, std::move(cb), persistent, -1);
}
Expand Down Expand Up @@ -136,8 +176,9 @@ int EventLoopImpl::unRegisterForRead(int fd) {
// cout << "event_del failed for fd " << fd;
throw heron::error::Error_Exception(errno);
}
delete mReadEvents[fd];
auto event = mReadEvents[fd];
mReadEvents.erase(fd);
delete event;
return 0;
}

Expand Down Expand Up @@ -195,8 +236,9 @@ int EventLoopImpl::unRegisterForWrite(int fd) {
// cout << "event_del failed for fd " << fd;
throw heron::error::Error_Exception(errno);
}
delete mWriteEvents[fd];
auto event = mWriteEvents[fd];
mWriteEvents.erase(fd);
delete event;
return 0;
}

Expand Down Expand Up @@ -251,8 +293,9 @@ sp_int32 EventLoopImpl::unRegisterTimer(sp_int64 timerId) {
// cout << "event_del failed for timer " << timerId;
throw heron::error::Error_Exception(errno);
}
delete mTimerEvents[timerId];
auto event = mTimerEvents[timerId];
mTimerEvents.erase(timerId);
delete event;
return 0;
}

Expand Down Expand Up @@ -373,6 +416,16 @@ void EventLoopImpl::handleTimerCallback(sp_int16 event, sp_int64 timerId) {
}
}

void EventLoopImpl::handleSignalCallback(sp_int32 sig, sp_int16 event) {
if (mSignalEvents.find(sig) == mSignalEvents.end()) {
// This is possible when unRegisterSignal has been called before we handle this event
// Just ignore this event.
return;
}
auto* registeredEvent = mSignalEvents[sig];
registeredEvent->get_callback()(mapStatusCode(event));
}

EventLoop::Status EventLoopImpl::mapStatusCode(sp_int16 event) {
switch (event) {
case EV_READ:
Expand Down
9 changes: 9 additions & 0 deletions heron/common/src/cpp/network/event_loop_impl.h
Expand Up @@ -44,6 +44,8 @@ class EventLoopImpl : public EventLoop {
// Methods inherited from EventLoop.
virtual void loop();
virtual sp_int32 loopExit();
virtual sp_int32 registerSignal(sp_int32 sig, VCallback<EventLoop::Status> cb);
virtual sp_int32 unRegisterSignal(sp_int32 sig);
virtual sp_int32 registerForRead(sp_int32 fd, VCallback<EventLoop::Status> cb, bool persistent,
sp_int64 timeoutMicroSecs);
virtual sp_int32 registerForRead(sp_int32 fd, VCallback<EventLoop::Status> cb, bool persistent);
Expand All @@ -63,6 +65,7 @@ class EventLoopImpl : public EventLoop {
// Static member functions to interact with C libevent API
static void eventLoopImplReadCallback(sp_int32 fd, sp_int16 event, void* arg);
static void eventLoopImplWriteCallback(sp_int32 fd, sp_int16 event, void* arg);
static void eventLoopImplSignalCallback(sp_int32 sig, sp_int16 event, void* arg);
static void eventLoopImplTimerCallback(sp_int32, sp_int16 event, void* arg);

private:
Expand All @@ -81,6 +84,9 @@ class EventLoopImpl : public EventLoop {
// libevent callback on timer events.
void handleTimerCallback(sp_int16 event, sp_int64 timerid);

// libevent callback on signal events.
void handleSignalCallback(sp_int32 sig, sp_int16 event);

// The underlying dispatcher that we wrap around.
struct event_base* mDispatcher;

Expand All @@ -93,6 +99,9 @@ class EventLoopImpl : public EventLoop {
// The registered timers.
std::unordered_map<sp_int64, SS_RegisteredEvent<sp_int64>*> mTimerEvents;

// The registered signals.
std::unordered_map<sp_int32, SS_RegisteredEvent<sp_int32>*> mSignalEvents;

// The registered instant callbacks
typedef std::list<VCallback<>> OrderedCallbackList;
OrderedCallbackList mListInstantCallbacks;
Expand Down
9 changes: 7 additions & 2 deletions heron/stmgr/src/cpp/server/stmgr-main.cpp
Expand Up @@ -48,6 +48,12 @@ DEFINE_string(ckptmgr_id, "", "The id of the local ckptmgr");
DEFINE_int32(ckptmgr_port, 0, "The port of the local ckptmgr");
DEFINE_string(metricscachemgr_mode, "disabled", "MetricsCacheMgr mode, default `disabled`");

EventLoopImpl ss;

void sigtermHandler(int signum) {
ss.loopExit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit concerned that if loopExit() is safe to call here.

  • which thread it is running on and if there might be racing condition.
  • if it is possible that loopExit() might get stuck.

Copy link
Author

@glrf glrf Jul 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the sigtermHandler should run the in the same thread as any other callback function and loopExit() directly calls event_base_loopbreak() which I think should be safe to call. Here in the libevent book they call event_base_loopbreak() in a very similar way.

But maybe someone with more experience with libevent could give some input on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nwangtw - stmgr runs in a single thread. hence loopExit is fine in the context of sigterm handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nwangtw - stmgr runs in a single thread. hence loopExit is fine in the context of sigterm handler.

}

int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);

Expand All @@ -56,8 +62,6 @@ int main(int argc, char* argv[]) {
}
std::vector<std::string> instances = StrUtils::split(FLAGS_instance_ids, ",");

EventLoopImpl ss;

// Read heron internals config from local file
// Create the heron-internals-config-reader to read the heron internals config
heron::config::HeronInternalsConfigReader::Create(&ss,
Expand Down Expand Up @@ -85,6 +89,7 @@ int main(int argc, char* argv[]) {
FLAGS_shell_port, FLAGS_ckptmgr_port, FLAGS_ckptmgr_id,
high_watermark, low_watermark, FLAGS_metricscachemgr_mode);
mgr.Init();
ss.registerSignal(SIGTERM, &sigtermHandler);
ss.loop();
return 0;
}