Spout and blocking connection? #85
Comments
I can't help with Storm problems. I suggest raising this issue with the Apache Storm community. There is probably a forum or mailing list.
Just in case someone runs into the same problem, the solution is:

```python
# Imports needed by this snippet; the property lives on the config object
# that is referenced below as cfg.config.
import json

import umsgpack
from kafka import KafkaConsumer


@property
def kafka(self):
    ktopics = json.loads(self.config.get('KAFKA', 'topics'))
    kgroup = self.config.get('KAFKA', 'group')
    kservers = json.loads(self.config.get('KAFKA', 'servers'))
    # This is a very important setting: if you don't use it, or set it
    # very high, Apache Storm will kill the worker because of the
    # 'heartbeat' problem, since getting a message with kafka-python is
    # a blocking operation, i.e.
    #   self.stream = cfg.config.kafka
    #   record = next(self.stream)
    # will block all I/O for the time defined in consumer_timeout_ms.
    # See more info here:
    # https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py
    ktimeout = self.config.getint('KAFKA', 'consumer_timeout_ms')
    consumer = KafkaConsumer(
        *ktopics,
        group_id=kgroup,
        bootstrap_servers=kservers,
        consumer_timeout_ms=ktimeout,
        value_deserializer=umsgpack.unpackb
    )
    return consumer
```
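For reference, here is a minimal sketch of the kind of [KAFKA] section this property expects in the config file; the topic names, group id, broker addresses, and timeout value below are illustrative assumptions, not taken from the thread:

```python
# Self-contained sketch of the expected [KAFKA] settings
# (all concrete values are made up for illustration).
import configparser
import json

SAMPLE_INI = """
[KAFKA]
topics = ["events", "metrics"]
group = storm-spout
servers = ["kafka1:9092", "kafka2:9092"]
consumer_timeout_ms = 100
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_INI)

print(json.loads(config.get('KAFKA', 'topics')))      # ['events', 'metrics']
print(config.get('KAFKA', 'group'))                   # storm-spout
print(json.loads(config.get('KAFKA', 'servers')))     # ['kafka1:9092', 'kafka2:9092']
print(config.getint('KAFKA', 'consumer_timeout_ms'))  # 100
```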
So use consumer_timeout_ms to avoid blocking I/O, and then simply catch the StopIteration exception, as shown below.

```python
# ---------------------------------------------------
# Kafka stream spout using the kafka-python library
# ---------------------------------------------------
import time

import config as cfg
from petrel import storm
from petrel.emitter import Spout

# enable loggers
log = cfg.config.loggers.storm


class KafkaSpout(Spout):

    def __init__(self):
        self.stream = cfg.config.kafka
        super().__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        return ['topic', 'record']

    def nextTuple(self):
        try:
            # next() returns within consumer_timeout_ms; when no message
            # arrives in that window it raises StopIteration instead of
            # blocking, so the worker keeps sending heartbeats to Storm.
            record = next(self.stream)
            log.debug("{}:{}:{}: key={} value={}".format(
                record.topic,
                record.partition,
                record.offset,
                record.key,
                record.value))
            storm.emit([record.topic, record.value])
        except StopIteration:
            # No message within the timeout window -- nothing to emit.
            pass


def run():
    KafkaSpout().run()
```
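As an aside (not from the thread): kafka-python's KafkaConsumer.poll() is another way to keep nextTuple from blocking, since it returns after at most timeout_ms even when the topic is idle. A rough sketch of a spout variant built on it, reusing the same config property as above; the class name and the 100 ms timeout are illustrative choices:

```python
import config as cfg
from petrel import storm
from petrel.emitter import Spout


class KafkaPollSpout(Spout):
    """Hypothetical variant of the spout above using poll() instead of next()."""

    def __init__(self):
        self.stream = cfg.config.kafka  # the same KafkaConsumer property as above
        super().__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        return ['topic', 'record']

    def nextTuple(self):
        # poll() returns an empty dict after timeout_ms if nothing arrived,
        # so there is no StopIteration to catch and no long block either.
        batches = self.stream.poll(timeout_ms=100)
        for _partition, records in batches.items():
            for record in records:
                storm.emit([record.topic, record.value])


def run():
    KafkaPollSpout().run()
```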
Good tip -- thanks!
So inside nextTuple I'm doing record = next(self.stream). Everything works perfectly while messages are flowing into Kafka, but as soon as they stop (i.e. no messages in Kafka for 2 minutes and then they start coming again), the Storm process crashes with the following:
and then my Kafka server shows this:
so the process is dead.
Any ideas why?
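For anyone hitting this later, here is a minimal illustration of the failure mode described above (the topic name and broker address are made-up placeholders, not from the issue): with kafka-python, a consumer created without consumer_timeout_ms blocks indefinitely during iteration while the topic is idle, so next() inside nextTuple can hang until a message arrives and Storm kills the worker for missed heartbeats.

```python
# Illustrative sketch only -- topic and broker are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'some-topic',
    bootstrap_servers=['localhost:9092'],
    # consumer_timeout_ms is not set, so message iteration blocks
    # indefinitely while the topic is idle ...
)

record = next(consumer)  # ... which means this call does not return until a
                         # message arrives; meanwhile Storm sees no heartbeat
                         # from the worker and kills it.
```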