Skip to content

Commit

Permalink
fix: handle ending acks on quit
Browse files Browse the repository at this point in the history
  • Loading branch information
calebboyd committed Jan 6, 2023
1 parent 895424b commit 20cacbc
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 15 deletions.
20 changes: 6 additions & 14 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ export class RedisStream {
public async quit(): Promise<void> {
if (!this.done) {
this.done = true
if (this.pendingAcks.size || this.readerId) {
const pipeline = (this.control ? this.control : this.client).pipeline()
this.pendingAcks.size && ack(pipeline, this)
this.readerId && pipeline.client('UNBLOCK', this.readerId)
await pipeline.exec()
}
if (!(this.createdConnection || this.createdControlConnection)) return
await Promise.all([
this.createdConnection && new Promise((resolve) => this.client.once('end', resolve)),
Expand Down Expand Up @@ -262,20 +268,6 @@ export class RedisStream {
await this.maybeUnblock()
}

/**
* Immediately stop processing entries
*/
public async end() {
if (this.control && this.readerId) {
const pipeline = this.control.pipeline()
ack(pipeline, this)
pipeline.client('UNBLOCK', this.readerId)
await Promise.all([pipeline.exec(), this.quit()])
} else {
await this.quit()
}
}

protected async return(): Promise<void> {
await this.quit()
}
Expand Down
2 changes: 1 addition & 1 deletion src/xread.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe('redis-x-stream xread', () => {
if (i === testEntries.length * 2 - 1) {
setTimeout(() => {
i++
stream.end() //break;
stream.quit() //break;
}, 100)
}
}
Expand Down

0 comments on commit 20cacbc

Please sign in to comment.