Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Regression issue of KafkaConsumer.partitions_for_topic since 1.4.5 #1789

Closed
m0udd opened this issue Apr 20, 2019 · 4 comments
Closed

Regression issue of KafkaConsumer.partitions_for_topic since 1.4.5 #1789

m0udd opened this issue Apr 20, 2019 · 4 comments

Comments

@m0udd
Copy link

m0udd commented Apr 20, 2019

I have this weird behavior since >=1.4.5 :
KafkaConsumer.partitions_for_topic() return None at the first call, even if the partition exist.
If we do call another method (ex: topics() ) then we call partitions_for_topic() it will work.

After few test 1.4.4 seems to work properly.

Code to reproduce the error:

from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id='test',bootstrap_servers=['ip:port'])
print('{}'.format(consumer.partitions_for_topic('test-topic'))

I usually investigate further more but I'm in a rush right now :/ maybe in few days :)

@arnoldyahad
Copy link

having the same issue

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

#when using partitions_for_topic
print con.partitions_for_topic("my_topic")
None

#when using topic() and then partitions_for_topic
con.topics()
print con.partitions_for_topic("my_topic")
set([0, 1, 2])

using 1.4.6...

@gablank
Copy link

gablank commented May 14, 2019

I'm having the same issue.

Seems like partitions_for_topic does not request a metadata refresh, whereas topics does. No clue why this worked in 1.4.4, as it seems the two functions have not changed. Maybe metadata was always refreshed asap when creating the consumer in 1.4.4?

Making partitions_for_topic call the same code as topics before returning the partitions seems to solve the problem (obviously):

def partitions_for_topic(self, topic):
      """(...)"""
      cluster = self._client.cluster
      if self._client._metadata_refresh_in_progress and self._client._topics:
          future = cluster.request_update()
          self._client.poll(future=future)
      stash = cluster.need_all_topic_metadata
      cluster.need_all_topic_metadata = True
      future = cluster.request_update()
      self._client.poll(future=future)
      cluster.need_all_topic_metadata = stash
      return self._client.cluster.partitions_for_topic(topic)

@gablank
Copy link

gablank commented May 14, 2019

Seems like this PR is adressing the issue: #1781
Also, this issue seems to be a duplicate of #1774

@jeffwidman
Copy link
Collaborator

Indeed, this is a dupe of #1774

I have no had a chance to look at the fix myself yet

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants