Skip to content

Commit

Permalink
fix: tweak internal types
Browse files Browse the repository at this point in the history
  • Loading branch information
calebboyd committed Nov 15, 2022
1 parent 19d8580 commit ce61576
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 2 additions & 0 deletions src/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ function xreadgroup(
if (noack) args.push('NOACK')
if (isNumber(block)) args.push('BLOCK', block.toString())
debug(`xreadgroup ${args.join(' ')}`)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
// https://github.com/luin/ioredis/pull/1676#issue-1437398115
;(client as any)[buffers ? 'xreadgroupBuffer' : 'xreadgroup'](
...args,
'STREAMS',
Expand Down
19 changes: 13 additions & 6 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import {
Mode,
modes,
env,
StreamEntry,
} from './types.js'

export { RedisStreamOptions, Mode }

type ResolvedForCaller = any

export class RedisStream<T extends Mode = 'entry'> {
//static factoryFor() { //create factory that extends options }
/**
Expand Down Expand Up @@ -50,9 +53,9 @@ export class RedisStream<T extends Mode = 'entry'> {

private itr = {
name: '',
prev: undefined as any,
entry: undefined as any,
stream: undefined as any,
prev: undefined as StreamEntry | undefined,
entry: null as IterableIterator<StreamEntry> | null,
stream: null as IterableIterator<XStreamResult> | null | undefined,
}

/**
Expand Down Expand Up @@ -147,14 +150,17 @@ export class RedisStream<T extends Mode = 'entry'> {
else {
this.streams.set(itr.name, this.group ? '>' : result.value[0].toString())
if (this.ackOnIterate) itr.prev = result.value
yield [itr.name, result.value] as any
const ret: XEntryResult = [itr.name, result.value]
yield ret as ResolvedForCaller
}
}
}

private moveCursors() {
if (this.first) {
this.streams.forEach((v, k) => this.streams.set(k, '>'))
for (const [stream] of this.streams) {
this.streams.set(stream, '>')
}
this.first = false
}
}
Expand All @@ -176,13 +182,14 @@ export class RedisStream<T extends Mode = 'entry'> {
}
}

public ack(stream: string, ...ids: string[]): void {
public ack(stream: string, ...ids: string[]): undefined {
if (!this.group) {
throw new Error('Cannot ack entries read outside of a consumer group')
}
const acks = this.pendingAcks.get(stream) || []
acks.push(...ids)
this.pendingAcks.set(stream, acks)
return
}

protected async return(): Promise<void> {
Expand Down

0 comments on commit ce61576

Please sign in to comment.