From f3dd693a953520d46b574ee0d88030526dfa93fe Mon Sep 17 00:00:00 2001 From: magne Date: Thu, 22 Aug 2024 17:18:48 +0200 Subject: [PATCH 1/2] feat: ignore local offset update if consumer closed --- example/package-lock.json | 40 ++++++++--------- example/src/offset_tracking_receive.js | 59 ++++++++++++++++++++++++++ example/src/offset_tracking_send.js | 36 ++++++++++++++++ src/consumer.ts | 12 +++++- 4 files changed, 126 insertions(+), 21 deletions(-) create mode 100644 example/src/offset_tracking_receive.js create mode 100644 example/src/offset_tracking_send.js diff --git a/example/package-lock.json b/example/package-lock.json index fb3437cb..312360e2 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -16,43 +16,43 @@ "typescript": "^4.9.5" }, "engines": { - "node": "16.x.x" + "node": "20.x.x" } }, " ..": { "extraneous": true }, "..": { - "version": "0.3.1", + "version": "0.4.1", "license": "ISC", "dependencies": { "semver": "^7.5.4" }, "devDependencies": { - "@tsconfig/node16": "^1.0.3", + "@tsconfig/node-lts": "^20.1.1", "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", "@types/chai-as-promised": "^7.1.8", "@types/chai-spies": "^1.0.6", "@types/mocha": "^10.0.1", - "@types/node": "^16.18.11", - "@typescript-eslint/eslint-plugin": "^5.50.0", - "@typescript-eslint/parser": "^5.50.0", + "@types/node": "^20.11.5", + "@typescript-eslint/eslint-plugin": "^6.19.0", + "@typescript-eslint/parser": "^6.19.0", "amqplib": "^0.10.3", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", "chai-spies": "^1.1.0", - "cspell": "^6.21.0", + "cspell": "^7.3.9", "eslint": "^8.33.0", - "eslint-config-prettier": "^8.6.0", - "eslint-plugin-deprecation": "^1.3.3", + "eslint-config-prettier": "^9.1.0", + "eslint-plugin-deprecation": "^2.0.0", "eslint-plugin-import": "^2.27.5", "eslint-plugin-no-only-tests": "^3.1.0", - "eslint-plugin-prettier": "^4.2.1", + "eslint-plugin-prettier": "^5.1.3", "got": "^11.8.5", "mocha": "^10.2.0", "ts-node": "^10.9.1", - "typescript": "^4.9.5", + "typescript": "^5.3.3", "winston": "^3.8.2" } }, @@ -245,31 +245,31 @@ "rabbitmq-stream-js-client": { "version": "file:..", "requires": { - "@tsconfig/node16": "^1.0.3", + "@tsconfig/node-lts": "^20.1.1", "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", "@types/chai-as-promised": "^7.1.8", "@types/chai-spies": "^1.0.6", "@types/mocha": "^10.0.1", - "@types/node": "^16.18.11", - "@typescript-eslint/eslint-plugin": "^5.50.0", - "@typescript-eslint/parser": "^5.50.0", + "@types/node": "^20.11.5", + "@typescript-eslint/eslint-plugin": "^6.19.0", + "@typescript-eslint/parser": "^6.19.0", "amqplib": "^0.10.3", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", "chai-spies": "^1.1.0", - "cspell": "^6.21.0", + "cspell": "^7.3.9", "eslint": "^8.33.0", - "eslint-config-prettier": "^8.6.0", - "eslint-plugin-deprecation": "^1.3.3", + "eslint-config-prettier": "^9.1.0", + "eslint-plugin-deprecation": "^2.0.0", "eslint-plugin-import": "^2.27.5", "eslint-plugin-no-only-tests": "^3.1.0", - "eslint-plugin-prettier": "^4.2.1", + "eslint-plugin-prettier": "^5.1.3", "got": "^11.8.5", "mocha": "^10.2.0", "semver": "^7.5.4", "ts-node": "^10.9.1", - "typescript": "^4.9.5", + "typescript": "^5.3.3", "winston": "^3.8.2" } }, diff --git a/example/src/offset_tracking_receive.js b/example/src/offset_tracking_receive.js new file mode 100644 index 00000000..faddabd4 --- /dev/null +++ b/example/src/offset_tracking_receive.js @@ -0,0 +1,59 @@ +const rabbit = require("rabbitmq-stream-js-client") + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) + +async function main() { + console.log("Connecting...") + const client = await rabbit.connect({ + hostname: "localhost", + port: 5552, + username: "rabbit", + password: "rabbit", + vhost: "/", + }) + + console.log("Making sure the stream exists...") + const streamName = "stream-offset-tracking-javascript" + await client.createStream({ stream: streamName, arguments: {} }) + + const consumerRef = "offset-tracking-tutorial" + let firstOffset = undefined + let offsetSpecification = rabbit.Offset.first() + try { + const offset = await client.queryOffset({ reference: consumerRef, stream: streamName }) + offsetSpecification = rabbit.Offset.offset(offset + 1n) + } catch (e) {} + + let lastOffset = offsetSpecification.value + let messageCount = 0 + const consumer = await client.declareConsumer( + { stream: streamName, offset: offsetSpecification, consumerRef }, + async (message) => { + messageCount++ + if (!firstOffset && messageCount === 1) { + firstOffset = message.offset + console.log("First message received") + } + if (messageCount % 10 === 0) { + await consumer.storeOffset(message.offset) + } + if (message.content.toString() === "marker") { + console.log("Marker found") + lastOffset = message.offset + await consumer.storeOffset(message.offset) + await consumer.close() + } + } + ) + + console.log(`Start consuming...`) + await sleep(2000) + console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`) +} + +main() + .then(() => process.exit(0)) + .catch((res) => { + console.log("Error while receiving message!", res) + process.exit(-1) + }) diff --git a/example/src/offset_tracking_send.js b/example/src/offset_tracking_send.js new file mode 100644 index 00000000..a50f070c --- /dev/null +++ b/example/src/offset_tracking_send.js @@ -0,0 +1,36 @@ +const rabbit = require("rabbitmq-stream-js-client") + +async function main() { + console.log("Connecting...") + const client = await rabbit.connect({ + vhost: "/", + port: 5552, + hostname: "localhost", + username: "rabbit", + password: "rabbit", + }) + + console.log("Making sure the stream exists...") + const streamName = "stream-offset-tracking-javascript" + await client.createStream({ stream: streamName, arguments: {} }) + + console.log("Creating the publisher...") + const publisher = await client.declarePublisher({ stream: streamName }) + + const messageCount = 100 + console.log(`Publishing ${messageCount} messages`) + for (let i = 0; i < messageCount; i++) { + const body = i === messageCount - 1 ? "marker" : `hello ${i}` + await publisher.send(Buffer.from(body)) + } + + console.log("Closing the connection...") + await client.close() +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("Error in publishing message!", res) + process.exit(-1) + }) diff --git a/src/consumer.ts b/src/consumer.ts index d2eb0e1e..b2343fd4 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -29,6 +29,7 @@ export class StreamConsumer implements Consumer { private clientLocalOffset: Offset private creditsHandler: ConsumerCreditPolicy readonly handle: ConsumerFunc + private closed: boolean constructor( handle: ConsumerFunc, @@ -51,9 +52,11 @@ export class StreamConsumer implements Consumer { this.connection.incrRefCount() this.creditsHandler = params.creditPolicy || defaultCreditPolicy this.handle = this.wrapHandle(handle, params.offset) + this.closed = false } async close(manuallyClose: boolean): Promise { + this.closed = true this.connection.decrRefCount() if (ConnectionPool.removeIfUnused(this.connection)) { await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) @@ -80,10 +83,17 @@ export class StreamConsumer implements Consumer { } private wrapHandle(handle: ConsumerFunc, offset: Offset) { - const updateLocalOffsetHandle = this.updateLocalOffsetHandle(handle) + const updateLocalOffsetHandle = this.updateLocalOffsetHandle(this.skipMessageIfClosed(handle)) return this.addOffsetFilterToHandle(updateLocalOffsetHandle, offset) } + private skipMessageIfClosed(handle: ConsumerFunc) { + return (message: Message) => { + if (this.closed) return + return handle(message) + } + } + private updateLocalOffsetHandle(handle: ConsumerFunc) { const wrapped = (message: Message) => { const result = handle(message) From a56b38f7f0d48b96e4a5159f7956c6c358af040b Mon Sep 17 00:00:00 2001 From: magne Date: Fri, 23 Aug 2024 15:46:17 +0200 Subject: [PATCH 2/2] chore: refactor message handling in consumer --- src/consumer.ts | 49 +++++++++++++++---------------------------------- 1 file changed, 15 insertions(+), 34 deletions(-) diff --git a/src/consumer.ts b/src/consumer.ts index b2343fd4..2385aa1b 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -28,7 +28,7 @@ export class StreamConsumer implements Consumer { public offset: Offset private clientLocalOffset: Offset private creditsHandler: ConsumerCreditPolicy - readonly handle: ConsumerFunc + private consumerHandle: ConsumerFunc private closed: boolean constructor( @@ -51,7 +51,7 @@ export class StreamConsumer implements Consumer { this.clientLocalOffset = this.offset.clone() this.connection.incrRefCount() this.creditsHandler = params.creditPolicy || defaultCreditPolicy - this.handle = this.wrapHandle(handle, params.offset) + this.consumerHandle = handle this.closed = false } @@ -82,38 +82,10 @@ export class StreamConsumer implements Consumer { return this.clientLocalOffset.clone() } - private wrapHandle(handle: ConsumerFunc, offset: Offset) { - const updateLocalOffsetHandle = this.updateLocalOffsetHandle(this.skipMessageIfClosed(handle)) - return this.addOffsetFilterToHandle(updateLocalOffsetHandle, offset) - } - - private skipMessageIfClosed(handle: ConsumerFunc) { - return (message: Message) => { - if (this.closed) return - return handle(message) - } - } - - private updateLocalOffsetHandle(handle: ConsumerFunc) { - const wrapped = (message: Message) => { - const result = handle(message) - if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset) - return result - } - return wrapped - } - - private addOffsetFilterToHandle(handle: ConsumerFunc, offset: Offset) { - if (offset.type === "numeric") { - const handlerWithFilter = (message: Message) => { - if (message.offset !== undefined && message.offset < offset.value!) { - return - } - handle(message) - } - return handlerWithFilter - } - return handle + public handle(message: Message) { + if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return + this.consumerHandle(message) + this.maybeUpdateLocalOffset(message) } public get streamName(): string { @@ -127,4 +99,13 @@ export class StreamConsumer implements Consumer { public get creditPolicy() { return this.creditsHandler } + + private maybeUpdateLocalOffset(message: Message) { + if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset) + } + + // TODO -- Find better name? + private isMessageOffsetLessThanConsumers(message: Message) { + return this.offset.type === "numeric" && message.offset !== undefined && message.offset < this.offset.value! + } }