Skip to content

Commit

Permalink
DEBUG DO NOT MERGE
Browse files Browse the repository at this point in the history
  • Loading branch information
dubzzz committed Feb 17, 2024
1 parent 98094f7 commit 76a2b5f
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 3 deletions.
45 changes: 44 additions & 1 deletion packages/worker/src/internals/worker-pool/BasicPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
WorkerToPoolMessage,
PoolToWorkerMessage,
} from './IWorkerPool.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

/**
* Worker internal API
Expand Down Expand Up @@ -37,6 +39,7 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
onSuccess: OnSuccessCallback<TSuccess>;
onFailure: OnErrorCallback;
} | null = null;
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> NEW WORKER\n`, { flag: 'a' });
const worker = new Worker(this.workerFileUrl, { workerData: { fastcheckWorker: true } });

let resolveOnline: () => void = () => undefined;
Expand All @@ -47,13 +50,19 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
});

worker.on('online', () => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER is ONLINE\n`, {
flag: 'a',
});
// Emitted when the worker thread has started executing JavaScript code.
// More details at https://nodejs.org/api/worker_threads.html#event-online
ready = true;
resolveOnline();
});

worker.on('message', (data: WorkerToPoolMessage<TSuccess>): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER sent MESSAGE\n`, {
flag: 'a',
});
// Emitted for any incoming message, containing the cloned input of port.postMessage().
// More details at https://nodejs.org/api/worker_threads.html#event-message
if (registration === null || data.runId !== registration.currentRunId) {
Expand All @@ -68,6 +77,9 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
});

worker.on('messageerror', (err: Error): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER sent MESSAGE ERROR\n`, {
flag: 'a',
});
// Emitted when deserializing a message failed.
// More details at https://nodejs.org/api/worker_threads.html#event-messageerror
if (!ready) {
Expand All @@ -81,6 +93,9 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
});

worker.on('error', (err: Error): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER error -> ${err}\n`, {
flag: 'a',
});
// Emitted if the worker thread throws an uncaught exception. In that case, the worker is terminated.
// More details at https://nodejs.org/api/worker_threads.html#event-error
faulty = true;
Expand All @@ -93,7 +108,24 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
registration = null;
});

worker.on('uncaughtException', (err): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] uncaughtException -> ${err}\n`, {
flag: 'a',
});
});

worker.on('unhandledRejection', (reason): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] unhandledRejection -> ${reason}\n`, {
flag: 'a',
});
});

worker.on('exit', (code: number): void => {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] BasicPool -> WORKER exited (code=${code})\n`,
{ flag: 'a' },
);
// Emitted once the worker has stopped. If the worker exited by calling process.exit(), the exitCode parameter is the passed exit code. If the worker was terminated, the exitCode parameter is 1.
// More details at https://nodejs.org/api/worker_threads.html#event-exit
faulty = true;
Expand Down Expand Up @@ -149,6 +181,17 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay

public terminateAllWorkers(): Promise<void> {
const dropped = this.workers.splice(0, this.workers.length); // clear all workers
return Promise.all(dropped.map((w) => w.worker.terminate())).then(() => undefined);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] BasicPool -> TERMINATING ${dropped.length} workers\n`,
{
flag: 'a',
},
);
const p = Promise.all(dropped.map((w) => w.worker.terminate())).then(() => undefined);
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> TERMINATION completed\n`, {
flag: 'a',
});
return p;
}
}
13 changes: 13 additions & 0 deletions packages/worker/src/internals/worker-pool/GlobalPool.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { writeFileSync } from 'fs';
import * as process from 'process';
import { BasicPool } from './BasicPool.js';
import type { IWorkerPool, PooledWorker } from './IWorkerPool.js';

Expand Down Expand Up @@ -37,6 +39,7 @@ export class GlobalPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPa
}

spawnNewWorker(): Promise<PooledWorker<TSuccess, TPayload>> {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] GlobalPool::spawnNewWorker\n`, { flag: 'a' });
cancelPendingTerminationIfAny(this.workerFileUrl);
return this.internalPool.spawnNewWorker();
}
Expand All @@ -47,10 +50,20 @@ export class GlobalPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPa
}

terminateAllWorkers(): Promise<void> {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] GlobalPool::terminateAllWorkers -> START\n`, {
flag: 'a',
});
cancelPendingTerminationIfAny(this.workerFileUrl);
pendingTerminationPerFile.set(
this.workerFileUrl.toString(),
setTimeout(() => {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] GlobalPool::terminateAllWorkers -> In timer\n`,
{
flag: 'a',
},
);
this.internalPool.terminateAllWorkers();
}, 0),
);
Expand Down
12 changes: 11 additions & 1 deletion packages/worker/src/internals/worker-pool/OneTimePool.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { writeFileSync } from 'fs';
import * as process from 'process';
import { BasicPool } from './BasicPool.js';
import type { IWorkerPool, PooledWorker } from './IWorkerPool.js';

Expand All @@ -17,6 +19,7 @@ export class OneTimePool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TP
}

spawnNewWorker(): Promise<PooledWorker<TSuccess, TPayload>> {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] OneTimePool::spawnNewWorker\n`, { flag: 'a' });
return this.internalPool.spawnNewWorker();
}

Expand All @@ -25,6 +28,13 @@ export class OneTimePool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TP
}

terminateAllWorkers(): Promise<void> {
return this.internalPool.terminateAllWorkers();
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] OneTimePool::terminateAllWorkers -> START\n`, {
flag: 'a',
});
const p = this.internalPool.terminateAllWorkers();
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] OneTimePool::terminateAllWorkers -> END\n`, {
flag: 'a',
});
return p;
}
}
24 changes: 24 additions & 0 deletions packages/worker/src/internals/worker-runner/NoWorkerRunner.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
import type { MessagePort } from 'node:worker_threads';
import type { MainThreadToWorkerMessage, WorkerToMainThreadMessage } from '../SharedTypes.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

/**
* Setup the fallback worker listening to all predicates and rejecting any that has never been registered
* @param parentPort - the parent to listen to and sending us queries to execute
* @param registeredPredicates - list of all the predicates currently registered, can be updated after the call to runNoWorker
*/
export function runNoWorker(parentPort: MessagePort, registeredPredicates: Set<number>): void {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runNoWorker -> SETUP\n`, { flag: 'a' });
parentPort.on('message', (message: MainThreadToWorkerMessage<unknown>) => {
try {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] runNoWorker -> MESSAGE RECEIVED ${JSON.stringify(message)}\n`,
{
flag: 'a',
},
);
} catch (err) {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] runNoWorker -> MESSAGE RECEIVED {{{!!!}}}\n`,
{ flag: 'a' },
);
}
const { targetPredicateId, runId } = message;
if (registeredPredicates.has(targetPredicateId)) {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runNoWorker -> REGISTRATION CONFIRMED\n`, {
flag: 'a',
});
return;
}
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runNoWorker -> REGISTRATION NOT-FOUND\n`, {
flag: 'a',
});
const errorMessage = `Unregistered predicate, got: ${targetPredicateId}, for registered: ${[
...registeredPredicates,
].join(', ')}`;
Expand Down
22 changes: 22 additions & 0 deletions packages/worker/src/internals/worker-runner/WorkerRunner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { MessagePort } from 'node:worker_threads';
import type { MainThreadToWorkerMessage, PropertyPredicate, WorkerToMainThreadMessage } from '../SharedTypes.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

/**
* Setup a worker listening to parentPort and able to run a single time for a given predicate
Expand All @@ -12,7 +14,21 @@ export function runWorker<Ts extends unknown[]>(
predicateId: number,
predicate: PropertyPredicate<Ts>,
): void {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> SETUP\n`, { flag: 'a' });
parentPort.on('message', (message: MainThreadToWorkerMessage<Ts>) => {
try {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] runWorker -> MESSAGE RECEIVED ${JSON.stringify(message)}\n`,
{
flag: 'a',
},
);
} catch (err) {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> MESSAGE RECEIVED {{{!!!}}}\n`, {
flag: 'a',
});
}
const { payload, targetPredicateId, runId } = message;
if (targetPredicateId !== predicateId) {
// The current predicate is not the one targeted by the received message
Expand All @@ -21,10 +37,16 @@ export function runWorker<Ts extends unknown[]>(
Promise.resolve(predicate(...payload)).then(
(output) => {
const message: WorkerToMainThreadMessage = { success: true, output, runId };
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> PREDICATE OUTPUT\n`, {
flag: 'a',
});
parentPort.postMessage(message);
},
(error) => {
const message: WorkerToMainThreadMessage = { success: false, error, runId };
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> PREDICATE ERROR\n`, {
flag: 'a',
});
parentPort.postMessage(message);
},
);
Expand Down
49 changes: 48 additions & 1 deletion packages/worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { isMainThread, parentPort, workerData } from 'node:worker_threads';
import { isMainThread, parentPort, workerData, threadId } from 'node:worker_threads';

import { assert as fcAssert, type IAsyncProperty, type IProperty, type Parameters } from 'fast-check';
import { runWorker } from './internals/worker-runner/WorkerRunner.js';
import { runMainThread } from './internals/MainThreadRunner.js';
import { NoopWorkerProperty } from './internals/NoopWorkerProperty.js';
import type { PropertyArbitraries, PropertyPredicate, WorkerProperty } from './internals/SharedTypes.js';
import { runNoWorker } from './internals/worker-runner/NoWorkerRunner.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

let lastPredicateId = 0;
const allKnownTerminateAllWorkersPerProperty = new Map<
Expand All @@ -15,8 +17,18 @@ const allKnownTerminateAllWorkersPerProperty = new Map<
async function clearAllWorkersFor(property: IAsyncProperty<unknown> | IProperty<unknown>): Promise<void> {
const terminateAllWorkers = allKnownTerminateAllWorkersPerProperty.get(property);
if (terminateAllWorkers === undefined) {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] No cleaning found for property, found ${allKnownTerminateAllWorkersPerProperty.size} other cleanings\n`,
{ flag: 'a' },
);
return;
}
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] Executing cleaning for property, found ${allKnownTerminateAllWorkersPerProperty.size - 1} other cleanings\n`,
{ flag: 'a' },
);
await terminateAllWorkers();
}

