Skip to content

Commit

Permalink
WIP - fix(ngcc): support recovering when a worker process crashes
Browse files Browse the repository at this point in the history
TODO

Examples of ngcc being able to recover after a worker process crashed:
- While idling: https://circleci.com/gh/angular/angular/682197
- While compiling: https://circleci.com/gh/angular/angular/682209
- While writing files: https://circleci.com/gh/angular/angular/682267

Jira issue: [FW-2008](https://angular-team.atlassian.net/browse/FW-2008)

Fixes angular#36278
  • Loading branch information
gkalpak committed Apr 20, 2020
1 parent 4341743 commit 17b4fac
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 77 deletions.
7 changes: 5 additions & 2 deletions packages/compiler-cli/ngcc/src/execution/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {FileToWrite} from '../rendering/utils';
import {Task, TaskCompletedCallback, TaskQueue} from './tasks/api';

/**
Expand All @@ -16,10 +17,12 @@ import {Task, TaskCompletedCallback, TaskQueue} from './tasks/api';
export type AnalyzeEntryPointsFn = () => TaskQueue;

/** The type of the function that can process/compile a task. */
export type CompileFn = (task: Task) => void;
export type CompileFn<T> = (task: Task) => void|T;

/** The type of the function that creates the `CompileFn` function used to process tasks. */
export type CreateCompileFn = (onTaskCompleted: TaskCompletedCallback) => CompileFn;
export type CreateCompileFn = <T extends void|Promise<void>>(
beforeWritingFiles: (transformedFiles: FileToWrite[]) => T,
onTaskCompleted: TaskCompletedCallback) => CompileFn<T>;

/**
* A class that orchestrates and executes the required work (i.e. analyzes the entry-points,
Expand Down
9 changes: 8 additions & 1 deletion packages/compiler-cli/ngcc/src/execution/cluster/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ export interface TaskCompletedMessage extends JsonObject {
message: string|null;
}

/** A message listing the paths to transformed files about to be written to disk. */
export interface TransformedFilesMessage extends JsonObject {
type: 'transformed-files';
files: AbsoluteFsPath[];
}

/** A message requesting the update of a `package.json` file. */
export interface UpdatePackageJsonMessage extends JsonObject {
type: 'update-package-json';
Expand All @@ -44,7 +50,8 @@ export interface UpdatePackageJsonMessage extends JsonObject {
}

/** The type of messages sent from cluster workers to the cluster master. */
export type MessageFromWorker = ErrorMessage|TaskCompletedMessage|UpdatePackageJsonMessage;
export type MessageFromWorker =
ErrorMessage|TaskCompletedMessage|TransformedFilesMessage|UpdatePackageJsonMessage;

/** The type of messages sent from the cluster master to cluster workers. */
export type MessageToWorker = ProcessTaskMessage;
8 changes: 5 additions & 3 deletions packages/compiler-cli/ngcc/src/execution/cluster/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import {FileSystem} from '../../../../src/ngtsc/file_system';
import {AsyncLocker} from '../../locking/async_locker';
import {Logger} from '../../logging/logger';
import {FileWriter} from '../../writing/file_writer';
import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from '../api';
import {CreateTaskCompletedCallback} from '../tasks/api';
Expand All @@ -21,7 +22,8 @@ import {ClusterMaster} from './master';
export class ClusterExecutor implements Executor {
constructor(
private workerCount: number, private fileSystem: FileSystem, private logger: Logger,
private pkgJsonUpdater: PackageJsonUpdater, private lockFile: AsyncLocker,
private fileWriter: FileWriter, private pkgJsonUpdater: PackageJsonUpdater,
private lockFile: AsyncLocker,
private createTaskCompletedCallback: CreateTaskCompletedCallback) {}

async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, _createCompileFn: CreateCompileFn):
Expand All @@ -30,8 +32,8 @@ export class ClusterExecutor implements Executor {
this.logger.debug(
`Running ngcc on ${this.constructor.name} (using ${this.workerCount} worker processes).`);
const master = new ClusterMaster(
this.workerCount, this.fileSystem, this.logger, this.pkgJsonUpdater, analyzeEntryPoints,
this.createTaskCompletedCallback);
this.workerCount, this.fileSystem, this.logger, this.fileWriter, this.pkgJsonUpdater,
analyzeEntryPoints, this.createTaskCompletedCallback);
return master.run();
});
}
Expand Down
100 changes: 77 additions & 23 deletions packages/compiler-cli/ngcc/src/execution/cluster/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@

