Skip to content

Commit

Permalink
Merge pull request #77 from EventStore/awaiting-async-function-in-asy…
Browse files Browse the repository at this point in the history
…nc-iterator

Awaiting async function in async iterator
  • Loading branch information
hayley-jean committed Nov 19, 2020
2 parents b72b311 + 1a7762b commit 56fcf40
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 81 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module.exports = {
extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended"],
rules: {
"no-constant-condition": ["error", { checkLoops: false }],
// prettier decides
"no-unexpected-multiline": ["off"],
"@typescript-eslint/no-non-null-assertion": ["off"],
// better handled by ts itself
"@typescript-eslint/no-unused-vars": ["off"],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createTestNode, Defer, testEvents } from "../utils";
import { createTestNode, Defer, delay, testEvents } from "../utils";

import {
EventStoreConnection,
Expand Down Expand Up @@ -315,6 +315,45 @@ describe("connectToPersistentSubscription", () => {
1
);
});

test("ack with async function", async () => {
const STREAM_NAME = "async_iter_ack_fun";
const GROUP_NAME = "async_iter_ack_fun_group_name";
const doSomething = jest.fn();

await createPersistentSubscription(STREAM_NAME, GROUP_NAME)
.fromStart()
.execute(connection);

await writeEventsToStream(STREAM_NAME)
.send(...testEvents(99))
.send(finishEvent.build())
.execute(connection);

const subscription = await connectToPersistentSubscription(
STREAM_NAME,
GROUP_NAME
).execute(connection);

for await (const { event } of subscription) {
if (!event) continue;

if (event?.eventType === "test") {
// example of awaiting an async function when iterating over the async iterator
await delay(10);
}

doSomething(event);

subscription.ack(event.id);

if (event?.eventType === "finish-test") {
break;
}
}

expect(doSomething).toBeCalledTimes(100);
});
});

