Skip to content

Commit

Permalink
Handle SIGINT etc. via a sigwait() signal handler thread
Browse files Browse the repository at this point in the history
This allows other threads to install callbacks that run in a regular,
non-signal context. In particular, we can use this to signal the
downloader thread to quit.

Closes #1183.
  • Loading branch information
edolstra committed Jan 17, 2017
1 parent c0d55f9 commit cc3b93c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 27 deletions.
20 changes: 3 additions & 17 deletions src/libmain/shared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@
namespace nix {


static void sigintHandler(int signo)
{
_isInterrupted = 1;
}


static bool gcWarning = true;

void printGCWarning()
Expand Down Expand Up @@ -120,19 +114,11 @@ void initNix()
settings.processEnvironment();
settings.loadConfFile();

/* Catch SIGINT. */
struct sigaction act;
act.sa_handler = sigintHandler;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
if (sigaction(SIGINT, &act, 0))
throw SysError("installing handler for SIGINT");
if (sigaction(SIGTERM, &act, 0))
throw SysError("installing handler for SIGTERM");
if (sigaction(SIGHUP, &act, 0))
throw SysError("installing handler for SIGHUP");
startSignalHandlerThread();

/* Ignore SIGPIPE. */
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_handler = SIG_IGN;
act.sa_flags = 0;
if (sigaction(SIGPIPE, &act, 0))
Expand Down
20 changes: 15 additions & 5 deletions src/libstore/download.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,21 +323,31 @@ struct CurlDownloader : public Downloader
}

~CurlDownloader()
{
stopWorkerThread();

workerThread.join();

if (curlm) curl_multi_cleanup(curlm);
}

void stopWorkerThread()
{
/* Signal the worker thread to exit. */
{
auto state(state_.lock());
state->quit = true;
}
writeFull(wakeupPipe.writeSide.get(), " ");

workerThread.join();

if (curlm) curl_multi_cleanup(curlm);
writeFull(wakeupPipe.writeSide.get(), " ", false);
}

void workerThreadMain()
{
/* Cause this thread to be notified on SIGINT. */
auto callback = createInterruptCallback([&]() {
stopWorkerThread();
});

std::map<CURL *, std::shared_ptr<DownloadItem>> items;

bool quit = false;
Expand Down
70 changes: 66 additions & 4 deletions src/libutil/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

#include "util.hh"
#include "affinity.hh"
#include "sync.hh"

#include <iostream>
#include <cctype>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <sstream>
#include <cstring>
#include <cctype>
#include <iostream>
#include <sstream>
#include <thread>

#include <sys/wait.h>
#include <unistd.h>
Expand Down Expand Up @@ -933,7 +935,7 @@ void restoreSIGPIPE()
//////////////////////////////////////////////////////////////////////


volatile sig_atomic_t _isInterrupted = 0;
bool _isInterrupted = false;

thread_local bool interruptThrown = false;

Expand Down Expand Up @@ -1200,4 +1202,64 @@ void callFailure(const std::function<void(std::exception_ptr exc)> & failure, st
}


static Sync<std::list<std::function<void()>>> _interruptCallbacks;

static void signalHandlerThread(sigset_t set)
{
while (true) {
int signal = 0;
sigwait(&set, &signal);

if (signal == SIGINT || signal == SIGTERM || signal == SIGHUP) {
_isInterrupted = 1;

{
auto interruptCallbacks(_interruptCallbacks.lock());
for (auto & callback : *interruptCallbacks) {
try {
callback();
} catch (...) {
ignoreException();
}
}
}
}
}
}

void startSignalHandlerThread()
{
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGINT);
sigaddset(&set, SIGTERM);
sigaddset(&set, SIGHUP);
if (pthread_sigmask(SIG_BLOCK, &set, nullptr))
throw SysError("blocking signals");

std::thread(signalHandlerThread, set).detach();
}

/* RAII helper to automatically deregister a callback. */
struct InterruptCallbackImpl : InterruptCallback
{
std::list<std::function<void()>>::iterator it;
~InterruptCallbackImpl() override
{
_interruptCallbacks.lock()->erase(it);
}
};

std::unique_ptr<InterruptCallback> createInterruptCallback(std::function<void()> callback)
{
auto interruptCallbacks(_interruptCallbacks.lock());
interruptCallbacks->push_back(callback);

auto res = std::make_unique<InterruptCallbackImpl>();
res->it = interruptCallbacks->end();
res->it--;

return res;
}

}
17 changes: 16 additions & 1 deletion src/libutil/util.hh
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ void restoreSIGPIPE();

/* User interruption. */

extern volatile sig_atomic_t _isInterrupted;
extern bool _isInterrupted;

extern thread_local bool interruptThrown;

Expand Down Expand Up @@ -416,4 +416,19 @@ void callSuccess(
}


/* Start a thread that handles various signals. Also block those signals
on the current thread (and thus any threads created by it). */
void startSignalHandlerThread();

struct InterruptCallback
{
virtual ~InterruptCallback() { };
};

/* Register a function that gets called on SIGINT (in a non-signal
context). */
std::unique_ptr<InterruptCallback> createInterruptCallback(
std::function<void()> callback);


}

0 comments on commit cc3b93c

Please sign in to comment.