
Help: Error sending GroupCoordinatorRequest_v0 to node #739

Closed
williamsjj opened this issue Jun 24, 2016 · 18 comments

@williamsjj

Upgraded to kafka-python 1.2.2 running against a 3-node Kafka 0.10 cluster. We're receiving this error once on every subscribe action:

ERROR:kafka.coordinator:Error sending GroupCoordinatorRequest_v0 to node XXX [XXX]

The node specified appears to be random.

@dpkp
Owner

dpkp commented Jul 8, 2016

What is under those XXXs? In particular, the string in brackets should be the error type. What does it say?

dpkp changed the title from "Error sending GroupCoordinatorRequest_v0 to node" to "Help: Error sending GroupCoordinatorRequest_v0 to node" on Jul 10, 2016
@dpkp
Owner

dpkp commented Jul 12, 2016

I can't reproduce based on the information here. Please re-open if you have more!

dpkp closed this as completed Jul 12, 2016
@hicqu

hicqu commented Aug 19, 2016

Recently I have also had the same problem. I'm working with Kafka 0.9.0.1 and kafka-python 1.2.1.
Here is the full error message:
Error sending GroupCoordinatorRequest_v0 to node 1 [NodeNotReadyError: 1]
Could you help me?

@dpkp
Owner

dpkp commented Aug 19, 2016

This error is logged when there is a networking failure. There isn't enough information here for me to provide any further advice. You might try enabling debug logs (logging.basicConfig(level=logging.DEBUG)) and/or checking your broker logs and configuration.
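For reference, a minimal sketch of enabling that debug logging before the consumer is constructed (the 'kafka' logger namespace is kafka-python's own; the rest is illustrative):

import logging

# Sketch: enable DEBUG output for everything, including kafka-python's
# connection and coordinator modules.
logging.basicConfig(level=logging.DEBUG)

# Alternative: keep other libraries quieter and raise only kafka-python's
# loggers to DEBUG instead.
# logging.basicConfig(level=logging.INFO)
# logging.getLogger('kafka').setLevel(logging.DEBUG)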

@hicqu

hicqu commented Aug 25, 2016

@dpkp, I have resolved this problem by setting request_timeout_ms to 300000 instead of the default 40000. So I think the reason is that I fetch too many messages from the Kafka broker at one time, and by the time I have processed all of them and call consumer.poll() again, request_timeout_ms has already elapsed. What do you think? If you disagree or aren't sure, I will attach more information here to help you analyze it.
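For illustration, a minimal sketch of a consumer configured that way (the topic and broker names are placeholders, not from my setup):

from kafka import KafkaConsumer

# Sketch: raise request_timeout_ms well above the 40000 ms default so a long
# gap between poll() calls does not trip the request timeout.
consumer = KafkaConsumer(
    'my-topic',                          # placeholder topic
    bootstrap_servers=['broker:9092'],   # placeholder broker
    request_timeout_ms=300000,
)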

@hicqu

hicqu commented Aug 25, 2016

@dpkp, the same error occurred again, so I captured the kafka-python log at DEBUG level. The log file is attached below. Please take a look and help me out.
kafka.zip

@thrbowl

thrbowl commented Oct 26, 2016

+1, I got the same error.

@fillest

fillest commented Nov 15, 2016

I don't see how I can re-open this issue; there's nothing related to "re-open" in the GitHub UI.

I'm getting this error on localhost with a single broker and ZooKeeper, on a fresh topic populated with some messages, so it's clearly not a networking issue.
kafka-python==1.3.1, kafka 0.10.1.0
This is how I configure the consumer:

import contextlib
import cPickle
import logging

import kafka

# debug logging as used for the log below
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

topic_partition = kafka.TopicPartition('test', 0)
consumer = kafka.KafkaConsumer(
    bootstrap_servers = ['localhost'],
    client_id = 'kafka_cat',
    value_deserializer = cPickle.loads,
    auto_offset_reset = 'earliest',
    enable_auto_commit = False,
    consumer_timeout_ms = 1000 * 3,
    api_version = (0, 10),
)
with contextlib.closing(consumer):
    # manual partition assignment, then seek back to the start of the partition
    consumer.assign([topic_partition])
    consumer.seek_to_beginning()
    logger.info("committed offset %s, next record offset to be fetched %s", consumer.committed(topic_partition), consumer.position(topic_partition))
    for record in consumer:
        print record

Here is a fragment of debug log around the error message:

DEBUG:kafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 2, groups: 0)
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name records-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-latency
DEBUG:kafka.metrics.metrics:Added sensor with name records-lag
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name heartbeat-latency
DEBUG:kafka.metrics.metrics:Added sensor with name join-latency
DEBUG:kafka.metrics.metrics:Added sensor with name sync-latency
DEBUG:kafka.metrics.metrics:Added sensor with name commit-latency
DEBUG:kafka.consumer.group:Seeking to beginning of partition TopicPartition(topic='test', partition=0)
DEBUG:kafka.coordinator:Sending group coordinator request for group kafka-python-default-group to broker 0
DEBUG:kafka.client:Initiating connection to node 0 at 127.0.0.1:9092
DEBUG:kafka.metrics.metrics:Added sensor with name node-0.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-0.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-0.latency
DEBUG:kafka.conn:<BrokerConnection host=127.0.0.1/127.0.0.1 port=9092>: creating new socket
ERROR:kafka.coordinator:Error sending GroupCoordinatorRequest_v0 to node 0 [NodeNotReadyError: 0]
DEBUG:kafka.conn:<BrokerConnection host=127.0.0.1/127.0.0.1 port=9092>: established TCP connection
DEBUG:kafka.client:Node 0 connected
DEBUG:kafka.client:Sending metadata request MetadataRequest_v1(topics=['test']) to node 0
DEBUG:kafka.conn:<BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Request 1: MetadataRequest_v1(topics=['test'])
DEBUG:kafka.conn:<BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Response 1: MetadataResponse_v1(brokers=[(node_id=0, host=u'127.0.0.1', port=9092, rack=None)], controller_id=0, topics=[(error_code=0, topic=u'test', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])])])
DEBUG:kafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:kafka.coordinator:Sending group coordinator request for group kafka-python-default-group to broker 0
DEBUG:kafka.conn:<BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Request 2: GroupCoordinatorRequest_v0(consumer_group='kafka-python-default-group')
DEBUG:kafka.conn:<BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Response 2: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=0, host=u'127.0.0.1', port=9092)
DEBUG:kafka.coordinator:Received group coordinator response GroupCoordinatorResponse_v0(error_code=0, coordinator_id=0, host=u'127.0.0.1', port=9092)

dpkp reopened this Nov 15, 2016
@fillest

fillest commented Nov 18, 2016

I've just tried using group coordination without the additional calls, with the same result - I reduced the code to just passing a topic to KafkaConsumer and then iterating on it. So the code is basically the following (the environment is the same: kafka-python==1.3.1, a single localhost-only kafka 0.10.1.0 broker):

import contextlib
import cPickle

import kafka

consumer = kafka.KafkaConsumer(
    'test',
    bootstrap_servers = ['127.0.0.1'],
    client_id = 'kafka_cat',
    group_id = 'kafka_cat',
    value_deserializer = cPickle.loads,
    auto_offset_reset = 'earliest',
    enable_auto_commit = False,
    consumer_timeout_ms = 1000 * 2,
    api_version = (0, 10),
)
with contextlib.closing(consumer):
    # plain iteration; group coordination is handled by the consumer itself
    for record in consumer:
        print record

The head of the debug log (the error is set off with blank lines):

2016-11-18 17:33:38,376 DEBUG kafka.metrics.metrics MainThread  Added sensor with name connections-closed
2016-11-18 17:33:38,376 DEBUG kafka.metrics.metrics MainThread  Added sensor with name connections-created
2016-11-18 17:33:38,376 DEBUG kafka.metrics.metrics MainThread  Added sensor with name select-time
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name io-time
2016-11-18 17:33:38,377 DEBUG kafka.client MainThread  Attempting to bootstrap via node at 127.0.0.1:9092
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name bytes-sent-received
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name bytes-sent
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name bytes-received
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name request-latency
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name node-bootstrap.bytes-sent
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name node-bootstrap.bytes-received
2016-11-18 17:33:38,377 DEBUG kafka.metrics.metrics MainThread  Added sensor with name node-bootstrap.latency
2016-11-18 17:33:38,377 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092>: creating new socket
2016-11-18 17:33:38,378 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092>: established TCP connection
2016-11-18 17:33:38,378 DEBUG kafka.client MainThread  Node bootstrap connected
2016-11-18 17:33:38,378 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Request 1: MetadataRequest_v1(topics=NULL)
2016-11-18 17:33:38,379 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Response 1: MetadataResponse_v1(brokers=[(node_id=0, host=u'127.0.0.1', port=9092, rack=None)], controller_id=0, topics=[... cut ...])
2016-11-18 17:33:38,379 DEBUG kafka.cluster MainThread  Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 2, groups: 0)
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name bytes-fetched
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name records-fetched
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name fetch-latency
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name records-lag
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name fetch-throttle-time
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name heartbeat-latency
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name join-latency
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name sync-latency
2016-11-18 17:33:38,380 DEBUG kafka.metrics.metrics MainThread  Added sensor with name commit-latency
2016-11-18 17:33:38,380 INFO  kafka.consumer.subscription_state MainThread  Updating subscribed topics to: ('test',)
2016-11-18 17:33:38,380 DEBUG kafka.coordinator MainThread  Sending group coordinator request for group kafka_cat to broker 0
2016-11-18 17:33:38,380 DEBUG kafka.client MainThread  Initiating connection to node 0 at 127.0.0.1:9092
2016-11-18 17:33:38,381 DEBUG kafka.metrics.metrics MainThread  Added sensor with name node-0.bytes-sent
2016-11-18 17:33:38,381 DEBUG kafka.metrics.metrics MainThread  Added sensor with name node-0.bytes-received
2016-11-18 17:33:38,381 DEBUG kafka.metrics.metrics MainThread  Added sensor with name node-0.latency
2016-11-18 17:33:38,381 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092>: creating new socket

2016-11-18 17:33:38,386 ERROR kafka.coordinator MainThread  Error sending GroupCoordinatorRequest_v0 to node 0 [NodeNotReadyError: 0]

2016-11-18 17:33:38,386 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092>: established TCP connection
2016-11-18 17:33:38,386 DEBUG kafka.client MainThread  Node 0 connected
2016-11-18 17:33:38,480 DEBUG kafka.client MainThread  Sending metadata request MetadataRequest_v1(topics=['test']) to node 0
2016-11-18 17:33:38,481 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Request 1: MetadataRequest_v1(topics=['test'])
2016-11-18 17:33:38,482 DEBUG kafka.conn MainThread  <BrokerConnection host=127.0.0.1/127.0.0.1 port=9092> Response 1: MetadataResponse_v1(brokers=[(node_id=0, host=u'127.0.0.1', port=9092, rack=None)], controller_id=0, topics=[(error_code=0, topic=u'test', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])])])
2016-11-18 17:33:38,482 DEBUG kafka.cluster MainThread  Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2016-11-18 17:33:38,482 DEBUG kafka.coordinator MainThread  Sending group coordinator request for group kafka_cat to broker 0

@harelba
Contributor

harelba commented Feb 7, 2017

This happens to me as well, every time after I change the consumer-group id to a new one. E.g. it works well with consumer group id N, then I decide to change to consumer group N+1 (which doesn't exist yet), and the consumer fails as follows.

5564, 2017-02-07 14:02:26,706 - WARNING  <BrokerConnection host=localhost/::1 port=9092> timed out after 40000 ms. Closing connection.
5564, 2017-02-07 14:02:26,707 - WARNING  Node 0 connection failed -- refreshing metadata
5564, 2017-02-07 14:02:26,707 - ERROR    Error sending JoinGroupRequest_v0 to node 0 [[Error 7] RequestTimedOutError: Request timed out after 40000 ms]
5564, 2017-02-07 14:02:26,708 - WARNING  Marking the coordinator dead (node 0) for group assembled-studies-study-processor_5: [Error 7] RequestTimedOutError: Request timed out after 40000 ms.
5564, 2017-02-07 14:02:26,813 - ERROR    Error sending GroupCoordinatorRequest_v0 to node 0 [NodeNotReadyError: 0]
5564, 2017-02-07 14:02:26,817 - INFO     Group coordinator for XXXX_5 is BrokerMetadata(nodeId=0, host=u'localhost', port=9092, rack=None)
5564, 2017-02-07 14:02:26,818 - INFO     Discovered coordinator 0 for group XXXX_5
5564, 2017-02-07 14:02:26,819 - INFO     (Re-)joining group XXXX_5

A single Kafka node only, on localhost, kafka version kafka_2.11-0.10.1.0.

The consumer is also on localhost, with auto_offset_reset=latest, max_poll_records=1, session_timeout_ms=60000.

@harelba
Contributor

harelba commented Feb 7, 2017

I have a full debug-level reproduction of this, attached. I renamed the topic and group-id names in a consistent way to hide some business details; the relevant group id is topic-as-sp-group_5. While I was using topic-as-sp-group_4 all was well for a while, and when I moved to topic-as-sp-group_5 this issue started to reproduce. When we moved from topic-as-sp-group_3 to topic-as-sp-group_4 we had the same issue, but at some point it subsided.

By the way, this happens on multiple machines with separate Kafka installations. The Kafka server uses the default configuration, no changes (these are all Mac dev machines).

timeout-debug.log.gz

@dpkp
Owner

dpkp commented Feb 7, 2017

Thanks -- I'll take a look and see if this uncovers anything.

But, FYI, changing consumer groups is not supported. You should always create a new consumer instance if you want to use a different group.
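Roughly, that pattern looks like this (topic, group names, and broker address are placeholders):

from kafka import KafkaConsumer

# Sketch: instead of reusing one consumer across groups, close the old
# instance and construct a fresh one with the new group_id.
consumer = KafkaConsumer('my-topic', group_id='group-A',
                         bootstrap_servers=['localhost:9092'])
# ... consume ...
consumer.close()

consumer = KafkaConsumer('my-topic', group_id='group-B',
                         bootstrap_servers=['localhost:9092'])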

@harelba
Contributor

harelba commented Feb 7, 2017

Thanks, that would be awesome.

I'll explain what I meant by "changing" the consumer group - I wasn't referring to actually changing it at runtime, but to restarting the process with a new configuration that makes the consumer connect to a new group id.

The logs I've attached are just the debug logs written while the process started and created a consumer as part of its startup.

@dpkp
Owner

dpkp commented Feb 7, 2017

Right on. Can you provide a short code snippet that might help reproduce?

@harelba
Contributor

harelba commented Feb 8, 2017

I will try to put together something that reproduces this.

By the way, I've found an open Kafka JIRA that describes exactly the same problem I'm seeing (the reporter there was also using kafka-python): https://issues.apache.org/jira/browse/KAFKA-4086

In my case it's totally unrelated to long processing, though. The message rate is very low; in most cases the consumer starts up and there are no new messages at all, and the problem still happens.

@harelba
Contributor

harelba commented Feb 13, 2017

Found the root cause of the problem. The issue was that the session timeout was larger than the request timeout. Whenever we restarted a process, the previous process' session would live too long relative to the request timeout, leading to the request timeout error.

Looking at the standard Java-based Kafka client, an exception is actually thrown when the session timeout is larger than the request timeout, or when the fetch max wait is larger than the request timeout. See the code here.
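For illustration, a minimal sketch of a consumer configured so these constraints hold (the timeout values are placeholders, not the ones from our setup):

from kafka import KafkaConsumer

# Sketch: keep session_timeout_ms and fetch_max_wait_ms strictly below
# request_timeout_ms, mirroring the constraint the Java client enforces.
consumer = KafkaConsumer(
    'my-topic',                           # placeholder topic
    bootstrap_servers=['localhost:9092'],
    session_timeout_ms=30000,
    fetch_max_wait_ms=500,
    request_timeout_ms=60000,             # larger than both values above
)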

I've created a pull request, #986, that throws an error if these constraints are violated. I ran the tests manually through pytest, though, since the tox setup wasn't working for me. Send me any comments on it if needed.

Thanks
Harel

@ilaif

ilaif commented Feb 15, 2017

Seconding @harelba - it happened to me too and was a nightmare to solve until I understood the problem; by definition there is no sense in having session_timeout_ms > request_timeout_ms.

@dpkp
Owner

dpkp commented Feb 28, 2017

Merged harelba's PR to fail fast if these values are misconfigured. Thanks for the debugging!
