diff --git a/src/consumer.ts b/src/consumer.ts index 3476b61..bc69672 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -5,7 +5,7 @@ import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_pol import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" -export type ConsumerFunc = (message: Message) => Promise | void +export type ConsumerFunc = (message: Message, consumer: StreamConsumer) => Promise | void export type ConsumerUpdateListener = (consumerRef: string, streamName: string) => Promise export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => { return `${consumerId}@${connectionId}` @@ -131,7 +131,7 @@ export class StreamConsumer implements Consumer { public async handle(message: Message) { if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return - await this.consumerHandle(message) + await this.consumerHandle(message, this) this.maybeUpdateLocalOffset(message) }