Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When the consumer read timeout, try to read after the value of timeout #769

Merged
merged 4 commits into from
Jul 26, 2020
Merged

When the consumer read timeout, try to read after the value of timeout #769

merged 4 commits into from
Jul 26, 2020

Conversation

yunnysunny
Copy link
Contributor

The pull request is aimed to resolve the issue of #709 .

@yunnysunny yunnysunny closed this Mar 31, 2020
@yunnysunny yunnysunny reopened this Mar 31, 2020
@stale
Copy link

stale bot commented Jul 1, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

@stale stale bot added the stale Stale issues label Jul 1, 2020
@giulliano-bueno
Copy link

This PR seems to be good enough to be merged.

@stale stale bot removed the stale Stale issues label Jul 2, 2020
@iradul
Copy link
Collaborator

iradul commented Jul 20, 2020

I think default value for this timeout is 1000 so this can be a breaking change for some. It would be much better to have a separate timeout field just for handling this scenario.

@yunnysunny
Copy link
Contributor Author

Add a field of m_retry_read_ms to set the delay time before next read operation.

@@ -17,7 +17,8 @@ var KafkaConsumerStream = require('./kafka-consumer-stream');
var LibrdKafkaError = require('./error');
var TopicPartition = require('./topic-partition');
var shallowCopy = require('./util').shallowCopy;

var DEFAULT_RETRY_READ_INTERVAL = 500;
Copy link
Collaborator

@iradul iradul Jul 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call this DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY

* Set the default value of waiting for the next time to read when current reading get none data.
* @param {number} intervalMs - number of milliseconds to sleep before the next reading
*/
KafkaConsumer.prototype.setDefaultRetryReadInterval = function(intervalMs) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename method name to setDefaultConsumeLoopTimeoutDelay

Comment on lines 140 to 143
/**
* Set the default value of waiting for the next time to read when current reading get none data.
* @param {number} intervalMs - number of milliseconds to sleep before the next reading
*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe reword like this

/**
 * Set the default sleep delay for the next consume loop after the previous one has timed out.
 * @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
 */

* @param {number} intervalMs - number of milliseconds to sleep before the next reading
*/
KafkaConsumer.prototype.setDefaultRetryReadInterval = function(intervalMs) {
this._retryReadInterval = Number(intervalMs) || DEFAULT_RETRY_READ_INTERVAL;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename _retryReadInterval to _consumeLoopTimeoutDelay

src/workers.cc Outdated
@@ -420,10 +420,12 @@ void KafkaConsumerDisconnect::HandleErrorCallback() {

KafkaConsumerConsumeLoop::KafkaConsumerConsumeLoop(Nan::Callback *callback,
KafkaConsumer* consumer,
const int & timeout_ms) :
const int & timeout_ms,
const int & retry_interval_ms) :
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename to timeout_sleep_delay_ms

src/workers.h Outdated
@@ -288,6 +288,7 @@ class KafkaConsumerConsumeLoop : public MessageWorker {
NodeKafka::KafkaConsumer * consumer;
const int m_timeout_ms;
unsigned int m_rand_seed;
const int m_retry_read_ms;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename to m_timeout_sleep_delay_ms

* @param {number} intervalMs - number of milliseconds to sleep before the next reading
*/
KafkaConsumer.prototype.setDefaultRetryReadInterval = function(intervalMs) {
this._retryReadInterval = Number(intervalMs) || DEFAULT_RETRY_READ_INTERVAL;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I think you should not wrap interval within Number(..) because validation is done in c++ code

}

/**
* Set the default consume timeout provided to c++land
* @param {number} timeoutMs - number of milliseconds to wait for a message to be fetched
*/
KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
this._consumeTimeout = timeoutMs;
this._consumeTimeout = Number(timeoutMs) || DEFAULT_CONSUME_TIME_OUT;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing 0 here will not work, also you should not wrap interval within Number(..) because:
a) validation is done in c++ code
b) it's kind of breaking change because calling setDefaultConsumeTimeout('anything but number') will suddenly start working

Maybe something like this._consumeTimeout = timeoutMs === 0 ? timeoutMs : timeoutMs || DEFAULT_CONSUME_TIME_OUT; would be safer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set the value of this._consumeTimeout to timeoutMs directly.

@@ -124,7 +125,8 @@ function KafkaConsumer(conf, topicConf) {
this.globalConfig = conf;
this.topicConfig = topicConf;

this._consumeTimeout = 1000;
this._consumeTimeout = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you should use DEFAULT_CONSUME_TIME_OUT instead of DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY

@@ -369,7 +379,7 @@ KafkaConsumer.prototype.unsubscribe = function() {
* is fetched.
*/
KafkaConsumer.prototype.consume = function(number, cb) {
var timeoutMs = this._consumeTimeout || 1000;
var timeoutMs = this._consumeTimeout || DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you should use DEFAULT_CONSUME_TIME_OUT instead of DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY

Comment on lines 1052 to 1054
if (!info[0]->IsNumber()) {
return Nan::ThrowError("Need to specify a timeout");
if (!info[1]->IsNumber()) {
return Nan::ThrowError("Need to specify a sleep delay");
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need both info[0] and info[1]:

  if (!info[0]->IsNumber()) {
    return Nan::ThrowError("Need to specify a timeout");
  }

  if (!info[1]->IsNumber()) {
    return Nan::ThrowError("Need to specify a sleep delay");
  }

@iradul
Copy link
Collaborator

iradul commented Jul 26, 2020

Thanks!

@iradul iradul merged commit c66b741 into Blizzard:master Jul 26, 2020
@yunnysunny
Copy link
Contributor Author

I'm sorry for taking some mistakes. I wrote the code during my overtime work when my head was not clear.

@iradul
Copy link
Collaborator

iradul commented Jul 27, 2020

No worries, thanks again for PR!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants