4 changes: 2 additions & 2 deletions pykafka/broker.py
@@ -270,8 +270,8 @@ def request_offset_limits(self, partition_requests):
     def request_metadata(self, topics=None):
         """Request cluster metadata
 
-        :param topics: The topic ids for which to request metadata
-        :type topics: Iterable of int
+        :param topics: The topic names for which to request metadata
+        :type topics: Iterable of `bytes`
         """
         max_retries = 3
         for i in range(max_retries):
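Note: the change above is docstring-only, but it pins down the contract: request_metadata addresses topics by name (bytes), never by numeric id. A minimal, hypothetical call site, assuming an already-connected pykafka.broker.Broker instance named `broker` and made-up topic names:

    # Hypothetical usage sketch -- `broker` is assumed to be a connected
    # pykafka.broker.Broker; topic names are bytes, not integer ids.
    response = broker.request_metadata(topics=[b"my.topic", b"another.topic"])
    if response is not None:
        print(list(response.topics.keys()))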
134 changes: 76 additions & 58 deletions pykafka/cluster.py
@@ -24,8 +24,8 @@
 import weakref
 
 from .broker import Broker
-from .exceptions import (ConsumerCoordinatorNotAvailable,
-                         KafkaException,
+from .exceptions import (ERROR_CODES,
+                         ConsumerCoordinatorNotAvailable,
                          UnknownTopicOrPartition,
                          LeaderNotAvailable)
 from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse
@@ -36,18 +36,31 @@
 
 
 class TopicDict(dict):
-    """Dictionary which will attempt to auto-create unknown topics."""
+    """Lazy dict, which will also attempt to auto-create unknown topics"""
 
-    def __init__(self, cluster, *args, **kwargs):
+    def __init__(self, cluster, exclude_internal_topics, *args, **kwargs):
         super(TopicDict, self).__init__(*args, **kwargs)
         self._cluster = weakref.proxy(cluster)
+        self._exclude_internal_topics = exclude_internal_topics
 
+    def __getitem__(self, key):
+        topic_ref = super(TopicDict, self).__getitem__(key)
+        if topic_ref is not None and topic_ref() is not None:
+            return topic_ref()
+        else:
+            # Topic exists, but needs to be instantiated locally
+            meta = self._cluster._get_metadata([key])
+            topic = Topic(self._cluster, meta.topics[key])
+            self[key] = weakref.ref(topic)
+            return topic
+
     def __missing__(self, key):
         log.warning('Topic %s not found. Attempting to auto-create.', key)
-        if self._create_topic(key):
-            return self[key]
-        else:
-            raise UnknownTopicOrPartition('Unknown topic: {topic}'.format(topic=key))
+        self._create_topic(key)
+
+        # Note that __missing__ is called from within dict.__getitem__, so
+        # that's what we should be returning (rather than self.__getitem__)
+        return super(TopicDict, self).__getitem__(key)
 
     def _create_topic(self, topic_name):
         """Auto-create a topic.
@@ -57,21 +70,58 @@ def _create_topic(self, topic_name):
         with settings and everything, we'll implement that. To expose just
         this now would be disingenuous, since it's features would be hobbled.
         """
-        if len(self._cluster.brokers) == 0:
-            log.warning("No brokers found. This is probably because of "
-                        "KAFKA-2154, which will be fixed in Kafka 0.8.3")
-            raise KafkaException("Unable to retrieve metdata. Can't auto-create topic. See log for details.")
-        # Auto-creating will take a moment, so we try 5 times.
-        for i in range(5):
+        while True:
             # Auto-creating is as simple as issuing a metadata request
-            # solely for that topic. The update is just to be sure
-            # our `Cluster` knows about it.
-            self._cluster.brokers[list(self._cluster.brokers.keys())[0]].request_metadata(topics=[topic_name])
-            self._cluster.update()
-            if topic_name in self:
+            # solely for that topic. If topic auto-creation is enabled on the
+            # broker, the initial response will carry a LeaderNotAvailable
+            # error, otherwise it will be an UnknownTopicOrPartition or
+            # possibly a RequestTimedOut
+            res = self._cluster._get_metadata(topics=[topic_name])
+            err = res.topics[topic_name].err
+            if err == LeaderNotAvailable.ERROR_CODE:
+                time.sleep(.1)
+            elif err == 0:
                 log.info('Topic %s successfully auto-created.', topic_name)
-                return True
-            time.sleep(0.1)
+                self._cluster.update()
+                break
+            else:
+                raise ERROR_CODES[err](
+                    "Failed to auto-create topic '{}'".format(topic_name))
+
+    def _update_topics(self, metadata):
+        """Update topics with fresh metadata.
+
+        :param metadata: Metadata for all topics.
+        :type metadata: Dict of `{name, metadata}` where `metadata` is
+            :class:`pykafka.protocol.TopicMetadata` and `name` is `bytes`.
+        """
+        # Remove old topics
+        removed = set(self.keys()) - set(metadata.keys())
+        if len(removed) > 0:
+            log.info("Removing %d topics", len(removed))
+        for name in removed:
+            log.debug("Removing topic '%s'", name)
+            super(TopicDict, self).pop(name)
+
+        # Add/update partition information
+        if len(metadata) > 0:
+            log.info("Discovered %d topics", len(metadata))
+        for name, meta in iteritems(metadata):
+            if not self._should_exclude_topic(name):
+                if name not in self.keys():
+                    self[name] = None  # to be instantiated lazily
+                    log.debug("Discovered topic '%s'", name)
+                else:
+                    # avoid instantiating Topic if it isn't already there
+                    ref = super(TopicDict, self).__getitem__(name)
+                    if ref is not None and ref() is not None:
+                        self[name].update(meta)
+
+    def _should_exclude_topic(self, topic_name):
+        """Should this topic be excluded from the list shown to the client?"""
+        if not self._exclude_internal_topics:
+            return False
+        return topic_name.startswith(b"__")
 
 
 class Cluster(object):
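The __getitem__/__missing__ pair above is the heart of the lazy TopicDict: a known topic is stored as a weakref (or None until first use), and the real Topic object is only built, via a targeted metadata request, when the key is actually read. A standalone sketch of that dict pattern -- not pykafka code, with a hypothetical Widget standing in for Topic and no networking:

    import weakref

    class Widget(object):
        """Stand-in for the expensive object (Topic in pykafka)."""
        def __init__(self, name):
            self.name = name

    class LazyDict(dict):
        def __getitem__(self, key):
            ref = super(LazyDict, self).__getitem__(key)
            if ref is not None and ref() is not None:
                return ref()                # cached object is still alive
            obj = Widget(key)               # (re)build the real object on demand
            self[key] = weakref.ref(obj)    # keep only a weak reference
            return obj

        def __missing__(self, key):
            # Register the key lazily; dict.__getitem__ called us, so return
            # what it should see (the placeholder), not self.__getitem__(key).
            self[key] = None
            return super(LazyDict, self).__getitem__(key)

    d = LazyDict()
    w = d['spam']            # __missing__ registers the key, __getitem__ builds it
    assert isinstance(w, Widget)
    assert d['spam'] is w    # second lookup resolves the live weakref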
@@ -110,8 +160,7 @@ def __init__(self,
         self._offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
         self._handler = handler
         self._brokers = {}
-        self._topics = TopicDict(self)
-        self._exclude_internal_topics = exclude_internal_topics
+        self._topics = TopicDict(self, exclude_internal_topics)
         self._source_address = source_address
         self._source_host = self._source_address.split(':')[0]
         self._source_port = 0
@@ -142,13 +191,13 @@ def handler(self):
         """The concurrency handler for network requests"""
         return self._handler
 
-    def _get_metadata(self):
+    def _get_metadata(self, topics=None):
         """Get fresh cluster metadata from a broker."""
         # Works either on existing brokers or seed_hosts list
         brokers = [b for b in self.brokers.values() if b.connected]
         if brokers:
             for broker in brokers:
-                response = broker.request_metadata()
+                response = broker.request_metadata(topics)
                 if response is not None:
                     return response
         else: # try seed hosts
@@ -162,7 +211,7 @@ def _get_metadata(self):
                                     buffer_size=1024 * 1024,
                                     source_host=self._source_host,
                                     source_port=self._source_port)
-                    response = broker.request_metadata()
+                    response = broker.request_metadata(topics)
                     if response is not None:
                         return response
                 except Exception as e:
@@ -209,37 +258,6 @@ def _update_brokers(self, broker_metadata):
                 #       needed.
                 raise Exception('Broker host/port change detected! %s', broker)
 
-    def _update_topics(self, metadata):
-        """Update topics with fresh metadata.
-
-        :param metadata: Metadata for all topics.
-        :type metadata: Dict of `{name, metadata}` where `metadata` is
-            :class:`pykafka.protocol.TopicMetadata` and `name` is str.
-        """
-        # Remove old topics
-        removed = set(self._topics.keys()) - set(metadata.keys())
-        if len(removed) > 0:
-            log.info("Removing %d topics", len(removed))
-        for name in removed:
-            log.debug('Removing topic %s', self._topics[name])
-            self._topics.pop(name)
-        # Add/update partition information
-        if len(metadata) > 0:
-            log.info("Discovered %d topics", len(metadata))
-        for name, meta in iteritems(metadata):
-            if not self._should_exclude_topic(name):
-                if name not in self._topics:
-                    self._topics[name] = Topic(self, meta)
-                    log.debug('Discovered topic %s', self._topics[name])
-                else:
-                    self._topics[name].update(meta)
-
-    def _should_exclude_topic(self, topic_name):
-        """Should this topic be excluded from the list shown to the client?"""
-        if not self._exclude_internal_topics:
-            return False
-        return topic_name.startswith(b"__")
-
     def get_offset_manager(self, consumer_group):
         """Get the broker designated as the offset manager for this consumer group.
 
@@ -291,7 +309,7 @@ def update(self):
                         'manually using the Kafka CLI tools.')
         self._update_brokers(metadata.brokers)
         try:
-            self._update_topics(metadata.topics)
+            self._topics._update_topics(metadata.topics)
         except LeaderNotAvailable:
             log.warning("LeaderNotAvailable encountered. This is "
                         "because one or more partitions have no available replicas.")
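Taken together, these changes let client code name a topic that does not exist yet and have it auto-created (and retried until a leader is available) behind a plain dictionary lookup, which the new integration test below exercises end to end. A condensed, hypothetical client-side sketch -- the hosts string and the broker's auto.create.topics.enable=true setting are assumptions:

    from uuid import uuid4
    from pykafka import KafkaClient

    client = KafkaClient(hosts="127.0.0.1:9092")   # assumed local broker
    name = uuid4().hex.encode()        # topic names are bytes
    topic = client.topics[name]        # TopicDict.__missing__ triggers auto-creation
    client.update_cluster()            # refresh metadata; _update_topics keeps the dict in sync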
47 changes: 47 additions & 0 deletions tests/pykafka/test_cluster.py
@@ -0,0 +1,47 @@
+import unittest
+from uuid import uuid4
+
+from pykafka import KafkaClient, Topic
+from pykafka.test.utils import get_cluster, stop_cluster
+
+
+class ClusterIntegrationTests(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.kafka = get_cluster()
+        cls.client = KafkaClient(cls.kafka.brokers)
+
+    @classmethod
+    def tearDownClass(cls):
+        stop_cluster(cls.kafka)
+
+    def test_topic_autocreate(self):
+        topic_name = uuid4().hex.encode()
+        topic = self.client.topics[topic_name]
+        self.assertTrue(isinstance(topic, Topic))
+
+    def test_topic_updates(self):
+        startlen = len(self.client.topics)
+        name_a = uuid4().hex.encode()
+        name_b = uuid4().hex.encode()
+
+        self.client.topics[name_a]
+        self.client.topics[name_a]  # 2nd time shouldn't affect len(topics)
+        self.client.topics[name_b]
+        self.assertEqual(len(self.client.topics), startlen + 2)
+        self.assertIn(name_a, self.client.topics)
+
+        self.kafka.delete_topic(name_a)
+        self.client.update_cluster()
+        self.assertEqual(len(self.client.topics), startlen + 1)
+        self.assertNotIn(name_a, self.client.topics)
+        self.assertIn(name_b, self.client.topics)
+
+        self.kafka.delete_topic(name_b)
+        self.client.update_cluster()
+        self.assertEqual(len(self.client.topics), startlen)
+        self.assertNotIn(name_b, self.client.topics)
+
+
+if __name__ == "__main__":
+    unittest.main()