From f3ad946b00a36d599c6ff625530685b81c885802 Mon Sep 17 00:00:00 2001 From: Mike Tunnicliffe Date: Wed, 4 Dec 2024 16:26:22 +0000 Subject: [PATCH] Pass stream consumer to message handler function --- src/consumer.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/consumer.ts b/src/consumer.ts index 3476b61..bc69672 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -5,7 +5,7 @@ import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_pol import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" -export type ConsumerFunc = (message: Message) => Promise | void +export type ConsumerFunc = (message: Message, consumer: StreamConsumer) => Promise | void export type ConsumerUpdateListener = (consumerRef: string, streamName: string) => Promise export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => { return `${consumerId}@${connectionId}` @@ -131,7 +131,7 @@ export class StreamConsumer implements Consumer { public async handle(message: Message) { if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return - await this.consumerHandle(message) + await this.consumerHandle(message, this) this.maybeUpdateLocalOffset(message) }