Skip to content

Commit

Permalink
feat: add flush for consuming stream outside of blocked mode
Browse files Browse the repository at this point in the history
  • Loading branch information
calebboyd committed Jan 6, 2023
1 parent 207fe32 commit a89c7f2
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,21 @@ export class RedisStream {
await this.maybeUnblock()
}

public async flush(client?: RedisClient) {
if (!this.pendingAcks.size) return
let c = client
if (!this.done) {
c = c ?? this.control ? this.control : this.client
}
if (this.done && !this.createdConnection) {
c = c ?? this.client
}
if (!c) throw new Error('No suitable client')
const pipeline = c.pipeline()
ack(pipeline, this)
await pipeline.exec()
}

protected async return(): Promise<void> {
await this.quit()
}
Expand Down

0 comments on commit a89c7f2

Please sign in to comment.