Expand All @@ -38,9 +50,37 @@ export async function assert<Ts>(property: IAsyncProperty<Ts> | IProperty<Ts>, p
if (isMainThread) {
// Main thread code
try {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> BEFORE fc.assert\n`,
{
flag: 'a',
},
);
await fcAssert(property, params);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> AFTER fc.assert\n`,
{
flag: 'a',
},
);
} finally {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> BEFORE cleaning for workers\n`,
{
flag: 'a',
},
);
await clearAllWorkersFor(property);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> AFTER cleaning for workers\n`,
{
flag: 'a',
},
);
}
} else {
// Worker code
Expand Down Expand Up @@ -82,6 +122,13 @@ function workerProperty<Ts extends [unknown, ...unknown[]]>(
const isolationLevel = options.isolationLevel || 'file';
const arbitraries = args.slice(0, -1) as PropertyArbitraries<Ts>;
const { property, terminateAllWorkers } = runMainThread<Ts>(url, currentPredicateId, isolationLevel, arbitraries);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] Registering cleaning for property\n`,
{
flag: 'a',
},
);
allKnownTerminateAllWorkersPerProperty.set(property, terminateAllWorkers);
return property;
} else if (parentPort !== null && workerData.fastcheckWorker === true) {
Expand Down
22 changes: 22 additions & 0 deletions packages/worker/test/e2e/__properties__/passing.mjs
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
// @ts-check
import fc from 'fast-check';
import { propertyFor } from '@fast-check/worker';
import { writeFileSync } from 'fs';
import * as process from 'process';

// new w.Worker(new URL('file:///workspaces/fast-check/packages/worker/test/e2e/__properties__/passing.mjs'))
// Error [ERR_MODULE_NOT_FOUND]: Cannot find package 'fast-check' imported from /workspaces/fast-check/packages/worker/test/e2e/__properties__/passing.mjs
// at packageResolve (node:internal/modules/esm/resolve:844:9)
// at moduleResolve (node:internal/modules/esm/resolve:901:20)
// at defaultResolve (node:internal/modules/esm/resolve:1121:11)
// at ModuleLoader.defaultResolve (node:internal/modules/esm/loader:396:12)
// at ModuleLoader.resolve (node:internal/modules/esm/loader:365:25)
// at ModuleLoader.getModuleJob (node:internal/modules/esm/loader:240:38)
// at ModuleWrap.<anonymous> (node:internal/modules/esm/module_job:85:39)
// at link (node:internal/modules/esm/module_job:84:36)

const property = propertyFor(new URL(import.meta.url));
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] predicate (url=${new URL(import.meta.url)})\n`, {
flag: 'a',
});

export const passingProperty = property(
fc.integer({ min: -1000, max: 1000 }),
fc.integer({ min: -1000, max: 1000 }),
(from, to) => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] from: ${from}, to: ${to} -> START\n`, {
flag: 'a',
});
for (let i = from; i <= to; ++i) {
// Loop from "from" to "to" ALWAYS finite
}
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] from: ${from}, to: ${to} -> END\n`, {
flag: 'a',
});
},
);

0 comments on commit 76a2b5f

Please sign in to comment.