Skip to content
This repository has been archived by the owner on Apr 16, 2019. It is now read-only.

Commit

Permalink
Use class config for Kafka result bolt.
Browse files Browse the repository at this point in the history
  • Loading branch information
rduplain committed Aug 31, 2015
1 parent 876b97f commit 35d678a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/birding/bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import json

from pykafka import KafkaClient
from streamparse.bolt import Bolt

from .config import get_config, import_name
Expand Down Expand Up @@ -99,7 +98,8 @@ def initialize(self, conf, ctx):
2. Prepare Kafka producer for `tweet` topic.
"""
config = get_config()['ResultTopicBolt']
self.client = KafkaClient(hosts=config['hosts'])
kafka_class = import_name(config['kafka_class'])
self.client = kafka_class(**config['kafka_init'])
self.topic = self.client.topics[config['topic']]
self.producer = self.topic.get_producer()

Expand Down
7 changes: 5 additions & 2 deletions src/birding/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
index: tweet
doc_type: tweet
ResultTopicBolt:
hosts: 127.0.0.1:9092 # comma-separated list of hosts
kafka_class: pykafka.KafkaClient
kafka_init:
hosts: 127.0.0.1:9092 # comma-separated list of hosts
topic: tweet
Appendix: {}
Expand Down Expand Up @@ -80,7 +82,8 @@
index = tv.String(),
doc_type = tv.String()),
ResultTopicBolt = tv.SchemaMapping().of(
hosts = tv.String(),
kafka_class = tv.String(),
kafka_init = tv.StrMapping().of(tv.Passthrough()),
topic = tv.String()),
Appendix = tv.Passthrough())

Expand Down
10 changes: 5 additions & 5 deletions src/birding/follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@
from contextlib import contextmanager
from time import sleep

from pykafka import KafkaClient
from pykafka.exceptions import KafkaException

from .config import get_config
from .config import get_config, import_name
from .search import SearchManager


def follow_topic_from_config():
"""Read kafka config, then dispatch to `follow_topic`."""
config = get_config()['ResultTopicBolt']
return follow_topic(config['hosts'], config['topic'])
kafka_class = import_name(config['kafka_class'])
return follow_topic(kafka_class, config['topic'], **config['kafka_init'])


def follow_topic(hosts, name, retry_interval=1):
def follow_topic(kafka_class, name, retry_interval=1, **kafka_init):
"""Dump each message from kafka topic to stdio."""
while True:
try:
client = KafkaClient(hosts=hosts)
client = kafka_class(**kafka_init)
topic = client.topics[name]
consumer = topic.get_simple_consumer(reset_offset_on_start=True)
except Exception as e:
Expand Down

0 comments on commit 35d678a

Please sign in to comment.