Skip to content

Commit

Permalink
Unregister signal handlers on worker shutdown (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Oct 19, 2023
2 parents 0a8714a + 5ec98bd commit 6f78061
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Adds support for `graphile-config` - configuration can now be read from a

Crontab: now supports `jobKey` and `jobKeyMode` opts (thanks @spiffytech!)

Signals: now releases signal handlers when shut down via the API.

### v0.15.1

Fixes issues with graceful worker shutdowns:
Expand Down
3 changes: 3 additions & 0 deletions __tests__/main.runTaskList.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ test("main will execute jobs as they come up, and exits cleanly", () =>
};

// Run the worker
expect(process.listeners("SIGTERM")).toHaveLength(0);
const workerPool = runTaskList({ concurrency: 3 }, tasks, pgPool);
expect(process.listeners("SIGTERM")).toHaveLength(1);
let finished = false;
workerPool.promise.then(() => {
finished = true;
Expand All @@ -69,6 +71,7 @@ test("main will execute jobs as they come up, and exits cleanly", () =>
expect(finished).toBeTruthy();
await workerPool.promise;
expect(await jobCount(pgPool)).toEqual(0);
expect(process.listeners("SIGTERM")).toHaveLength(0);
} finally {
Object.values(jobPromises).forEach((p) => p?.resolve());
}
Expand Down
61 changes: 49 additions & 12 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
Job,
TaskList,
Worker,
WorkerEventMap,
WorkerEvents,
WorkerOptions,
WorkerPool,
Expand Down Expand Up @@ -53,31 +54,49 @@ let _registeredSignalHandlers = false;
let _shuttingDownGracefully = false;
let _shuttingDownForcefully = false;

let _registeredSignalHandlersCount = 0;

/**
* This will register the signal handlers to make sure the worker shuts down
* gracefully if it can. It will only register signal handlers once; even if
* you call it multiple times it will always use the first logger it is passed,
* future calls will register the events but take no further actions.
*/
function registerSignalHandlers(logger: Logger, events: WorkerEvents) {
function registerSignalHandlers(
logger: Logger,
events: WorkerEvents,
): () => void {
if (_shuttingDownGracefully || _shuttingDownForcefully) {
throw new Error(
"System has already gone into shutdown, should not be spawning new workers now!",
);
}

_signalHandlersEventEmitter.on("gracefulShutdown", (o) =>
events.emit("gracefulShutdown", o),
);
_signalHandlersEventEmitter.on("forcefulShutdown", (o) =>
events.emit("forcefulShutdown", o),
);
const gscb = (o: WorkerEventMap["gracefulShutdown"]) =>
events.emit("gracefulShutdown", o);
const fscb = (o: WorkerEventMap["forcefulShutdown"]) =>
events.emit("forcefulShutdown", o);

if (_registeredSignalHandlers) {
return;
} else {
_registeredSignalHandlers = true;
if (!_registeredSignalHandlers) {
_reallyRegisterSignalHandlers(logger);
}

_registeredSignalHandlersCount++;
_signalHandlersEventEmitter.on("gracefulShutdown", gscb);
_signalHandlersEventEmitter.on("forcefulShutdown", fscb);
return function release() {
_signalHandlersEventEmitter.off("gracefulShutdown", gscb);
_signalHandlersEventEmitter.off("forcefulShutdown", fscb);
_registeredSignalHandlersCount--;
if (_registeredSignalHandlersCount === 0) {
_releaseSignalHandlers();
}
};
}

let _releaseSignalHandlers = () => void 0;

function _reallyRegisterSignalHandlers(logger: Logger) {
const switchToForcefulHandler = () => {
logger.debug(
`Switching to forceful handler for termination signals (${SIGNALS.join(
Expand Down Expand Up @@ -167,9 +186,23 @@ function registerSignalHandlers(logger: Logger, events: WorkerEvents) {
`Registering termination signal handlers (${SIGNALS.join(", ")})`,
{ registeringSignalHandlers: SIGNALS },
);

_registeredSignalHandlers = true;
for (const signal of SIGNALS) {
process.on(signal, gracefulHandler);
}
_releaseSignalHandlers = () => {
if (_shuttingDownGracefully || _shuttingDownForcefully) {
console.warn(`Not unregistering signal handlers as we're shutting down`);
return;
}

_releaseSignalHandlers = () => void 0;
for (const signal of SIGNALS) {
process.off(signal, gracefulHandler);
}
_registeredSignalHandlers = false;
};
}

export function runTaskList(
Expand All @@ -183,9 +216,10 @@ export function runTaskList(
}
const { concurrency = defaults.concurrentJobs, noHandleSignals } = options;

let unregisterSignalHandlers: (() => void) | undefined = undefined;
if (!noHandleSignals) {
// Clean up when certain signals occur
registerSignalHandlers(logger, events);
unregisterSignalHandlers = registerSignalHandlers(logger, events);
}

const promise = deferred();
Expand Down Expand Up @@ -299,6 +333,9 @@ export function runTaskList(
const idx = allWorkerPools.indexOf(workerPool);
allWorkerPools.splice(idx, 1);
promise.resolve(resetLockedAtPromise);
if (unregisterSignalHandlers) {
unregisterSignalHandlers();
}
} else {
logger.error(
`Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`,
Expand Down

0 comments on commit 6f78061

Please sign in to comment.