Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve synchronization between Node.js parent process and worker thread #4603

Merged
merged 11 commits into from Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -133,6 +133,5 @@ void test() throws Exception {
new String(Files.readAllBytes(Paths.get("target/differences")), StandardCharsets.UTF_8)
)
.isEmpty();
// assertPerfMonitoringAvailable(perfMonitoringDir);
}
}
61 changes: 36 additions & 25 deletions packages/bridge/src/server.ts
Expand Up @@ -84,7 +84,13 @@ export function start(
port = 0,
host = '127.0.0.1',
timeout = SHUTDOWN_TIMEOUT,
): Promise<http.Server> {
): Promise<{ server: http.Server; serverClosed: Promise<void>; worker: Worker }> {
const pendingCloseRequests: express.Response[] = [];
let resolveClosed: () => void;
const serverClosed: Promise<void> = new Promise(resolve => {
resolveClosed = resolve;
});

logMemoryConfiguration();
if (getContext().debugMemory) {
registerGarbageCollectionObserver();
Expand All @@ -103,23 +109,12 @@ export function start(

worker.on('exit', code => {
debug(`The worker thread exited with code ${code}`);
closeServer();
});

worker.on('error', err => {
debug(`The worker thread failed: ${err}`);

logMemoryError(err);

/**
* At this point, the worker thread can no longer respond to any request from the plugin.
* However, existing requests are stalled until they time out. Since the bridge server is
* about to be shut down in an unexpected manner anyway, we can close all connections and
* avoid waiting unnecessarily for them to eventually close.
*/
server.closeAllConnections();

debug('Shutting down the bridge server due to failure');
shutdown();
});

const app = express();
Expand All @@ -130,9 +125,7 @@ export function start(
* in case the process becomes orphan.
*/
const orphanTimeout = timeoutMiddleware(() => {
if (server.listening) {
shutdown();
}
closeWorker();
}, timeout);

/**
Expand All @@ -145,15 +138,14 @@ export function start(
app.use(errorMiddleware);

app.post('/close', (_: express.Request, response: express.Response) => {
debug('Shutting down the bridge server');
response.end(() => {
shutdown();
});
pendingCloseRequests.push(response);
closeWorker();
});

server.on('close', () => {
debug('The bridge server shut down');
orphanTimeout.stop();
orphanTimeout.cancel();
resolveClosed();
});

server.on('error', err => {
Expand All @@ -166,17 +158,36 @@ export function start(
* which we get using server.address().
*/
debug(`The bridge server is listening on port ${(server.address() as AddressInfo)?.port}`);
resolve(server);
resolve({ server, serverClosed, worker });
});

server.listen(port, host);

/**
* Shutdown the server and the worker thread
*/
function shutdown() {
worker.terminate().catch(reason => debug(`Failed to terminate the worker thread: ${reason}`));
server.close();
function closeWorker() {
debug('Shutting down the worker');
worker.postMessage({ type: 'close' });
}

/**
* Shutdown the server and the worker thread
*/
function closeServer() {
if (server.listening) {
while (pendingCloseRequests.length) {
pendingCloseRequests.pop()?.end();
}
/**
* At this point, the worker thread can no longer respond to any request from the plugin.
* If we reached this due to worker failure, existing requests are stalled until they time out.
* Since the bridge server is about to be shut down in an unexpected manner anyway, we can
* close all connections and avoid waiting unnecessarily for them to eventually close.
*/
server.closeAllConnections();
server.close();
}
}
});
}
14 changes: 9 additions & 5 deletions packages/bridge/src/timeout/middleware.ts
Expand Up @@ -29,17 +29,21 @@ import Timeout from './timeout';
export function timeoutMiddleware(f: () => void, delay: number) {
const timeout = new Timeout(f, delay);
timeout.start();
let cancelled = false;

return {
middleware(_request: express.Request, response: express.Response, next: express.NextFunction) {
timeout.stop();
if (!cancelled) {
timeout.stop();

response.on('finish', function () {
timeout.start();
});
response.on('finish', function () {
timeout.start();
});
}
next();
},
stop() {
cancel() {
cancelled = true;
timeout.stop();
},
};
Expand Down
3 changes: 3 additions & 0 deletions packages/bridge/src/worker.js
Expand Up @@ -69,6 +69,9 @@ if (parentPort) {
try {
const { type, data } = message;
switch (type) {
case 'close':
parentThread.close();
break;
case 'on-analyze-css': {
await readFileLazily(data);

Expand Down
12 changes: 7 additions & 5 deletions packages/bridge/tests/router.test.ts
Expand Up @@ -22,7 +22,6 @@ import http from 'http';
import { createAndSaveProgram, ProjectAnalysisInput, RuleConfig } from '@sonar/jsts';
import path from 'path';
import { start } from '../src/server';
import { promisify } from 'util';
import { request } from './tools';
import * as fs from 'fs';

Expand All @@ -31,9 +30,9 @@ import { rule as S5362 } from '../../css/src/rules/S5362';
describe('router', () => {
const fixtures = path.join(__dirname, 'fixtures', 'router');
const port = 0;
let closePromise: Promise<void>;

let server: http.Server;
let close: () => Promise<void>;

beforeEach(async () => {
setContext({
Expand All @@ -43,12 +42,15 @@ describe('router', () => {
bundles: [],
});
jest.setTimeout(60 * 1000);
server = await start(port, '127.0.0.1', 60 * 60 * 1000);
close = promisify(server.close.bind(server));
const { server: serverInstance, serverClosed } = await start(port, '127.0.0.1', 60 * 60 * 1000);
server = serverInstance;
closePromise = serverClosed;
});

afterEach(async () => {
await close();
await request(server, '/close', 'POST');
//We need to await the server close promise, as the http server still needs to be up to finish the response of the /close request.
await closePromise;
});

it('should route /analyze-project requests', async () => {
Expand Down
52 changes: 32 additions & 20 deletions packages/bridge/tests/server.test.ts
Expand Up @@ -18,7 +18,6 @@
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
import { start } from '../src/server';
import { promisify } from 'util';
import path from 'path';
import { setContext } from '@sonar/shared';
import { AddressInfo } from 'net';
Expand All @@ -42,8 +41,7 @@ describe('server', () => {

console.log = jest.fn();

const server = await start(undefined, undefined);
const close = promisify(server.close.bind(server));
const { server, serverClosed } = await start(undefined, undefined);

expect(server.listening).toBeTruthy();
expect(console.log).toHaveBeenCalledTimes(3);
Expand All @@ -57,14 +55,14 @@ describe('server', () => {
`DEBUG The bridge server is listening on port ${(server.address() as AddressInfo)?.port}`,
);

await close();
await request(server, '/close', 'POST');
await serverClosed;
});

it('should fail when linter is not initialized', async () => {
expect.assertions(3);

const server = await start(port);
const close = promisify(server.close.bind(server));
const { server, serverClosed } = await start(port);

const ruleId = 'no-extra-semi';
const fileType = 'MAIN';
Expand All @@ -86,15 +84,14 @@ describe('server', () => {
ruleId,
}),
);

await close();
await request(server, '/close', 'POST');
await serverClosed;
});

it('should route service requests', async () => {
expect.assertions(2);

const server = await start(port);
const close = promisify(server.close.bind(server));
const { server, serverClosed } = await start(port);

expect(server.listening).toBeTruthy();

Expand All @@ -113,27 +110,43 @@ describe('server', () => {
}),
);

await close();
await request(server, '/close', 'POST');
await serverClosed;
});

it('should shut down', async () => {
expect.assertions(2);
expect.assertions(3);

console.log = jest.fn();

const server = await start(port);
const { server, serverClosed } = await start(port);
expect(server.listening).toBeTruthy();

const closeRequest = request(server, '/close', 'POST');
await closeRequest;
await request(server, '/close', 'POST');

expect(server.listening).toBeFalsy();
expect(console.log).toHaveBeenCalledWith('DEBUG Shutting down the bridge server');
expect(console.log).toHaveBeenCalledWith('DEBUG Shutting down the worker');
await serverClosed;
});

it('worker crashing should close server', async () => {
console.log = jest.fn();

const { server, serverClosed, worker } = await start(port);
expect(server.listening).toBeTruthy();

worker.emit('error', new Error('An error'));
await worker.terminate();

expect(server.listening).toBeFalsy();
expect(console.log).toHaveBeenCalledWith('DEBUG The worker thread failed: Error: An error');
await serverClosed;
});

it('should timeout', async () => {
console.log = jest.fn();

const server = await start(port, '127.0.0.1', 500);
const { server, serverClosed } = await start(port, '127.0.0.1', 500);

await new Promise(r => setTimeout(r, 100));
expect(server.listening).toBeTruthy();
Expand All @@ -143,10 +156,9 @@ describe('server', () => {
expect(server.listening).toBeTruthy();
await request(server, '/status', 'GET');

await new Promise(r => setTimeout(r, 600));
expect(server.listening).toBeFalsy();

await serverClosed;
vdiez marked this conversation as resolved.
Show resolved Hide resolved
expect(console.log).toHaveBeenCalledWith('DEBUG The bridge server shut down');
expect(server.listening).toBeFalsy();
});
});

Expand Down
28 changes: 5 additions & 23 deletions packages/bridge/tests/tools/request.ts
Expand Up @@ -23,30 +23,12 @@ import http from 'http';
/**
* Sends an HTTP request to a server's endpoint running on localhost.
*/
export function request(server: http.Server, path: string, method: string, data: any = {}) {
const options = {
host: '127.0.0.1',
path,
method,
port: (server.address() as AddressInfo).port,
export async function request(server: http.Server, path: string, method: string, body: any = {}) {
return await fetch(`http://127.0.0.1:${(server.address() as AddressInfo).port}${path}`, {
headers: {
'Content-Type': 'application/json',
},
timeout: 10000,
};

return new Promise((resolve, reject) => {
const request = http.request(options, res => {
let response = '';
res.on('data', chunk => {
response += chunk;
});

res.on('end', () => resolve(response));
});
request.on('error', reject);

request.write(JSON.stringify(data));
request.end();
});
method,
body: method !== 'GET' ? JSON.stringify(body) : undefined,
}).then(response => response.text());
}
2 changes: 1 addition & 1 deletion packages/tsconfig.json
Expand Up @@ -3,7 +3,7 @@
"target": "ES2021",
"module": "nodenext",
"moduleResolution": "nodenext",
"lib": ["ES2021"],
"lib": ["ES2021", "dom"],
vdiez marked this conversation as resolved.
Show resolved Hide resolved
"declaration": true,
"outDir": "../lib",
"strict": true,
Expand Down