Ensure that all raced promises are resolved after the race #151

Merged 5 commits on Nov 13, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,10 +1,12 @@

# confluent-kafka-javascript v0.5.0

v0.5.0 is a limited availability feature release. It is supported for all usage.

## Enhancements

1. Add support for an Admin API to delete records (#141).
+2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151).


# confluent-kafka-javascript v0.4.0
5 changes: 5 additions & 0 deletions lib/kafkajs/_common.js
@@ -669,6 +669,11 @@ class Timer {
* or the passed promise resolves, when it's passed, clearing the timeout
* in any case.
*
+* WARNING: avoid calling `withTimeout` with the same promise more than
+* once, as each `Promise.race` adds more callbacks to it, creating a
+* memory leak if the promise is never resolved, or not resolved soon
+* enough.
+*
* @param {number} timeoutMs The timeout in milliseconds.
* @param {Promise|undefined} promise The promise to wait for,
* alternatively to the timeout, or `undefined` to just wait for the timeout.
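The hazard that warning describes can be reproduced with a small stand-alone sketch. The `withTimeout` below is a hypothetical stand-in for the library's helper, not its actual implementation: each call runs a `Promise.race`, and every race attaches one more reaction to the promise it is handed. Reactions on a promise that never settles are never released, so racing the same promise repeatedly accumulates them indefinitely:

```javascript
// Hypothetical withTimeout with the shape the doc comment describes:
// resolves when either the timeout fires or the given promise settles.
function withTimeout(timeoutMs, promise) {
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(resolve, timeoutMs);
  });
  const racers = promise !== undefined ? [promise, timeout] : [timeout];
  return Promise.race(racers).finally(() => clearTimeout(timer));
}

async function main() {
  const neverResolves = new Promise(() => {});
  for (let i = 0; i < 3; i++) {
    // Anti-pattern: racing the SAME promise each iteration. Every call
    // leaves one more pending reaction attached to `neverResolves`,
    // and none of them can be reclaimed because it never settles.
    await withTimeout(5, neverResolves);
  }
  console.log('raced the same promise 3 times');
}

main();
```

The PR avoids this in the consumer by creating a fresh `DeferredPromise` for each wait instead of reusing one shared promise across races.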
37 changes: 21 additions & 16 deletions lib/kafkajs/_consumer.js
@@ -23,6 +23,7 @@ const {
const { Buffer } = require('buffer');
const MessageCache = require('./_consumer_cache');
const { hrtime } = require('process');
+const { LinkedList } = require('./_linked-list');

const ConsumerState = Object.freeze({
INIT: 0,
@@ -170,9 +171,9 @@ class Consumer {
#fetchInProgress;

/**
-* Promise that resolves when there is something we need to poll for (messages, rebalance, etc).
+* List of DeferredPromises waiting on consumer queue to be non-empty.
*/
-#queueNonEmpty = new DeferredPromise();
+#queueWaiters = new LinkedList();

/**
* Whether any rebalance callback is in progress.
@@ -1270,8 +1271,9 @@
}

#queueNonEmptyCb() {
-/* Unconditionally resolve the promise - not a problem if it's already resolved. */
-this.#queueNonEmpty.resolve();
+for (const waiter of this.#queueWaiters) {
+waiter.resolve();
+}
}

async #nextFetchRetry() {
@@ -1280,15 +1282,21 @@
} else {
/* Backoff a little. If m is null, we might be without messages
* or in available partition starvation, and calling consumeSingleCached
-* in a tight loop will help no one. We still keep it to 1000ms because we
-* want to keep polling, though (ideally) we could increase it all the way
-* up to max.poll.interval.ms.
+* in a tight loop will help no one.
* In case there is any message in the queue, we'll be woken up before the
-* timer expires. */
-await Timer.withTimeout(1000, this.#queueNonEmpty);
-if (this.#queueNonEmpty.resolved) {
-this.#queueNonEmpty = new DeferredPromise();
-}
+* timer expires.
+* We have a per-worker promise, otherwise we end up awakening
+* other workers when they've already looped and just restarted awaiting.
+* The `Promise` passed to `Timer.withTimeout` cannot be reused
+* in the next call to this method, to avoid memory leaks caused
+* by `Promise.race`. */
+const waiter = new DeferredPromise();
+const waiterNode = this.#queueWaiters.addLast(waiter);
+await Timer.withTimeout(1000, waiter);
+
+/* Resolve the "extra" promise that has been spawned when creating the timer. */
+waiter.resolve();
+this.#queueWaiters.remove(waiterNode);
}
}

@@ -1374,10 +1382,7 @@
let interval = Number(cacheExpiration - now) / 1e6;
if (interval < 100)
interval = 100;
-const promises = Promise.race([this.#workerTerminationScheduled,
-this.#maxPollIntervalRestart]);
-await Timer.withTimeout(interval,
-promises);
+await Timer.withTimeout(interval, this.#maxPollIntervalRestart);
if (this.#maxPollIntervalRestart.resolved)
this.#maxPollIntervalRestart = new DeferredPromise();
}
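To make the new waiter pattern concrete, here is a minimal sketch of the moving parts. `DeferredPromise`, `queueWaiters`, and `waitForQueue` are illustrative stand-ins, not the library's code: the consumer has its own `DeferredPromise` and a `LinkedList` where a `Set` suffices below, and it races through `Timer.withTimeout` rather than a bare `Promise.race`:

```javascript
// Minimal DeferredPromise: a promise whose resolve() is callable externally.
class DeferredPromise {
  constructor() {
    this.resolved = false;
    this.promise = new Promise((resolve) => {
      this._resolve = resolve;
    });
  }
  resolve(value) {
    this.resolved = true;
    this._resolve(value);
  }
}

// Stand-in for the consumer's #queueWaiters LinkedList.
const queueWaiters = new Set();

// Mirrors #queueNonEmptyCb: wake every parked worker.
function queueNonEmptyCb() {
  for (const waiter of queueWaiters) waiter.resolve();
}

// Mirrors the #nextFetchRetry backoff: each call creates its OWN promise,
// so no long-lived promise ever accumulates Promise.race reactions, and
// workers that already looped back to waiting are not spuriously woken.
async function waitForQueue(timeoutMs) {
  const waiter = new DeferredPromise();
  queueWaiters.add(waiter);
  let timer;
  const timeout = new Promise((resolve) => { timer = setTimeout(resolve, timeoutMs); });
  await Promise.race([waiter.promise, timeout]);
  clearTimeout(timer);
  // Resolve unconditionally so the race's losing reaction can be collected,
  // then drop the waiter from the list.
  waiter.resolve();
  queueWaiters.delete(waiter);
}
```

Because each `waitForQueue` call races a fresh promise that is resolved and discarded on exit, `queueNonEmptyCb` can fire any number of times without piling callbacks onto a shared promise.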
4 changes: 3 additions & 1 deletion test/promisified/consumer/consumeMessages.spec.js
@@ -520,8 +520,10 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit

let calls = 0;
let failedSeek = false;
+let eachMessageStarted = false;
consumer.run({
eachMessage: async ({ message }) => {
+eachMessageStarted = true;
/* Take a long time to process the message. */
await sleep(7000);
try {
@@ -540,7 +542,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit

/* Waiting for assignment and then a bit more means that the first eachMessage starts running. */
await waitFor(() => consumer.assignment().length > 0, () => { }, { delay: 50 });
-await sleep(200);
+await waitFor(() => eachMessageStarted, () => { }, { delay: 50 });
await consumer.disconnect();

/* Even without explicitly waiting for it, a pending call to eachMessage must complete before disconnect does. */
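The test fix replaces a fixed `sleep(200)` with polling an explicit flag, removing the race between an arbitrary sleep duration and the first `eachMessage` invocation. A minimal `waitFor` matching the call shape used in the test (hypothetical; the suite provides its own helper) could look like:

```javascript
// Poll `condition` every `delay` ms until it returns true, failing after
// `maxWait` ms instead of sleeping a fixed amount and hoping for the best.
async function waitFor(condition, onTimeout, { delay = 50, maxWait = 10000 } = {}) {
  const deadline = Date.now() + maxWait;
  while (!condition()) {
    if (Date.now() > deadline) {
      onTimeout();
      throw new Error('waitFor: condition not met within ' + maxWait + ' ms');
    }
    await new Promise((resolve) => setTimeout(resolve, delay));
  }
}

// Usage mirroring the test: park until the first eachMessage has started.
let eachMessageStarted = false;
setTimeout(() => { eachMessageStarted = true; }, 30);
waitFor(() => eachMessageStarted, () => { }, { delay: 10 })
  .then(() => console.log('eachMessage started'));
```

Polling a condition keeps the test fast when the event is quick and robust when it is slow, which a hard-coded 200 ms sleep cannot do.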