Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions example/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions example/src/offset_tracking_receive.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const rabbit = require("rabbitmq-stream-js-client")

const sleep = (ms) => new Promise((r) => setTimeout(r, ms))

async function main() {
console.log("Connecting...")
const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
})

console.log("Making sure the stream exists...")
const streamName = "stream-offset-tracking-javascript"
await client.createStream({ stream: streamName, arguments: {} })

const consumerRef = "offset-tracking-tutorial"
let firstOffset = undefined
let offsetSpecification = rabbit.Offset.first()
try {
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName })
offsetSpecification = rabbit.Offset.offset(offset + 1n)
} catch (e) {}

let lastOffset = offsetSpecification.value
let messageCount = 0
const consumer = await client.declareConsumer(
{ stream: streamName, offset: offsetSpecification, consumerRef },
async (message) => {
messageCount++
if (!firstOffset && messageCount === 1) {
firstOffset = message.offset
console.log("First message received")
}
if (messageCount % 10 === 0) {
await consumer.storeOffset(message.offset)
}
if (message.content.toString() === "marker") {
console.log("Marker found")
lastOffset = message.offset
await consumer.storeOffset(message.offset)
await consumer.close()
}
}
)

console.log(`Start consuming...`)
await sleep(2000)
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`)
}

main()
.then(() => process.exit(0))
.catch((res) => {
console.log("Error while receiving message!", res)
process.exit(-1)
})
36 changes: 36 additions & 0 deletions example/src/offset_tracking_send.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const rabbit = require("rabbitmq-stream-js-client")

async function main() {
console.log("Connecting...")
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "localhost",
username: "rabbit",
password: "rabbit",
})

console.log("Making sure the stream exists...")
const streamName = "stream-offset-tracking-javascript"
await client.createStream({ stream: streamName, arguments: {} })

console.log("Creating the publisher...")
const publisher = await client.declarePublisher({ stream: streamName })

const messageCount = 100
console.log(`Publishing ${messageCount} messages`)
for (let i = 0; i < messageCount; i++) {
const body = i === messageCount - 1 ? "marker" : `hello ${i}`
await publisher.send(Buffer.from(body))
}

console.log("Closing the connection...")
await client.close()
}

main()
.then(() => console.log("done!"))
.catch((res) => {
console.log("Error in publishing message!", res)
process.exit(-1)
})
45 changes: 18 additions & 27 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ export class StreamConsumer implements Consumer {
public offset: Offset
private clientLocalOffset: Offset
private creditsHandler: ConsumerCreditPolicy
readonly handle: ConsumerFunc
private consumerHandle: ConsumerFunc
private closed: boolean

constructor(
handle: ConsumerFunc,
Expand All @@ -50,10 +51,12 @@ export class StreamConsumer implements Consumer {
this.clientLocalOffset = this.offset.clone()
this.connection.incrRefCount()
this.creditsHandler = params.creditPolicy || defaultCreditPolicy
this.handle = this.wrapHandle(handle, params.offset)
this.consumerHandle = handle
this.closed = false
}

async close(manuallyClose: boolean): Promise<void> {
this.closed = true
this.connection.decrRefCount()
if (ConnectionPool.removeIfUnused(this.connection)) {
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
Expand All @@ -79,31 +82,10 @@ export class StreamConsumer implements Consumer {
return this.clientLocalOffset.clone()
}

private wrapHandle(handle: ConsumerFunc, offset: Offset) {
const updateLocalOffsetHandle = this.updateLocalOffsetHandle(handle)
return this.addOffsetFilterToHandle(updateLocalOffsetHandle, offset)
}

private updateLocalOffsetHandle(handle: ConsumerFunc) {
const wrapped = (message: Message) => {
const result = handle(message)
if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset)
return result
}
return wrapped
}

private addOffsetFilterToHandle(handle: ConsumerFunc, offset: Offset) {
if (offset.type === "numeric") {
const handlerWithFilter = (message: Message) => {
if (message.offset !== undefined && message.offset < offset.value!) {
return
}
handle(message)
}
return handlerWithFilter
}
return handle
public handle(message: Message) {
if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return
this.consumerHandle(message)
this.maybeUpdateLocalOffset(message)
}

public get streamName(): string {
Expand All @@ -117,4 +99,13 @@ export class StreamConsumer implements Consumer {
public get creditPolicy() {
return this.creditsHandler
}

private maybeUpdateLocalOffset(message: Message) {
if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset)
}

// TODO -- Find better name?
private isMessageOffsetLessThanConsumers(message: Message) {
return this.offset.type === "numeric" && message.offset !== undefined && message.offset < this.offset.value!
}
}