import * as cluster from 'cluster';

import {FileSystem} from '../../../../src/ngtsc/file_system';
import {AbsoluteFsPath, FileSystem} from '../../../../src/ngtsc/file_system';
import {Logger} from '../../logging/logger';
import {FileWriter} from '../../writing/file_writer';
import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn} from '../api';
import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api';
import {stringifyTask} from '../tasks/utils';

import {MessageFromWorker, TaskCompletedMessage, UpdatePackageJsonMessage} from './api';
import {MessageFromWorker, TaskCompletedMessage, TransformedFilesMessage, UpdatePackageJsonMessage} from './api';
import {Deferred, sendMessageToWorker} from './utils';


Expand All @@ -28,13 +29,15 @@ import {Deferred, sendMessageToWorker} from './utils';
export class ClusterMaster {
private finishedDeferred = new Deferred<void>();
private processingStartTime: number = -1;
private taskAssignments = new Map<number, Task|null>();
private taskAssignments = new Map<number, {task: Task, files?: AbsoluteFsPath[]}|null>();
private taskQueue: TaskQueue;
private onTaskCompleted: TaskCompletedCallback;
private remainingRespawnAttempts = 3;

constructor(
private maxWorkerCount: number, private fileSystem: FileSystem, private logger: Logger,
private pkgJsonUpdater: PackageJsonUpdater, analyzeEntryPoints: AnalyzeEntryPointsFn,
private fileWriter: FileWriter, private pkgJsonUpdater: PackageJsonUpdater,
analyzeEntryPoints: AnalyzeEntryPointsFn,
createTaskCompletedCallback: CreateTaskCompletedCallback) {
if (!cluster.isMaster) {
throw new Error('Tried to instantiate `ClusterMaster` on a worker process.');
Expand Down Expand Up @@ -101,7 +104,7 @@ export class ClusterMaster {
}

// Process the next task on the worker.
this.taskAssignments.set(workerId, task);
this.taskAssignments.set(workerId, {task});
sendMessageToWorker(workerId, {type: 'process-task', task});

isWorkerAvailable = false;
Expand Down Expand Up @@ -145,24 +148,50 @@ export class ClusterMaster {
if (worker.exitedAfterDisconnect) return;

// The worker exited unexpectedly: Determine it's status and take an appropriate action.
const currentTask = this.taskAssignments.get(worker.id);
const assignment = this.taskAssignments.get(worker.id);
this.taskAssignments.delete(worker.id);

this.logger.warn(
`Worker #${worker.id} exited unexpectedly (code: ${code} | signal: ${signal}).\n` +
` Current assignment: ${(currentTask == null) ? '-' : stringifyTask(currentTask)}`);

if (currentTask == null) {
` Current task: ${(assignment == null) ? '-' : stringifyTask(assignment.task)}\n` +
` Current phase: ${
(assignment == null) ? '-' :
(assignment.files == null) ? 'compiling' : 'writing files'}`);
if (assignment == null) {
// The crashed worker process was not in the middle of a task:
// Just spawn another process.
this.logger.debug(`Spawning another worker process to replace #${worker.id}...`);
this.taskAssignments.delete(worker.id);
cluster.fork();
} else {
if (assignment.files != null) {
// The crashed worker process was in the middle of writing transformed files:
// Revert any changes before re-processing the task.
this.logger.debug(`Reverting ${assignment.files.length} transformed files...`);
for (const file of assignment.files) {
this.fileWriter.revertFileAndBackup(file);
}
}

// The crashed worker process was in the middle of a task:
// Impossible to know whether we can recover (without ending up with a corrupted entry-point).
throw new Error(
'Process unexpectedly crashed, while processing format property ' +
`${currentTask.formatProperty} for entry-point '${currentTask.entryPoint.path}'.`);
// Re-add the task back to the queue.
this.taskQueue.markTaskUnprocessed(assignment.task);

// The crashing might be a result of increased memory consumption by ngcc.
// Do not spawn another process, unless this was the last worker process.
const spawnedWorkerCount = Object.keys(cluster.workers).length;
if (spawnedWorkerCount > 0) {
this.logger.debug(`Not spawning another worker process to replace #${
worker.id}. Continuing with ${spawnedWorkerCount} workers...`);
this.maybeDistributeWork();
} else if (this.remainingRespawnAttempts > 0) {
this.logger.debug(`Spawning another worker process to replace #${worker.id}...`);
this.remainingRespawnAttempts--;
cluster.fork();
} else {
throw new Error(
'All worker processes crashed and attempts to re-spawn them failed. ' +
'Please check your system and ensure there is enough memory available.');
}
}
}

Expand All @@ -180,6 +209,8 @@ export class ClusterMaster {
throw new Error(`Error on worker #${workerId}: ${msg.error}`);
case 'task-completed':
return this.onWorkerTaskCompleted(workerId, msg);
case 'transformed-files':
return this.onWorkerTransformedFiles(workerId, msg);
case 'update-package-json':
return this.onWorkerUpdatePackageJson(workerId, msg);
default:
Expand All @@ -205,33 +236,56 @@ export class ClusterMaster {

/** Handle a worker's having completed their assigned task. */
private onWorkerTaskCompleted(workerId: number, msg: TaskCompletedMessage): void {
const task = this.taskAssignments.get(workerId) || null;
const assignment = this.taskAssignments.get(workerId) || null;

if (task === null) {
if (assignment === null) {
throw new Error(
`Expected worker #${workerId} to have a task assigned, while handling message: ` +
JSON.stringify(msg));
}

this.onTaskCompleted(task, msg.outcome, msg.message);
this.onTaskCompleted(assignment.task, msg.outcome, msg.message);

this.taskQueue.markTaskCompleted(task);
this.taskQueue.markTaskCompleted(assignment.task);
this.taskAssignments.set(workerId, null);
this.maybeDistributeWork();
}

/** Handle a worker's message regarding the files transformed while processing its task. */
private onWorkerTransformedFiles(workerId: number, msg: TransformedFilesMessage): void {
const assignment = this.taskAssignments.get(workerId) || null;

if (assignment === null) {
throw new Error(
`Expected worker #${workerId} to have a task assigned, while handling message: ` +
JSON.stringify(msg));
}

const oldFiles = assignment.files;
const newFiles = msg.files;

if (oldFiles !== undefined) {
throw new Error(
`Worker #${workerId} reported transformed files more than once.\n` +
` Old files (${oldFiles.length}): [${oldFiles.join(', ')}]\n` +
` New files (${newFiles.length}): [${newFiles.join(', ')}]\n`);
}

assignment.files = newFiles;
}

/** Handle a worker's request to update a `package.json` file. */
private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void {
const task = this.taskAssignments.get(workerId) || null;
const assignment = this.taskAssignments.get(workerId) || null;

if (task === null) {
if (assignment === null) {
throw new Error(
`Expected worker #${workerId} to have a task assigned, while handling message: ` +
JSON.stringify(msg));
}

const expectedPackageJsonPath = this.fileSystem.resolve(task.entryPoint.path, 'package.json');
const parsedPackageJson = task.entryPoint.packageJson;
const entryPoint = assignment.task.entryPoint;
const expectedPackageJsonPath = this.fileSystem.resolve(entryPoint.path, 'package.json');

if (expectedPackageJsonPath !== msg.packageJsonPath) {
throw new Error(
Expand All @@ -247,7 +301,7 @@ export class ClusterMaster {
// In other words, task processing should only rely on the info that was there when the
// file was initially parsed (during entry-point analysis) and not on the info that might
// be added later (during task processing).
this.pkgJsonUpdater.writeChanges(msg.changes, msg.packageJsonPath, parsedPackageJson);
this.pkgJsonUpdater.writeChanges(msg.changes, msg.packageJsonPath, entryPoint.packageJson);
}

/** Stop all workers and stop listening on cluster events. */
Expand Down
10 changes: 6 additions & 4 deletions packages/compiler-cli/ngcc/src/execution/cluster/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ export class Deferred<T> {
* (This function should be invoked from cluster workers only.)
*
* @param msg The message to send to the cluster master.
* @return A promise that is resolved once the message has been sent.
*/
export const sendMessageToMaster = (msg: MessageFromWorker): void => {
export const sendMessageToMaster = (msg: MessageFromWorker): Promise<void> => {
if (cluster.isMaster) {
throw new Error('Unable to send message to the master process: Already on the master process.');
}
Expand All @@ -55,7 +56,7 @@ export const sendMessageToMaster = (msg: MessageFromWorker): void => {
throw new Error('Unable to send message to the master process: Missing `process.send()`.');
}

process.send(msg);
return new Promise(resolve => process.send!(msg, resolve));
};

/**
Expand All @@ -64,8 +65,9 @@ export const sendMessageToMaster = (msg: MessageFromWorker): void => {
*
* @param workerId The ID of the recipient worker.
* @param msg The message to send to the worker.
* @return A promise that is resolved once the message has been sent.
*/
export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): void => {
export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): Promise<void> => {
if (!cluster.isMaster) {
throw new Error('Unable to send message to worker process: Sender is not the master process.');
}
Expand All @@ -77,5 +79,5 @@ export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): voi
'Unable to send message to worker process: Recipient does not exist or has disconnected.');
}

worker.send(msg);
return new Promise(resolve => worker.send(msg, resolve));
};
29 changes: 22 additions & 7 deletions packages/compiler-cli/ngcc/src/execution/cluster/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,44 @@ export async function startWorker(logger: Logger, createCompileFn: CreateCompile
}

const compile = createCompileFn(
transformedFiles => sendMessageToMaster({
type: 'transformed-files',
files: transformedFiles.map(f => f.path),
}),
(_task, outcome, message) => sendMessageToMaster({type: 'task-completed', outcome, message}));


// Listen for `ProcessTaskMessage`s and process tasks.
cluster.worker.on('message', (msg: MessageToWorker) => {
cluster.worker.on('message', async (msg: MessageToWorker) => {
try {
switch (msg.type) {
case 'process-task':
logger.debug(
`[Worker #${cluster.worker.id}] Processing task: ${stringifyTask(msg.task)}`);
return compile(msg.task);
return await compile(msg.task);
default:
throw new Error(
`[Worker #${cluster.worker.id}] Invalid message received: ${JSON.stringify(msg)}`);
}
} catch (err) {
sendMessageToMaster({
type: 'error',
error: (err instanceof Error) ? (err.stack || err.message) : err,
});
switch (err && err.code) {
case 'ENOMEM':
// Not being able to allocate enough memory is not necessarily a problem with processing
// the current task. It could just mean that there are too many tasks being processed
// simultaneously.
//
// Exit with an error and let the cluster master decide how to handle this.
logger.warn(`[Worker #${cluster.worker.id}]`, err);
process.exit(1);
default:
await sendMessageToMaster({
type: 'error',
error: (err instanceof Error) ? (err.stack || err.message) : err,
});
}
}
});

// Return a promise that is never resolved.
return new Promise(() => undefined);
}
}

0 comments on commit 17b4fac

Please sign in to comment.