test("after the fact event listeners", async () => {
Expand Down
63 changes: 60 additions & 3 deletions src/__test__/streams/subscribeToAll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ describe("subscribeToAll", () => {

describe("should return a subscription", () => {
test("async iterator", async () => {
const STREAM_NAME = "async_iter";
const FINISH_TEST = "finish_async_iterator";
const MARKER_EVENT = "async_iter_marker";
const STREAM_NAME = "async_iter_sync_fun";
const FINISH_TEST = "finish_async_iterator_sync_fun";
const MARKER_EVENT = "async_iter_sync_fun_marker";
const doSomething = jest.fn();
const doSomethingElse = jest.fn();

Expand Down Expand Up @@ -273,6 +273,63 @@ describe("subscribeToAll", () => {
expect(doSomethingElse).toBeCalledTimes(9);
});

test("async iterator with async function", async () => {
const STREAM_NAME = "async_iter_async_fun";
const FINISH_TEST = "finish_async_iterator_async_fun";
const MARKER_EVENT = "async_iter_async_fun_marker";
const doSomething = jest.fn();
const doSomethingElse = jest.fn();

const markerEvent = EventData.json(MARKER_EVENT, {
message: "mark",
});

const finishEvent = EventData.json(FINISH_TEST, {
message: "lets wrap this up",
});

const writeResult = await writeEventsToStream(STREAM_NAME_B)
.send(markerEvent.build())
.execute(connection);

const subscription = await subscribeToAll()
.fromPosition(writeResult.position!)
.execute(connection);

writeEventsToStream(STREAM_NAME)
.send(...testEvents(99))
.send(finishEvent.build())
.execute(connection);

const readEvents = new Set<number>();

for await (const event of subscription) {
doSomething(event);

if (!event.event?.eventType.startsWith("$")) {
doSomethingElse(event);
}

if (event.event?.eventType === "test") {
// example of awaiting an async function when iterating over the async iterator
await delay(10);

if (event.event.isJson) {
readEvents.add(event.event.data.index as number);
}
}

if (event.event?.eventType === FINISH_TEST) {
break;
}
}

expect(doSomething).toBeCalled();
// unique numbers from 0 -> 98
expect(readEvents.size).toBe(99);
expect(doSomethingElse).toBeCalledTimes(100);
});

test("after the fact event listeners", async () => {
const STREAM_NAME = "after_the_fact";
const FINISH_TEST = "finish_after_the_fact";
Expand Down
8 changes: 7 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ export type Listeners<E, R> = {
[P in keyof SubscriptionListeners<E, R>]: Set<SubscriptionListeners<E, R>[P]>;
};

export interface Subscription<E, R> extends AsyncIterable<E> {
export interface Subscription<E, R> {
on<Name extends SubscriptionEvent>(
name: Name,
handler: SubscriptionListeners<E, R>[Name]
Expand All @@ -454,4 +454,10 @@ export interface Subscription<E, R> extends AsyncIterable<E> {
): Subscription<E, R>;

unsubscribe: () => void;

isPaused: boolean;
pause: () => void;
resume: () => void;

[Symbol.asyncIterator](): AsyncIterator<E>;
}
36 changes: 36 additions & 0 deletions src/utils/EventsOnlyStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Transform, TransformCallback, TransformOptions } from "stream";
import { GRPCReadResp, ConvertGrpcEvent } from "./convertGrpcEvent";

export type EventsOnlyStreamOptions = Omit<
TransformOptions,
"readableObjectMode" | "writableObjectMode" | "objectMode"
>;

export class EventsOnlyStream<E> extends Transform {
private convertGrpcEvent: ConvertGrpcEvent<E>;

constructor(
convertGrpcEvent: ConvertGrpcEvent<E>,
options: EventsOnlyStreamOptions = {}
) {
super({
...options,
readableObjectMode: true,
writableObjectMode: true,
});
this.convertGrpcEvent = convertGrpcEvent;
}

_transform(
resp: GRPCReadResp,
_encoding: string,
next: TransformCallback
): void {
if (resp.hasEvent()) {
const resolved = this.convertGrpcEvent(resp.getEvent()!);
return next(null, resolved);
}

next();
}
}
54 changes: 19 additions & 35 deletions src/utils/OneWaySubscription.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
/* eslint-disable @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/no-explicit-any */

import { ClientReadableStream, ServiceError } from "@grpc/grpc-js";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { ReadResp } from "../../generated/streams_pb";
Expand All @@ -11,6 +9,8 @@ import {
SubscriptionListeners,
} from "../types";
import { convertToCommandError } from "./CommandError";
import { ConvertGrpcEvent } from "./convertGrpcEvent";
import { EventsOnlyStream } from "./EventsOnlyStream";

export class OneWaySubscription<E>
implements Subscription<E, SubscriptionReport> {
Expand All @@ -22,18 +22,18 @@ export class OneWaySubscription<E>
close: new Set(),
};
private _stream: ClientReadableStream<ReadResp>;
private _running = true;
private _resolve?: (event: E | null) => void;
private _convertGrpcEvent: ConvertGrpcEvent<E>;

constructor(
stream: ClientReadableStream<ReadResp>,
listeners: Listeners<E, SubscriptionReport>,
convertGrpcEvent: (event: ReadResp.ReadEvent) => E
convertGrpcEvent: ConvertGrpcEvent<E>
) {
this._stream = stream;
this._listeners = listeners;
this._convertGrpcEvent = convertGrpcEvent;

stream.on("data", (resp: ReadResp) => {
this._stream.on("data", (resp: ReadResp) => {
if (resp.hasConfirmation()) {
this._listeners.confirmation.forEach((fn) => fn());
}
Expand All @@ -47,11 +47,11 @@ export class OneWaySubscription<E>
}
});

stream.on("end", () => {
this._stream.on("end", () => {
this._listeners.end.forEach((fn) => fn());
});

stream.on("error", (err: ServiceError) => {
this._stream.on("error", (err: ServiceError) => {
if (err.code === Status.CANCELLED) return;
const error = convertToCommandError(err);
this._listeners.error.forEach((fn) => fn(error));
Expand Down Expand Up @@ -89,40 +89,24 @@ export class OneWaySubscription<E>

public unsubscribe = (): void => {
this._stream.cancel();
this._running = false;

if (this._resolve) this._resolve(null);
};

public async return(): Promise<IteratorReturnResult<E>> {
this.unsubscribe();
return { done: true } as IteratorReturnResult<E>;
public get isPaused(): boolean {
return this._stream.isPaused();
}

public next = (): Promise<IteratorResult<E, never>> => {
return new Promise<IteratorResult<E, never>>((resolve, reject) => {
if (this._resolve) {
return reject(
new Error(
"Cannot iterate to the next while previous is still active."
)
);
}
if (!this._running) {
return resolve({ done: true } as IteratorReturnResult<never>);
}

this._resolve = (e) => {
resolve({ value: e } as IteratorYieldResult<E>);
delete this._resolve;
};
public pause = (): void => {
this._stream.pause();
};

this.once("event", this._resolve);
});
public resume = (): void => {
this._stream.resume();
};

/** Iterate the events asynchronously */
public [Symbol.asyncIterator] = (): OneWaySubscription<E> => {
return this;
public [Symbol.asyncIterator] = (): AsyncIterator<E> => {
return this._stream
.pipe(new EventsOnlyStream(this._convertGrpcEvent))
[Symbol.asyncIterator]();
};
}
45 changes: 13 additions & 32 deletions src/utils/TwoWaySubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from "../types";
import { convertGrpcEvent } from "./convertGrpcEvent";
import { convertToCommandError } from "./CommandError";
import { EventsOnlyStream } from "./EventsOnlyStream";

export class TwoWaySubscription
implements Subscription<ResolvedEvent, PersistentReport> {
Expand All @@ -27,8 +28,6 @@ export class TwoWaySubscription
close: new Set(),
};
private _stream: ClientDuplexStream<ReadReq, ReadResp>;
private _running = true;
private _resolve?: (event: ResolvedEvent | null) => void;

constructor(
stream: ClientDuplexStream<ReadReq, ReadResp>,
Expand Down Expand Up @@ -149,42 +148,24 @@ export class TwoWaySubscription
public unsubscribe = (): void => {
this._stream.end();
this._stream.cancel();
this._running = false;

if (this._resolve) this._resolve(null);
};

public async return(): Promise<IteratorReturnResult<never>> {
this.unsubscribe();
return { done: true } as IteratorReturnResult<never>;
public get isPaused(): boolean {
return this._stream.isPaused();
}

public next = (): Promise<IteratorResult<ResolvedEvent, never>> => {
return new Promise<IteratorResult<ResolvedEvent, never>>(
(resolve, reject) => {
if (this._resolve) {
return reject(
new Error(
"Cannot iterate to the next while previous is still active."
)
);
}
if (!this._running) {
return resolve({ done: true } as IteratorReturnResult<never>);
}

this._resolve = (e) => {
resolve({ value: e } as IteratorYieldResult<ResolvedEvent>);
delete this._resolve;
};

this.once("event", this._resolve);
}
);
public pause = (): void => {
this._stream.pause();
};

public resume = (): void => {
this._stream.resume();
};

/** Iterate the events asynchronously */
public [Symbol.asyncIterator] = (): TwoWaySubscription => {
return this;
public [Symbol.asyncIterator] = (): AsyncIterator<ResolvedEvent> => {
return this._stream
.pipe(new EventsOnlyStream(convertGrpcEvent))
[Symbol.asyncIterator]();
};
}

0 comments on commit 56fcf40

Please sign in to comment.