Skip to content

Commit

Permalink
feat(rdkafka): delay correction in consumer-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
llevkin committed Aug 24, 2023
1 parent 333550d commit 6dd8d34
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions packages/@biorate/rdkafka/src/connections/consumer-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ export class RDKafkaConsumerStreamConnection
#handle = async () => {
let messages: Message[] = [];
let time: () => number;
let diff = 0;
const latest = new Map<number, Message>();
const counter = new Map<string, number>();
const tasks = [];
while (true) {
try {
await timer.wait(this.delay);
await timer.wait(Math.max(this.delay - diff, 0));
if (!this.started) continue;
if (!this.pool.length) continue;
time = timeDiff();
Expand All @@ -113,7 +114,8 @@ export class RDKafkaConsumerStreamConnection
this.stream.consumer.commitMessage(message);
this.emit(EventsConsumerStream.LatestMessage, message);
}
this.#setMetrics(counter, 200, time());
diff = time();
this.#setMetrics(counter, 200, diff);
} catch (e) {
counter.clear();
for (const message of messages)
Expand Down

0 comments on commit 6dd8d34

Please sign in to comment.