Skip to content

Commit

Permalink
Abort all in-flight operations on .reset
Browse files Browse the repository at this point in the history
  • Loading branch information
igorkamyshev committed Apr 10, 2024
1 parent 563de42 commit e7fbfd0
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/spotty-tigers-roll.md
@@ -0,0 +1,5 @@
---
"@farfetched/core": patch
---

Abort all in-flight operations on `.reset`
72 changes: 45 additions & 27 deletions packages/core/src/concurrency/concurrency.ts
@@ -1,5 +1,6 @@
import {
type Event,
type Store,
createStore,
sample,
attach,
Expand Down Expand Up @@ -30,37 +31,13 @@ export function concurrency(
) {
if (op.__.meta.flags.concurrencyFieldUsed) {
console.error(
`Both concurrency-operator and concurrency-field are used on operation ${op.__.meta.name}.`,
`Both concurrency-operator and concurrency-field are used on operation ${op.__.meta.name}.`,
`Please use only concurrency-operator, because field concurrency-field in createJsonQuery and createJsonMutation is deprecated and will be deleted soon.`
);
}
op.__.meta.flags.concurrencyOperatorUsed = true;

const $callObjects = createStore<CallObject[]>([], { serialize: 'ignore' });
sample({
clock: op.__.lowLevelAPI.callObjectCreated,
source: $callObjects,
fn: (callObjects, callObject) =>
callObjects.filter((obj) => obj.status === 'pending').concat(callObject),
target: $callObjects,
});

const abortManyFx = createEffect((callObjects: CallObject[]) => {
callObjects.forEach((callObject) => callObject.abort());
});

const abortAllFx = attach({
source: $callObjects,
effect: abortManyFx,
});

sample({
clock: abortManyFx.done,
source: $callObjects,
fn: (callObjects, { params: abortedCallObjects }) =>
callObjects.filter((obj) => !abortedCallObjects.includes(obj)),
target: $callObjects,
});
const $callObjects = callObejcts(op);

if (strategy) {
switch (strategy) {
Expand Down Expand Up @@ -119,8 +96,49 @@ export function concurrency(
}

if (abortAll) {
sample({ clock: abortAll, target: abortAllFx });
abortAllInFlight(op, { clock: abortAll });
}

return op;
}

export function abortAllInFlight(
op: RemoteOperation<any, any, any, any>,
{ clock }: { clock: Event<any> }
) {
sample({ clock, source: callObejcts(op), target: abortManyFx });
}

function callObejcts(
op: RemoteOperation<any, any, any, any>
): Store<CallObject[]> {
if (!op.__.meta.$callObjects) {
const $callObjects = createStore<CallObject[]>([], { serialize: 'ignore' });

sample({
clock: op.__.lowLevelAPI.callObjectCreated,
source: $callObjects,
fn: (callObjects, callObject) =>
callObjects
.filter((obj) => obj.status === 'pending')
.concat(callObject),
target: $callObjects,
});

sample({
clock: abortManyFx.done,
source: $callObjects,
fn: (callObjects, { params: abortedCallObjects }) =>
callObjects.filter((obj) => !abortedCallObjects.includes(obj)),
target: $callObjects,
});

op.__.meta.$callObjects = $callObjects;
}

return op.__.meta.$callObjects!;
}

const abortManyFx = createEffect((callObjects: CallObject[]) =>
callObjects.forEach((callObject) => callObject.abort())
);
@@ -1,5 +1,6 @@
import { allSettled, createStore, createWatch, fork } from 'effector';
import { describe, test, expect, vi } from 'vitest';
import { setTimeout } from 'timers/promises';

import { watchRemoteOperation } from '../../test_utils/watch_query';
import { unknownContract } from '../../contract/unknown_contract';
Expand All @@ -8,7 +9,6 @@ import { createRemoteOperation } from '../create_remote_operation';
import { isTimeoutError } from '../../errors/guards';
import { timeoutError } from '../../errors/create_error';
import { onAbort } from '../on_abort';
import { setTimeout as wait } from 'timers/promises';

const defaultConfig = {
name: 'test',
Expand Down Expand Up @@ -317,9 +317,9 @@ describe('RemoteOperation.__.lowLevelAPI.callObjectCreated', () => {
unit: operation.__.lowLevelAPI.callObjectCreated,
scope,
fn: ({ abort }) => {
setTimeout(() => {
setTimeout(10).then(() => {
abort();
}, 10);
});
},
});

Expand All @@ -337,7 +337,7 @@ describe('RemoteOperation.__.lowLevelAPI.callObjectCreated', () => {
});

await allSettled(operation.start, { scope, params: 42 });
await new Promise((resolve) => setTimeout(resolve, 20));
await setTimeout(20);

expect(operationFinished).toBeCalledTimes(1);
expect(operationFinished.mock.calls.map(([arg]) => arg))
Expand Down Expand Up @@ -444,7 +444,7 @@ describe('RemoteOperation and onAbort callback', () => {
const handleCancel = vi.fn();

operation.__.executeFx.use(async () => {
await wait(0);
await setTimeout(0);

onAbort(handleCancel);

Expand Down Expand Up @@ -539,4 +539,29 @@ describe('RemoteOperation and onAbort callback', () => {
`);
expect(handleCancel).toBeCalledTimes(0);
});

test('abort in-flight operations in case of .reset call', async () => {
const operation = createRemoteOperation({
...defaultConfig,
});

const handleCancel = vi.fn();

operation.__.executeFx.use(async () => {
onAbort(handleCancel);

await setTimeout(10);

return null;
});

const scope = fork();

allSettled(operation.start, { scope, params: 42 });
allSettled(operation.reset, { scope });

await allSettled(scope);

expect(handleCancel).toBeCalledTimes(1);
});
});
Expand Up @@ -29,6 +29,7 @@ import { type RemoteOperation } from './type';
import { get } from '../libs/lohyphen';
import { isAbortError } from '../errors/guards';
import { getCallObjectEvent } from './with_call_object';
import { abortAllInFlight } from '../concurrency/concurrency';

export function createRemoteOperation<
Params,
Expand Down Expand Up @@ -435,7 +436,7 @@ export function createRemoteOperation<
target: finished.finally,
});

return {
const op = {
start,
finished,
started,
Expand Down Expand Up @@ -466,6 +467,10 @@ export function createRemoteOperation<
},
},
};

abortAllInFlight(op, { clock: reset });

return op;
}

function createDataSourceHandlers<Params>(dataSources: DataSource<Params>[]) {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/remote_operation/type.ts
Expand Up @@ -12,6 +12,7 @@ import type { CallObject } from './with_call_object';
interface DefaultMeta {
name: string;
flags: Record<string, boolean>;
$callObjects?: Store<CallObject[]>;
}

export interface RemoteOperation<
Expand Down

0 comments on commit e7fbfd0

Please sign in to comment.