✨(worker) Add support for non-serializable data in workers #4063

Merged
merged 45 commits into from
Mar 26, 2024
45 commits
2fe7e4c
✨ Add support for non-serializable data in workers
dubzzz Jul 10, 2023
02feb3b
Merge branch 'main' into worker-non-serializable
dubzzz Jul 27, 2023
314b525
Merge remote-tracking branch 'origin/main' into worker-non-serializable
dubzzz Mar 20, 2024
91ef84e
Merge remote-tracking branch 'origin/main' into worker-non-serializable
dubzzz Mar 20, 2024
d0319ee
fix part of compilation
dubzzz Mar 20, 2024
9341399
fix import
dubzzz Mar 20, 2024
d62e1a8
fix mainthreadrunner
dubzzz Mar 20, 2024
5e377ee
rethrow errors
dubzzz Mar 20, 2024
7d456fb
add pure-rand
dubzzz Mar 20, 2024
6d44c57
fix type imports
dubzzz Mar 20, 2024
f41c672
fix units compil
dubzzz Mar 20, 2024
15e1c28
copy state first
dubzzz Mar 20, 2024
9f8c50b
use the right PRNG
dubzzz Mar 20, 2024
a436939
better state clone (compile)
dubzzz Mar 20, 2024
6dd286a
introduce a location for generator
dubzzz Mar 20, 2024
4d71750
adapt tests
dubzzz Mar 20, 2024
1751b03
Do not generate anything within the main thread
dubzzz Mar 20, 2024
65485b4
try better
dubzzz Mar 20, 2024
b5738a8
fix code
dubzzz Mar 20, 2024
6472a04
no need for verbose 1 to run
dubzzz Mar 20, 2024
d1ceee8
drop comment
dubzzz Mar 20, 2024
dde3bef
extract code creating xorshift
dubzzz Mar 22, 2024
b0ae725
better types
dubzzz Mar 22, 2024
b02e54d
rename
dubzzz Mar 22, 2024
36cc6a0
fix compilation
dubzzz Mar 22, 2024
0c274e3
check main thread fails
dubzzz Mar 22, 2024
60a953a
check sample
dubzzz Mar 22, 2024
c8f3541
sample
dubzzz Mar 22, 2024
4adf74f
Merge remote-tracking branch 'origin/main' into worker-non-serializable
dubzzz Mar 22, 2024
8beb73c
more wip
dubzzz Mar 23, 2024
c7f7823
Update WorkerPropertyBuilder.ts
dubzzz Mar 23, 2024
dc6f1cb
Update WorkerPropertyBuilder.ts
dubzzz Mar 23, 2024
4b4b508
Update WorkerPropertyFromWorker.ts
dubzzz Mar 23, 2024
1087beb
better property
dubzzz Mar 24, 2024
897689c
add tests
dubzzz Mar 24, 2024
e64826e
try more
dubzzz Mar 24, 2024
84d9d63
do it
dubzzz Mar 24, 2024
fbf4350
Update nonSerializableData.spec.ts
dubzzz Mar 24, 2024
914b345
fix typings
dubzzz Mar 25, 2024
18eace3
document it
dubzzz Mar 25, 2024
983e9e3
bump
dubzzz Mar 25, 2024
765f95a
typo doc
dubzzz Mar 25, 2024
fe55cc4
drop old
dubzzz Mar 25, 2024
daccdf8
cleaner property worker
dubzzz Mar 25, 2024
e1af65e
fix tests
dubzzz Mar 26, 2024
5 changes: 5 additions & 0 deletions .yarn/versions/57cccee4.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
releases:
"@fast-check/worker": minor

declined:
- "@fast-check/jest"
9 changes: 9 additions & 0 deletions packages/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ const property = propertyFor(new URL(import.meta.url), { isolationLevel: 'predic
// - "predicate": One worker per run of the predicate
```

By default, workers receive the generated values from their parent thread. When those values contain non-serializable pieces, they cannot be sent across threads. In such cases, you can opt in to generating the values directly within the workers:

```js
const property = propertyFor(new URL(import.meta.url), { randomSource: 'worker' });
// Other values:
// - "main-thread": The main thread is responsible for generating the random values and sending them to the worker thread. It cannot send any value that cannot be serialized between threads. (default)
// - "worker": The worker is responsible for generating its own values based on the instructions provided by the main thread. This mode supports non-serializable values, but it drops all shrinking capabilities.
```
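
To illustrate why some generated values cannot cross the thread boundary, here is a minimal sketch. It assumes a runtime exposing the global `structuredClone` (Node 17 or later), used here as a proxy for the structured clone step applied when posting messages between threads. The helper name `canBeSentToWorker` is hypothetical and not part of `@fast-check/worker`.

```ts
// Hypothetical helper (not part of @fast-check/worker): reports whether a value
// survives the structured clone algorithm.
function canBeSentToWorker(value: unknown): boolean {
  try {
    structuredClone(value);
    return true;
  } catch {
    // Functions and symbols cannot be cloned: structuredClone throws for them
    return false;
  }
}

const plainData = { id: 1, tags: ['a', 'b'] };
const withFunction = { id: 1, compare: (a: number, b: number) => a - b };
const withSymbol = { id: Symbol('unique') };

console.log(canBeSentToWorker(plainData)); // true
console.log(canBeSentToWorker(withFunction)); // false
console.log(canBeSentToWorker(withSymbol)); // false
```

Arbitraries producing values like the last two are the ones that would need `randomSource: 'worker'`.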

## Minimal requirements

- Node ≥14.18.0<sup>(1)</sup><sup>(2)</sup><sup>(3)</sup>
Expand Down
3 changes: 2 additions & 1 deletion packages/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
},
"homepage": "https://github.com/dubzzz/fast-check/tree/main/packages/worker#readme",
"dependencies": {
"fast-check": "^3.4.0"
"fast-check": "^3.4.0",
"pure-rand": "^6.1.0"
},
"devDependencies": {
"@babel/core": "^7.24.3",
Expand Down
41 changes: 23 additions & 18 deletions packages/worker/src/internals/MainThreadRunner.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,49 @@
import fc from 'fast-check';
import type { PropertyArbitraries, WorkerProperty } from './SharedTypes.js';
import { BasicPool } from './worker-pool/BasicPool.js';
import { Lock } from './lock/Lock.js';
import type { IWorkerPool, PooledWorker } from './worker-pool/IWorkerPool.js';
import type { IWorkerPool, Payload, PooledWorker } from './worker-pool/IWorkerPool.js';
import { OneTimePool } from './worker-pool/OneTimePool.js';
import { GlobalPool } from './worker-pool/GlobalPool.js';
import { buildWorkerProperty } from './worker-property/WorkerPropertyBuilder.js';

/**
* Create a property that runs in the main thread and fires workers whenever required
*
* @param workerFileUrl - The URL towards the file holding the worker's code
* @param predicateId - Id of the predicate
* @param isolationLevel - The kind of isolation to be put in place between two executions of predicates
* @param randomSource - Where should we generate the random values?
* @param arbitraries - The arbitraries used to generate the inputs for the predicate held within the worker
*/
export function runMainThread<Ts extends [unknown, ...unknown[]]>(
workerFileUrl: URL,
predicateId: number,
isolationLevel: 'file' | 'property' | 'predicate',
randomSource: 'main-thread' | 'worker',
arbitraries: PropertyArbitraries<Ts>,
): { property: WorkerProperty<Ts>; terminateAllWorkers: () => Promise<void> } {
const lock = new Lock();
const pool: IWorkerPool<boolean | void, Ts> =
isolationLevel === 'predicate'
? new OneTimePool(workerFileUrl)
: isolationLevel === 'property'
? new BasicPool(workerFileUrl)
: new GlobalPool(workerFileUrl);
const pool: IWorkerPool<boolean | void, Payload<Ts>> = isolationLevel === 'predicate'
? new OneTimePool(workerFileUrl)
: isolationLevel === 'property'
? new BasicPool(workerFileUrl)
: new GlobalPool(workerFileUrl);

let releaseLock: (() => void) | undefined = undefined;
let worker: PooledWorker<boolean | void, Ts> | undefined = undefined;
const property = fc.asyncProperty<Ts>(...arbitraries, async (...inputs) => {
return new Promise((resolve, reject) => {
if (worker === undefined) {
reject(new Error('Badly initialized worker, unable to run the property'));
return;
}
worker.register(predicateId, inputs, resolve, reject);
});
});
let worker: PooledWorker<boolean | void, Payload<Ts>> | undefined = undefined;
const property = buildWorkerProperty(
arbitraries,
async (...inputs) => {
return new Promise((resolve, reject) => {
if (worker === undefined) {
reject(new Error('Badly initialized worker, unable to run the property'));
return;
}
worker.register(predicateId, property.getPayload(inputs), resolve, reject);
});
},
randomSource === 'worker',
);
property.beforeEach(async (hookFunction) => {
await hookFunction(); // run outside of the worker, can throw
const acquired = await lock.acquire();
Expand Down
17 changes: 17 additions & 0 deletions packages/worker/src/internals/ValueFromState.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { xorshift128plus } from 'pure-rand';
import { Random } from 'fast-check';
import type { IRawProperty } from 'fast-check';

/**
* State from which a Value can be regenerated: a copy of the PRNG state and the id of the run
*/
export type ValueState = { rngState: number[]; runId: number | undefined };

/**
* Build the appropriate Value based on the provided state
* @param property - The property whose arbitraries generate the Value
* @param state - The state defining the Value to be generated (the how)
*/
export function generateValueFromState<Ts>(property: IRawProperty<Ts>, state: ValueState): Ts {
const mrng = new Random(xorshift128plus.fromState(state.rngState));
return property.generate(mrng, state.runId).value_;
}
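
The idea behind regenerating a value from a state can be sketched without any dependency. The snippet below is an illustration only: `TinyRng` is a hypothetical xorshift32 stand-in, not the `xorshift128plus` generator from `pure-rand`, and `generatePair` is a hypothetical stand-in for `property.generate`. It shows that copying the PRNG state before generating lets another party rebuild the exact same value.

```ts
type ValueState = { rngState: number[]; runId: number | undefined };

// Hypothetical stand-in for a PRNG with an extractable state (NOT pure-rand)
class TinyRng {
  private state: number;
  constructor(seed: number) {
    this.state = seed | 0; // must be non-zero for xorshift32
  }
  static fromState(state: number[]): TinyRng {
    return new TinyRng(state[0]);
  }
  getState(): number[] {
    return [this.state];
  }
  next(): number {
    // One xorshift32 step, mutating the internal state
    let x = this.state;
    x ^= x << 13;
    x ^= x >>> 17;
    x ^= x << 5;
    this.state = x | 0;
    return this.state >>> 0;
  }
}

// Hypothetical stand-in for property.generate(mrng, runId)
function generatePair(rng: TinyRng): [number, number] {
  return [rng.next() % 100, rng.next() % 100];
}

// "Main thread" side: copy the state BEFORE generating, as generation mutates the PRNG
const mainRng = new TinyRng(42);
const state: ValueState = { rngState: mainRng.getState().slice(), runId: 0 };
const mainValue = generatePair(mainRng);

// "Worker" side: rebuild the PRNG from the copied state and regenerate
const workerValue = generatePair(TinyRng.fromState(state.rngState));
```

Both sides end up with the same pair, while only the small numeric state had to travel between them.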
13 changes: 12 additions & 1 deletion packages/worker/src/internals/worker-pool/IWorkerPool.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { ValueState } from '../ValueFromState.js';

export type OnSuccessCallback<TSuccess> = (value: TSuccess) => void;
export type OnErrorCallback = (error: unknown) => void;

Expand All @@ -19,7 +21,16 @@ export type PooledWorker<TSuccess, TPayload> = {
/**
* Message exchanged from the pool to the worker
*/
export type PoolToWorkerMessage<TPayload> = { targetPredicateId: number; runId: number; payload: TPayload };
export type PoolToWorkerMessage<TPayload> = {
targetPredicateId: number;
runId: number;
payload: TPayload;
};

/**
* Payload being sent to the worker to control it
*/
export type Payload<TValue> = { source: 'main'; value: TValue } | ({ source: 'worker' } & ValueState);

/**
* Message exchanged from the worker to the pool
Expand Down
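
The `Payload` type above is a discriminated union on `source`. As a minimal sketch of how a consumer could narrow it, the snippet below redeclares the two types locally so that it is self-contained; `resolveInputs` is a hypothetical helper name, not a function from this PR.

```ts
type ValueState = { rngState: number[]; runId: number | undefined };
type Payload<TValue> = { source: 'main'; value: TValue } | ({ source: 'worker' } & ValueState);

// Hypothetical helper: picks the ready-made value or rebuilds it from the state
function resolveInputs<TValue>(payload: Payload<TValue>, buildInputs: (state: ValueState) => TValue): TValue {
  if (payload.source === 'main') {
    // Narrowed to { source: 'main'; value: TValue }
    return payload.value;
  }
  // Narrowed to { source: 'worker' } & ValueState
  return buildInputs(payload);
}

const fromMain = resolveInputs<[number, string]>({ source: 'main', value: [1, 'a'] }, () => {
  throw new Error('not expected to be called for a main-thread payload');
});

const fromWorker = resolveInputs<[number, string]>(
  { source: 'worker', rngState: [1, 2, 3, 4], runId: 7 },
  (state) => [state.rngState.length, `run-${state.runId}`],
);

console.log(fromMain); // [ 1, 'a' ]
console.log(fromWorker); // [ 4, 'run-7' ]
```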
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { IAsyncPropertyWithHooks, Value, Stream, PreconditionFailure, PropertyFailure } from 'fast-check';
import type { WorkerProperty } from './SharedTypes.js';
import type { WorkerProperty } from '../SharedTypes.js';

/**
* NoopWorkerProperty is a placeholder instance of property returned
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import type { IAsyncPropertyWithHooks } from 'fast-check';
import type { PropertyArbitraries } from '../SharedTypes.js';
import type { Payload } from '../worker-pool/IWorkerPool.js';

import fc from 'fast-check';
import { WorkerPropertyFromWorker } from './WorkerPropertyFromWorker.js';

/**
* Property tailored for usage with workers
* it produces the payload to be sent to the workers
*/
type WorkerProperty<Ts> = IAsyncPropertyWithHooks<Ts> & { getPayload: (_inputs: Ts) => Payload<Ts> };

/**
* Build an async property tailored for workers
* @param arbitraries - Arbitraries supposed to generate our values
* @param predicate - Predicate for the property
* @param generateValuesInWorker - When "true", the worker generates the values itself; otherwise the main thread generates them and sends them to the worker
*/
export function buildWorkerProperty<Ts extends [unknown, ...unknown[]]>(
arbitraries: PropertyArbitraries<Ts>,
predicate: (...args: Ts) => Promise<boolean | void>,
generateValuesInWorker: boolean,
): WorkerProperty<Ts> {
if (!generateValuesInWorker) {
return Object.assign(fc.asyncProperty<Ts>(...arbitraries, predicate), {
getPayload: (inputs: Ts): Payload<Ts> => ({ source: 'main', value: inputs }),
});
}
return new WorkerPropertyFromWorker(arbitraries, predicate);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import type {
AsyncPropertyHookFunction,
IAsyncPropertyWithHooks,
IRawProperty,
PreconditionFailure,
PropertyFailure,
Random,
Stream,
Value,
} from 'fast-check';
import type { PropertyArbitraries } from '../SharedTypes.js';
import type { ValueState } from '../ValueFromState.js';
import type { Payload } from '../worker-pool/IWorkerPool.js';

import fc from 'fast-check';
import { generateValueFromState } from '../ValueFromState.js';

const WorkerPropertyFromWorkerCache = new WeakMap<object, ValueState>();

class WorkerPropertyFromWorkerError extends Error {
constructor(...args: Parameters<typeof Error>) {
super(...args);
}
}

function lazyGenerateValueFromState<Ts>(property: IRawProperty<Ts>, state: ValueState): () => Ts {
let value: Ts | undefined = undefined;
return function getValue(): Ts {
if (value === undefined) {
value = generateValueFromState(property, state);
}
return value;
};
}

function extractCacheKey(inputs: [unknown, ...unknown[]]): object {
return inputs[0] as object;
}

function buildInputsAndRegister<Ts extends [unknown, ...unknown[]]>(
property: IRawProperty<Ts>,
valueState: ValueState,
numArbitraries: number,
): Ts {
const getValue = lazyGenerateValueFromState(property, valueState);
const inputs: object[] = [...Array(numArbitraries)].map((_, index) => ({
toString: () => fc.stringify(getValue()[index]),
}));

WorkerPropertyFromWorkerCache.set(extractCacheKey(inputs as [object, ...object[]]), valueState);

// WARNING: The type of the inputs variable is obviously not the one the caller expects!
// But in the context of WorkerPropertyFromWorker, the caller knows that the inputs returned by generate
// should not be consumed as-is and that they will be re-created on the worker's side.
return inputs as Ts;
}

/**
* A WorkerProperty that delegates the generation of the values to the worker thread
* instead of performing it in the main thread
*/
export class WorkerPropertyFromWorker<Ts extends [unknown, ...unknown[]]> implements IAsyncPropertyWithHooks<Ts> {
private readonly numArbitraries: number;
private readonly internalProperty: IAsyncPropertyWithHooks<Ts>;

constructor(arbitraries: PropertyArbitraries<Ts>, predicate: (...args: Ts) => Promise<boolean | void>) {
this.numArbitraries = arbitraries.length;
this.internalProperty = fc.asyncProperty<Ts>(...arbitraries, predicate);
}

isAsync(): true {
return this.internalProperty.isAsync();
}

generate(mrng: Random, runId?: number | undefined): Value<Ts> {
// Extracting and cloning the state of Random before altering it
const rawRngState = mrng.getState();
if (rawRngState === undefined) {
throw new WorkerPropertyFromWorkerError('Cannot extract any state from the provided instance of Random');
}
const valueState = { rngState: rawRngState.slice(), runId };
const inputs = buildInputsAndRegister(this.internalProperty, valueState, this.numArbitraries);
return new fc.Value(inputs, undefined);
}

shrink(_value: Value<Ts>): Stream<Value<Ts>> {
// No shrink on worker-based generations
return fc.Stream.nil();
}

run(v: Ts, dontRunHook?: boolean | undefined): Promise<PreconditionFailure | PropertyFailure | null> {
return this.internalProperty.run(v, dontRunHook);
}

beforeEach(hookFunction: AsyncPropertyHookFunction): IAsyncPropertyWithHooks<Ts> {
return this.internalProperty.beforeEach(hookFunction);
}

afterEach(hookFunction: AsyncPropertyHookFunction): IAsyncPropertyWithHooks<Ts> {
return this.internalProperty.afterEach(hookFunction);
}

runBeforeEach(): Promise<void> {
if (this.internalProperty.runBeforeEach !== undefined) {
return this.internalProperty.runBeforeEach();
}
return Promise.resolve();
}

runAfterEach(): Promise<void> {
if (this.internalProperty.runAfterEach !== undefined) {
return this.internalProperty.runAfterEach();
}
return Promise.resolve();
}

getPayload(inputs: Ts): Payload<Ts> {
const valueState = WorkerPropertyFromWorkerCache.get(extractCacheKey(inputs));
if (valueState === undefined) {
throw new WorkerPropertyFromWorkerError('Cannot get a relevant payload to execute this run');
}
return { source: 'worker', ...valueState };
}
}
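
The class above combines two techniques: placeholder inputs whose `toString` lazily computes the real value, and a `WeakMap` keyed by the first placeholder to find the state again later. The following is a simplified sketch of that pattern, not the PR's code: `generateReal`, `buildPlaceholders` and `stateFor` are hypothetical names, and strings stand in for the generated values.

```ts
type ValueState = { rngState: number[]; runId: number | undefined };

const stateByFirstPlaceholder = new WeakMap<object, ValueState>();
let generationCount = 0;

// Hypothetical stand-in for the real (possibly costly) generation
function generateReal(state: ValueState): string[] {
  generationCount += 1;
  return state.rngState.map((n) => `value-${n}`);
}

// count must be at least 1, mirroring the [unknown, ...unknown[]] constraint
function buildPlaceholders(state: ValueState, count: number): object[] {
  let cached: string[] | undefined = undefined;
  const getValue = (): string[] => {
    if (cached === undefined) {
      cached = generateReal(state); // computed once, on first use only
    }
    return cached;
  };
  const placeholders = Array.from({ length: count }, (_, index) => ({
    toString: () => getValue()[index],
  }));
  // The first placeholder acts as the key to retrieve the state later
  stateByFirstPlaceholder.set(placeholders[0], state);
  return placeholders;
}

function stateFor(placeholders: object[]): ValueState | undefined {
  return stateByFirstPlaceholder.get(placeholders[0]);
}

const state: ValueState = { rngState: [10, 20], runId: 3 };
const placeholders = buildPlaceholders(state, 2);

console.log(generationCount); // 0: nothing has been generated yet
console.log(String(placeholders[1])); // "value-20"
console.log(generationCount); // 1
```

Because the map is weak, entries disappear once the placeholders are no longer referenced, so the cache does not grow with the number of runs.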
8 changes: 6 additions & 2 deletions packages/worker/src/internals/worker-runner/WorkerRunner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { MessagePort } from 'node:worker_threads';
import type { MainThreadToWorkerMessage, PropertyPredicate, WorkerToMainThreadMessage } from '../SharedTypes.js';
import type { ValueState } from '../ValueFromState.js';
import type { Payload } from '../worker-pool/IWorkerPool.js';

/**
* Setup a worker listening to parentPort and able to run a single time for a given predicate
Expand All @@ -11,14 +13,16 @@ export function runWorker<Ts extends unknown[]>(
parentPort: MessagePort,
predicateId: number,
predicate: PropertyPredicate<Ts>,
buildInputs: (state: ValueState) => Ts,
): void {
parentPort.on('message', (message: MainThreadToWorkerMessage<Ts>) => {
parentPort.on('message', (message: MainThreadToWorkerMessage<Payload<Ts>>) => {
const { payload, targetPredicateId, runId } = message;
if (targetPredicateId !== predicateId) {
// The current predicate is not the one targeted by the received message
return;
}
Promise.resolve(predicate(...payload)).then(
const inputs = payload.source === 'main' ? payload.value : buildInputs(payload);
Promise.resolve(predicate(...inputs)).then(
(output) => {
const message: WorkerToMainThreadMessage = { success: true, output, runId };
parentPort.postMessage(message);
Expand Down
31 changes: 27 additions & 4 deletions packages/worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { isMainThread, parentPort, workerData } from 'node:worker_threads';

import { assert as fcAssert, type IAsyncProperty, type IProperty, type Parameters } from 'fast-check';
import { assert as fcAssert, property as fcProperty } from 'fast-check';
import type { IAsyncProperty, IProperty, Parameters } from 'fast-check';
import { runWorker } from './internals/worker-runner/WorkerRunner.js';
import { runMainThread } from './internals/MainThreadRunner.js';
import { NoopWorkerProperty } from './internals/NoopWorkerProperty.js';
import { NoopWorkerProperty } from './internals/worker-property/NoopWorkerProperty.js';
import type { PropertyArbitraries, PropertyPredicate, WorkerProperty } from './internals/SharedTypes.js';
import { runNoWorker } from './internals/worker-runner/NoWorkerRunner.js';
import { generateValueFromState } from './internals/ValueFromState.js';

let lastPredicateId = 0;
const allKnownTerminateAllWorkersPerProperty = new Map<
Expand Down Expand Up @@ -64,6 +66,18 @@ export type PropertyForOptions = {
* @default "file"
*/
isolationLevel?: 'file' | 'property' | 'predicate';

/**
* Where should we generate the random values?
*
* - `main-thread`: The main thread is responsible for generating the random values and sending them to the worker thread.
* It cannot send any value that cannot be serialized between threads.
* - `worker`: The worker is responsible for generating its own values based on the instructions provided by the main thread.
* This mode supports non-serializable values, but it drops all shrinking capabilities.
*
* @default "main-thread"
*/
randomSource?: 'main-thread' | 'worker';
};

const registeredPredicates = new Set<number>();
Expand All @@ -80,14 +94,23 @@ function workerProperty<Ts extends [unknown, ...unknown[]]>(
if (isMainThread) {
// Main thread code
const isolationLevel = options.isolationLevel || 'file';
const randomSource = options.randomSource || 'main-thread';
const arbitraries = args.slice(0, -1) as PropertyArbitraries<Ts>;
const { property, terminateAllWorkers } = runMainThread<Ts>(url, currentPredicateId, isolationLevel, arbitraries);
const { property, terminateAllWorkers } = runMainThread<Ts>(
url,
currentPredicateId,
isolationLevel,
randomSource,
arbitraries,
);
allKnownTerminateAllWorkersPerProperty.set(property, terminateAllWorkers);
return property;
} else if (parentPort !== null && workerData.fastcheckWorker === true) {
// Worker code
const arbitraries = args.slice(0, -1) as PropertyArbitraries<Ts>;
const predicate = args[args.length - 1] as PropertyPredicate<Ts>;
runWorker(parentPort, currentPredicateId, predicate);
const property: IProperty<Ts> = fcProperty(...arbitraries, () => true);
runWorker(parentPort, currentPredicateId, predicate, (state) => generateValueFromState(property, state));
registeredPredicates.add(currentPredicateId);
}
// Cannot throw for invalid worker at this point as we may not be the only worker for this run
Expand Down