Skip to content

Commit

Permalink
simplify asyncMap implementation (#11252)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerel Miller <jerelmiller@gmail.com>
  • Loading branch information
phryneas and jerelmiller committed Oct 4, 2023
1 parent bc055e0 commit 327a2ab
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 39 deletions.
5 changes: 5 additions & 0 deletions .changeset/hungry-vans-walk.md
@@ -0,0 +1,5 @@
---
"@apollo/client": patch
---

Fixes a race condition in asyncMap that caused issues in React Native when errors were returned in the response payload along with a data property that was null.
4 changes: 2 additions & 2 deletions .size-limit.cjs
@@ -1,7 +1,7 @@
const checks = [
{
path: "dist/apollo-client.min.cjs",
limit: "37986",
limit: "37914",
},
{
path: "dist/main.cjs",
Expand All @@ -10,7 +10,7 @@ const checks = [
{
path: "dist/index.js",
import: "{ ApolloClient, InMemoryCache, HttpLink }",
limit: "32019",
limit: "31947",
},
...[
"ApolloProvider",
Expand Down
76 changes: 76 additions & 0 deletions src/testing/internal/ObservableStream.ts
@@ -0,0 +1,76 @@
import type { Observable } from "../../utilities/index.js";

interface TakeOptions {
timeout?: number;
}
type ObservableEvent<T> =
| { type: "next"; value: T }
| { type: "error"; error: any }
| { type: "complete" };

async function* observableToAsyncEventIterator<T>(observable: Observable<T>) {
let resolveNext: (value: ObservableEvent<T>) => void;
const promises: Promise<ObservableEvent<T>>[] = [];
queuePromise();

function queuePromise() {
promises.push(
new Promise<ObservableEvent<T>>((resolve) => {
resolveNext = (event: ObservableEvent<T>) => {
resolve(event);
queuePromise();
};
})
);
}

observable.subscribe(
(value) => resolveNext({ type: "next", value }),
(error) => resolveNext({ type: "error", error }),
() => resolveNext({ type: "complete" })
);

while (true) {
yield promises.shift()!;
}
}

class IteratorStream<T> {
constructor(private iterator: AsyncGenerator<T, void, unknown>) {}

async take({ timeout = 100 }: TakeOptions = {}): Promise<T> {
return Promise.race([
this.iterator.next().then((result) => result.value!),
new Promise<T>((_, reject) => {
setTimeout(
reject,
timeout,
new Error("Timeout waiting for next event")
);
}),
]);
}
}

export class ObservableStream<T> extends IteratorStream<ObservableEvent<T>> {
constructor(observable: Observable<T>) {
super(observableToAsyncEventIterator(observable));
}

async takeNext(options?: TakeOptions): Promise<T> {
const event = await this.take(options);
expect(event).toEqual({ type: "next", value: expect.anything() });
return (event as ObservableEvent<T> & { type: "next" }).value;
}

async takeError(options?: TakeOptions): Promise<any> {
const event = await this.take(options);
expect(event).toEqual({ type: "error", error: expect.anything() });
return (event as ObservableEvent<T> & { type: "error" }).error;
}

async takeComplete(options?: TakeOptions): Promise<void> {
const event = await this.take(options);
expect(event).toEqual({ type: "complete" });
}
}
85 changes: 85 additions & 0 deletions src/testing/internal/__tests__/ObservableStream.test.ts
@@ -0,0 +1,85 @@
import { Observable } from "../../../utilities";
import { ObservableStream } from "../ObservableStream";

it("allows to step through an observable until completion", async () => {
const stream = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
);
await expect(stream.takeNext()).resolves.toBe(1);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(3);
await expect(stream.takeComplete()).resolves.toBeUndefined();
});

it("allows to step through an observable until error", async () => {
const stream = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.error(new Error("expected"));
})
);
await expect(stream.takeNext()).resolves.toBe(1);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(3);
await expect(stream.takeError()).resolves.toEqual(expect.any(Error));
});

it("will time out if no more value is omitted", async () => {
const stream = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
})
);
await expect(stream.takeNext()).resolves.toBe(1);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).rejects.toEqual(expect.any(Error));
});

