Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #147 from Parsely/feature/topic_autocreate
Browse files Browse the repository at this point in the history
Add topic auto-creation. Resolves #128.
  • Loading branch information
kbourgoin committed Apr 24, 2015
2 parents 22cb75a + c663d71 commit da1a77a
Showing 1 changed file with 39 additions and 2 deletions.
41 changes: 39 additions & 2 deletions pykafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,53 @@
import logging
import random
import time
import weakref

from .broker import Broker
from .exceptions import ConsumerCoordinatorNotAvailable
from .exceptions import (ConsumerCoordinatorNotAvailable,
UnknownTopicOrPartition)
from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse
from .topic import Topic


logger = logging.getLogger(__name__)


class TopicDict(dict):
"""Dictionary which will attempt to auto-create unknown topics."""

def __init__(self, cluster, *args, **kwargs):
super(TopicDict, self).__init__(*args, **kwargs)
self._cluster = weakref.proxy(cluster)

def __missing__(self, key):
logger.info('Topic %s not found. Attempting to auto-create.', key)
if self._create_topic(key):
return self[key]
else:
raise UnknownTopicOrPartition('Unknown topic: {}'.format(key))

def _create_topic(self, topic_name):
"""Auto-create a topic.
Not exposed in the cluster or broker because this is *only*
auto-creation. When there's a real API for creating topics,
with settings and everything, we'll implement that. To expose just
this now would be disingenuous, since it's features would be hobbled.
"""
# Auto-creating will take a moment, so we try 5 times.
for i in xrange(5):
# Auto-creating is as simple as issuing a metadata request
# solely for that topic. The update is just to be sure
# our `Cluster` knows about it.
self._cluster.brokers[0].request_metadata(topics=[topic_name])
self._cluster.update()
if topic_name in self:
logger.info('Topic %s successfully auto-created.', topic_name)
return True
time.sleep(0.1)


class Cluster(object):
def __init__(self,
hosts,
Expand Down Expand Up @@ -67,7 +104,7 @@ def __init__(self,
self._offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
self._handler = handler
self._brokers = {}
self._topics = {}
self._topics = TopicDict(self)
self._socket_receive_buffer_bytes = socket_receive_buffer_bytes
self._exclude_internal_topics = exclude_internal_topics
self.update()
Expand Down

0 comments on commit da1a77a

Please sign in to comment.