Using additional params for MP consumer child process #336

Merged
merged 7 commits into from
Mar 29, 2015

vshlapakov
Contributor

This PR is a continuation of #248 and #255.
I cleaned up the code a little and made the following changes:

  • Moved the event params into a single separate param for consistency
    (it's simpler to work with a group of events, and it reduces the param count for _mp_consume(). This is only loosely related to the PR itself, so I can move it to another PR if you want).
  • Passed additional params to the internal SimpleConsumer.
    I also moved the partitions param into consumer_options, since it's an optional param for SimpleConsumer too and should follow the same logic.

Any comments are highly appreciated!
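
The event grouping described in the first bullet could look roughly like the sketch below. It is only an illustration: the `Events` name, the field order, and the `make_events` helper are assumptions, and `threading.Event` stands in for the multiprocessing events so the snippet runs on its own.

```python
from collections import namedtuple
import threading

# Hypothetical grouping of the three control events that were previously
# passed to the worker as separate start/exit/pause arguments.
Events = namedtuple("Events", ["start", "pause", "exit"])


def make_events(event_factory=threading.Event):
    """Build one Events group; pass multiprocessing.Event for real child processes."""
    return Events(start=event_factory(),
                  pause=event_factory(),
                  exit=event_factory())
```

With a group like this, the worker signature needs one `events` argument instead of three, and callers refer to `events.exit` or `events.pause` by name.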

@vshlapakov vshlapakov closed this Mar 4, 2015
@vshlapakov vshlapakov reopened this Mar 4, 2015
@vshlapakov
Contributor Author

Something is strange with the Travis tests: I got 2 builds, each with a different single job failing.

It doesn't seem to be related to the PR itself. Have you seen this before? I'll try to rebuild once more by reopening the PR, but it may be necessary to dig deeper into the test implementation.

@vshlapakov vshlapakov closed this Mar 4, 2015
@vshlapakov vshlapakov reopened this Mar 4, 2015
@vshlapakov
Contributor Author

It's fine now, although I don't like this kind of flakiness in the tests.

@dpkp dpkp added the consumer label Mar 8, 2015

-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+def _mp_consume(client, group, topic, queue, size, events, consumer_options):
Owner

How about **consumer_options, passing them as kwargs?

@vshlapakov vshlapakov closed this Mar 11, 2015
@vshlapakov vshlapakov reopened this Mar 11, 2015
@vshlapakov
Contributor Author

The same issue with the tests again, so I reopened the PR.
As for the changes: sounds good to me, fixed. Please take a look.

- Moved the events params to a separate param for consistency
- Passing additional params to the internal SimpleConsumer worker
  for the multiprocessing high-level consumer. This allows using non-default
  consumer settings (fetch_size_bytes, buffer_size, max_buffer_size).
@dpkp dpkp added this to the 0.9.4 Release milestone Mar 24, 2015
@@ -105,7 +108,8 @@ class MultiProcessConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 num_procs=1, partitions_per_proc=0):
+                 num_procs=1, partitions_per_proc=0,
+                 simple_consumer_options=None):
Owner

I think this should be **simple_consumer_options to keep the kwargs interface simple and consistent. Agree?

Contributor Author

Sure, fixed.
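
The kwargs interface agreed on in this review thread can be sketched as follows. The two classes are hypothetical stand-ins, not the kafka-python implementation, and the default values are made up for illustration; only the option names (fetch_size_bytes, buffer_size, max_buffer_size, partitions) come from this PR.

```python
class WorkerConsumer:
    """Hypothetical stand-in for the per-process SimpleConsumer."""

    def __init__(self, topic, partitions=None, fetch_size_bytes=4096,
                 buffer_size=4096, max_buffer_size=32768):
        # The defaults above are illustrative values, not the library's.
        self.topic = topic
        self.partitions = partitions
        self.fetch_size_bytes = fetch_size_bytes
        self.buffer_size = buffer_size
        self.max_buffer_size = max_buffer_size


class ParentConsumer:
    """Hypothetical stand-in for the multiprocess consumer."""

    def __init__(self, topic, num_procs=1, **simple_consumer_options):
        self.topic = topic
        self.num_procs = num_procs
        # Keep the extra keyword arguments untouched for the workers.
        self.simple_consumer_options = simple_consumer_options

    def build_worker(self, partitions):
        # Copy so the per-worker partitions do not leak into shared options.
        options = dict(self.simple_consumer_options)
        options["partitions"] = partitions
        return WorkerConsumer(self.topic, **options)
```

Collecting the options with `**` means the parent never has to enumerate the worker's settings; anything it does not recognize is forwarded as-is, and a misspelled option fails with a TypeError when the worker is constructed.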

@vshlapakov
Contributor Author

There's another issue that I've encountered: I've seen an MP consumer stall after a stop() call; it just hangs on the workers' join(). Today I had some time to investigate and found that if the MPQueue still contains messages at the moment we try to stop the consumer and its child processes, the child process will not terminate until all buffered items have been flushed to the pipe.

It's easy to reproduce: you need a non-empty topic, then create an MPConsumer and wait for it to get some messages from Kafka; it then hangs on mp.stop(). Note that all we do is leave messages unconsumed in the queue, nothing else.

>>> import kafka
>>> c = kafka.KafkaClient('kafka:9092')
>>> mp = kafka.MultiProcessConsumer(c, group='test', topic='test', num_procs=8)
>>> mp.queue.qsize()
795
>>> mp.stop()
...

According to the multiprocessing docs:
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

There's also a note that a queue created using a manager does not have this issue, so I used one to solve it. Another working approach is to add a delay to the MPConsumer stop() call after setting the events (to give the workers a chance to exit their inner infinite loop) and clean the queue manually, but the current approach seems better to me.
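
A minimal sketch of the manager-queue behaviour, assuming a POSIX system where the "fork" start method is available. The `produce` and `join_with_unread_items` names are made up for this illustration and do not appear in the PR. Items put on a manager queue are stored in the manager's server process, so the child has nothing left to flush when it exits and join() can return while the queue is still full of unread items.

```python
import multiprocessing


def produce(queue, count):
    """Child process body: fill the queue and return without waiting for a reader."""
    for i in range(count):
        queue.put(i)


def join_with_unread_items(count=1000, timeout=30):
    """Join a producer whose output is never read.

    Returns (child_exited, items_left_in_queue).
    """
    # Assumption: "fork" is available (POSIX); other start methods would
    # need an `if __name__ == "__main__":` guard around the caller.
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager:
        queue = manager.Queue()  # proxy to a queue held by the manager process
        child = ctx.Process(target=produce, args=(queue, count))
        child.start()
        child.join(timeout)  # nothing has been consumed at this point
        exited = not child.is_alive()
        if not exited:
            child.terminate()
        return exited, queue.qsize()
```

The trade-off is that every put() and get() on a manager queue is a round trip to the manager process, so it is slower than a plain multiprocessing.Queue.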

@vshlapakov
Contributor Author

I also added a pylint exception because pylint handles multiprocessing.Manager badly.

However, I've just realized that this isn't directly related to the PR. Do you want me to split it into another PR? Though I suspect some of the Travis failures could be related to this issue.

@vshlapakov
Contributor Author

OK, it works, but it doesn't help when the queue is full:

>>> consumer = kafka.MultiProcessConsumer(client=client, group=group, num_procs=8, topic='test')
>>> msg = consumer.get_messages(1000)
>>> consumer.queue.qsize()
1024
>>> consumer.stop()
...

It seems that something is wrong with the event flags; I'll try to solve the puzzle.

@vshlapakov
Contributor Author

Once the problem was localized, it was pretty obvious that the cause is the infinite timeout of queue.put(). When the queue is full, the worker waits for a free slot in the queue, and that's why the process can't be joined. I improved this part a bit by adding an inner loop that keeps this behaviour but in a manageable way: it uses a custom timeout and checks for the exit event.
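
The inner loop described above can be sketched like this. It is an illustration under assumptions, not the code from the PR: it uses threads and `queue.Queue` so it runs on its own, whereas the PR deals with child processes (`multiprocessing.Queue.put` raises the same `queue.Full` on timeout). The function names and the timeout value are made up.

```python
import queue
import threading
import time


def put_until_exit(q, item, exit_event, timeout=0.05):
    """Keep trying to enqueue item; give up once exit_event is set.

    Returns True if the item was enqueued, False if the worker should stop.
    """
    while True:
        try:
            q.put(item, timeout=timeout)  # bounded wait instead of blocking forever
            return True
        except queue.Full:
            if exit_event.is_set():
                return False


def worker(q, exit_event):
    """Produce integers until asked to exit."""
    n = 0
    while not exit_event.is_set() and put_until_exit(q, n, exit_event):
        n += 1


def stop_with_full_queue(maxsize=4):
    """Fill a bounded queue, request exit, and report (still_alive, queue_size)."""
    q = queue.Queue(maxsize=maxsize)
    exit_event = threading.Event()
    thread = threading.Thread(target=worker, args=(q, exit_event))
    thread.start()
    while not q.full():  # wait until the worker has filled the queue
        time.sleep(0.01)
    exit_event.set()
    thread.join(timeout=5)
    return thread.is_alive(), q.qsize()
```

The worker still waits for a free slot as before, but it wakes up after each timeout to check the exit event, so a stop request is noticed even when nobody drains the queue.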

@vshlapakov vshlapakov closed this Mar 25, 2015
@vshlapakov vshlapakov reopened this Mar 25, 2015
@vshlapakov
Contributor Author

Different Travis build results for the same code (again) make me think that something is wrong with the integration tests at a deeper level, or that there's another similar issue with threads/processes. I'll try to find it if I have some free time.

@dpkp
Owner

dpkp commented Mar 29, 2015

This looks great -- thanks for taking this on!

The latest test failures look like they were caused by intermittent failures in starting the ZooKeeper and Kafka brokers on Travis within the timeouts. That should have nothing to do with the Python client code (although we need to address those tests separately so they stop flapping).

dpkp added a commit that referenced this pull request Mar 29, 2015
Using additional params for MP consumer child process
@dpkp dpkp merged commit f35995a into dpkp:master Mar 29, 2015
@vshlapakov vshlapakov deleted the feature-mp-consumer-params branch April 27, 2015 21:57