From a89c7f2a5c909e00a8ee0a0aa32c18398978ded5 Mon Sep 17 00:00:00 2001 From: calebboyd Date: Fri, 6 Jan 2023 17:00:02 -0600 Subject: [PATCH] feat: add flush for consuming stream outside of blocked mode --- src/stream.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/stream.ts b/src/stream.ts index 808e2ea..29c6222 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -268,6 +268,21 @@ export class RedisStream { await this.maybeUnblock() } + public async flush(client?: RedisClient) { + if (!this.pendingAcks.size) return + let c = client + if (!this.done) { + c = c ?? this.control ? this.control : this.client + } + if (this.done && !this.createdConnection) { + c = c ?? this.client + } + if (!c) throw new Error('No suitable client') + const pipeline = c.pipeline() + ack(pipeline, this) + await pipeline.exec() + } + protected async return(): Promise { await this.quit() }