it.each([
["takeNext", "complete"],
["takeNext", "error"],
["takeError", "complete"],
["takeError", "next"],
["takeComplete", "next"],
["takeComplete", "error"],
])("errors when %s receives %s instead", async (expected, gotten) => {
const stream = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
// @ts-ignore
observer[gotten](3);
})
);
await expect(stream.takeNext()).resolves.toBe(1);
await expect(stream.takeNext()).resolves.toBe(2);
// @ts-ignore
await expect(stream[expected]()).rejects.toEqual(expect.any(Error));
});

it.each([
["takeNext", "next"],
["takeError", "error"],
["takeComplete", "complete"],
])("succeeds when %s, receives %s", async (expected, gotten) => {
const stream = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
// @ts-ignore
observer[gotten](3);
})
);
await expect(stream.takeNext()).resolves.toBe(1);
await expect(stream.takeNext()).resolves.toBe(2);
// @ts-ignore this should just not throw
await stream[expected]();
});
1 change: 1 addition & 0 deletions src/testing/internal/index.ts
@@ -1,2 +1,3 @@
export * from "./profile/index.js";
export * from "./disposables/index.js";
export { ObservableStream } from "./ObservableStream.js";
123 changes: 122 additions & 1 deletion src/utilities/observables/__tests__/asyncMap.ts
@@ -1,7 +1,7 @@
import { Observable } from "../Observable";
import { asyncMap } from "../asyncMap";
import { itAsync } from "../../../testing";

import { ObservableStream } from "../../../testing/internal";
const wait = (delayMs: number) =>
new Promise<void>((resolve) => setTimeout(resolve, delayMs));

Expand Down Expand Up @@ -141,4 +141,125 @@ describe("asyncMap", () => {
}),
});
});

test.each([
["sync", (n: number) => n * 2],
["async", async (n: number) => n * 2],
])("[%s] mapFn maps over values", async (_, mapFn) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 10);
}, 10);
});
const mapped = asyncMap(observable, mapFn);
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeNext()).resolves.toBe(6);
await expect(stream.takeNext()).resolves.toBe(8);
await stream.takeComplete();
});

test.each([["sync"], ["async"]])(
"[%s] mapFn notifies the observer with an error when an error is thrown inside the mapFn",
async (synchronity) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
// this will throw
observer.next(3);
// this will be swallowed and also not call `mapFn` anymore
observer.next(4);
setTimeout(() => {
observer.next(5);
observer.complete();
}, 10);
}, 10);
});
let lastMapped = 0;
const mapped = asyncMap(
observable,
synchronity === "sync"
? (n: number) => {
lastMapped = n;
if (n === 3) throw new Error("expected");
return n * 2;
}
: async (n: number) => {
lastMapped = n;
if (n === 3) throw new Error("expected");
return n * 2;
}
);
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeError()).resolves.toEqual(new Error("expected"));
// no more emits
expect(stream.take()).rejects.toMatch(/timeout/i);
// the observer was closed after the error, so we don't expect `mapFn` to
// be called for values that will not be emitted
expect(lastMapped).toBe(3);
}
);

test.each([
["sync", () => 99],
["async", async () => 99],
])(
"[%s] catchFn notifies the observer with a value when `catchFn` returns a value instead of re-throwing",
async (_, catchFn) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.error(new Error("expected"));
// will be ignored by parent Observable since the observer already closed
observer.next(4);
}, 10);
});
const mapped = asyncMap(observable, (n) => n * 2, catchFn);
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeNext()).resolves.toBe(99);
// even after recovery, further `.next` inside the observer will be ignored
// by the parent Observable itself, so asyncMap cannot do anything about that
expect(stream.take()).rejects.toMatch(/timeout/i);
}
);

test.each([
// prettier-ignore
["sync", () => { throw new Error("another error") }],
// prettier-ignore
["async", async () => { throw new Error("another error") }],
])("[%s] catchFn can map one error to another error", async (_, catchFn) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.error(new Error("expected"));
// will be ignored by Observable since the observer already closed
observer.next(4);
}, 10);
});
const mapped = asyncMap(observable, (n) => n * 2, catchFn);
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeError()).resolves.toEqual(
new Error("another error")
);
// even after recovery, further `.next` inside the observer will be ignored
// by the Observable itself, so asyncMap cannot do anything about that
expect(stream.take()).rejects.toMatch(/timeout/i);
});
});

0 comments on commit 327a2ab

Please sign in to comment.