diff --git a/.changeset/eleven-hounds-allow.md b/.changeset/eleven-hounds-allow.md new file mode 100644 index 000000000..0af48176b --- /dev/null +++ b/.changeset/eleven-hounds-allow.md @@ -0,0 +1,5 @@ +--- +"@farfetched/atomic-router": patch +--- + +Fix race in `barrierChain` diff --git a/.changeset/spotty-tigers-roll.md b/.changeset/spotty-tigers-roll.md new file mode 100644 index 000000000..b5408a411 --- /dev/null +++ b/.changeset/spotty-tigers-roll.md @@ -0,0 +1,5 @@ +--- +"@farfetched/core": patch +--- + +Abort all in-flight operations on `.reset` diff --git a/packages/atomic-router/src/barrier.ts b/packages/atomic-router/src/barrier.ts index 32ca1063a..82998304b 100644 --- a/packages/atomic-router/src/barrier.ts +++ b/packages/atomic-router/src/barrier.ts @@ -23,13 +23,7 @@ export function barrierChain(barrier: Barrier): ChainProtocol { }, }); - sample({ - clock: beforeOpen, - target: barrier.__.touch, - /* Use batch:false to increase priority of touch call */ - batch: false, - }); - sample({ clock: beforeOpen, target: fx }); + sample({ clock: beforeOpen, target: [barrier.__.touch, fx] }); return { beforeOpen, diff --git a/packages/core/package.json b/packages/core/package.json index cdeb14c6e..f04510188 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -41,7 +41,7 @@ "size-limit": [ { "path": "./dist/core.js", - "limit": "16.2 kB" + "limit": "16.3 kB" } ] } diff --git a/packages/core/src/concurrency/concurrency.ts b/packages/core/src/concurrency/concurrency.ts index a40cb5cf2..07ead18f4 100644 --- a/packages/core/src/concurrency/concurrency.ts +++ b/packages/core/src/concurrency/concurrency.ts @@ -1,5 +1,6 @@ import { type Event, + type Store, createStore, sample, attach, @@ -30,37 +31,13 @@ export function concurrency( ) { if (op.__.meta.flags.concurrencyFieldUsed) { console.error( - `Both concurrency-operator and concurrency-field are used on operation ${op.__.meta.name}.`, + `Both concurrency-operator and concurrency-field are used on operation ${op.__.meta.name}.`, `Please use only concurrency-operator, because field concurrency-field in createJsonQuery and createJsonMutation is deprecated and will be deleted soon.` ); } op.__.meta.flags.concurrencyOperatorUsed = true; - const $callObjects = createStore([], { serialize: 'ignore' }); - sample({ - clock: op.__.lowLevelAPI.callObjectCreated, - source: $callObjects, - fn: (callObjects, callObject) => - callObjects.filter((obj) => obj.status === 'pending').concat(callObject), - target: $callObjects, - }); - - const abortManyFx = createEffect((callObjects: CallObject[]) => { - callObjects.forEach((callObject) => callObject.abort()); - }); - - const abortAllFx = attach({ - source: $callObjects, - effect: abortManyFx, - }); - - sample({ - clock: abortManyFx.done, - source: $callObjects, - fn: (callObjects, { params: abortedCallObjects }) => - callObjects.filter((obj) => !abortedCallObjects.includes(obj)), - target: $callObjects, - }); + const $callObjects = callObejcts(op); if (strategy) { switch (strategy) { @@ -119,8 +96,49 @@ export function concurrency( } if (abortAll) { - sample({ clock: abortAll, target: abortAllFx }); + abortAllInFlight(op, { clock: abortAll }); } return op; } + +export function abortAllInFlight( + op: RemoteOperation, + { clock }: { clock: Event } +) { + sample({ clock, source: callObejcts(op), target: abortManyFx }); +} + +function callObejcts( + op: RemoteOperation +): Store { + if (!op.__.meta.$callObjects) { + const $callObjects = createStore([], { serialize: 'ignore' }); + + sample({ + clock: op.__.lowLevelAPI.callObjectCreated, + source: $callObjects, + fn: (callObjects, callObject) => + callObjects + .filter((obj) => obj.status === 'pending') + .concat(callObject), + target: $callObjects, + }); + + sample({ + clock: abortManyFx.done, + source: $callObjects, + fn: (callObjects, { params: abortedCallObjects }) => + callObjects.filter((obj) => !abortedCallObjects.includes(obj)), + target: $callObjects, + }); + + op.__.meta.$callObjects = $callObjects; + } + + return op.__.meta.$callObjects!; +} + +const abortManyFx = createEffect((callObjects: CallObject[]) => + callObjects.forEach((callObject) => callObject.abort()) +); diff --git a/packages/core/src/remote_operation/__test__/create_remote_operation.test.ts b/packages/core/src/remote_operation/__test__/create_remote_operation.test.ts index fda01da27..d9631f5a3 100644 --- a/packages/core/src/remote_operation/__test__/create_remote_operation.test.ts +++ b/packages/core/src/remote_operation/__test__/create_remote_operation.test.ts @@ -1,5 +1,6 @@ import { allSettled, createStore, createWatch, fork } from 'effector'; import { describe, test, expect, vi } from 'vitest'; +import { setTimeout } from 'timers/promises'; import { watchRemoteOperation } from '../../test_utils/watch_query'; import { unknownContract } from '../../contract/unknown_contract'; @@ -8,7 +9,6 @@ import { createRemoteOperation } from '../create_remote_operation'; import { isTimeoutError } from '../../errors/guards'; import { timeoutError } from '../../errors/create_error'; import { onAbort } from '../on_abort'; -import { setTimeout as wait } from 'timers/promises'; const defaultConfig = { name: 'test', @@ -317,9 +317,9 @@ describe('RemoteOperation.__.lowLevelAPI.callObjectCreated', () => { unit: operation.__.lowLevelAPI.callObjectCreated, scope, fn: ({ abort }) => { - setTimeout(() => { + setTimeout(10).then(() => { abort(); - }, 10); + }); }, }); @@ -337,7 +337,7 @@ describe('RemoteOperation.__.lowLevelAPI.callObjectCreated', () => { }); await allSettled(operation.start, { scope, params: 42 }); - await new Promise((resolve) => setTimeout(resolve, 20)); + await setTimeout(20); expect(operationFinished).toBeCalledTimes(1); expect(operationFinished.mock.calls.map(([arg]) => arg)) @@ -444,7 +444,7 @@ describe('RemoteOperation and onAbort callback', () => { const handleCancel = vi.fn(); operation.__.executeFx.use(async () => { - await wait(0); + await setTimeout(0); onAbort(handleCancel); @@ -539,4 +539,29 @@ describe('RemoteOperation and onAbort callback', () => { `); expect(handleCancel).toBeCalledTimes(0); }); + + test('abort in-flight operations in case of .reset call, issue #461', async () => { + const operation = createRemoteOperation({ + ...defaultConfig, + }); + + const handleCancel = vi.fn(); + + operation.__.executeFx.use(async () => { + onAbort(handleCancel); + + await setTimeout(10); + + return null; + }); + + const scope = fork(); + + allSettled(operation.start, { scope, params: 42 }); + allSettled(operation.reset, { scope }); + + await allSettled(scope); + + expect(handleCancel).toBeCalledTimes(1); + }); }); diff --git a/packages/core/src/remote_operation/create_remote_operation.ts b/packages/core/src/remote_operation/create_remote_operation.ts index e216587ac..540c763f6 100644 --- a/packages/core/src/remote_operation/create_remote_operation.ts +++ b/packages/core/src/remote_operation/create_remote_operation.ts @@ -29,6 +29,7 @@ import { type RemoteOperation } from './type'; import { get } from '../libs/lohyphen'; import { isAbortError } from '../errors/guards'; import { getCallObjectEvent } from './with_call_object'; +import { abortAllInFlight } from '../concurrency/concurrency'; export function createRemoteOperation< Params, @@ -467,7 +468,7 @@ export function createRemoteOperation< target: finished.finally, }); - return { + const op = { start, finished, started, @@ -499,6 +500,10 @@ export function createRemoteOperation< }, }, }; + + abortAllInFlight(op, { clock: reset }); + + return op; } function createDataSourceHandlers(dataSources: DataSource[]) { diff --git a/packages/core/src/remote_operation/type.ts b/packages/core/src/remote_operation/type.ts index ea9a599eb..61ffe3641 100644 --- a/packages/core/src/remote_operation/type.ts +++ b/packages/core/src/remote_operation/type.ts @@ -12,6 +12,7 @@ import type { CallObject } from './with_call_object'; interface DefaultMeta { name: string; flags: Record; + $callObjects?: Store; } export interface RemoteOperation<