
KAFKA-7514: Add threads to ConsumeBenchWorker #5864

Merged
merged 17 commits into apache:trunk on Nov 13, 2018

Conversation

Contributor

@stanislavkozlovski commented Oct 31, 2018

This PR adds a new ConsumeBenchSpec field, "consumerCount". The ConsumeBenchWorker will spawn "consumerCount" consumers, each running in its own thread.
Since "consumerCount" defaults to 1, these changes are backwards compatible.

An open question is how existing fields such as "targetMessagesPerSec", "maxMessages", "consumerGroup" and "activeTopics" should behave.

With "activeTopics", we need to decide whether they should be split over the consumers or not.
I see four cases, which I believe we should address as follows:

  1. Random groups, subscribe to topics - N unique groups, each subscribed to all topics
  2. Specified group, subscribe to topics - 1 group subscribed to all topics; consumers share the workload
  3. Random groups, assign partitions - N unique groups, each assigned all partitions
  4. Specified group, assign partitions - throw an exception. Only one consumer within a consumer group can read from a given partition, so it is unclear how (or whether at all) we should split the partitions across consumers. At this stage, I believe it's best not to support this.
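The four cases above can be sketched as a small dispatch over the two inputs (a hypothetical sketch for discussion; `Mode` and `resolve` are illustrative names, not actual Trogdor code):

```java
// Illustrative dispatch over the four cases from the PR description.
public class GroupAssignmentPolicy {
    enum Mode {
        SUBSCRIBE_RANDOM_GROUPS,  // case 1: N unique groups, each subscribed to all topics
        SUBSCRIBE_SHARED_GROUP,   // case 2: one group; consumers share the workload
        ASSIGN_RANDOM_GROUPS,     // case 3: N unique groups, each assigned all partitions
        UNSUPPORTED               // case 4: explicit partitions + shared group + >1 consumer
    }

    static Mode resolve(boolean groupSpecified, boolean partitionsAssigned, int consumerCount) {
        if (!partitionsAssigned)
            return groupSpecified ? Mode.SUBSCRIBE_SHARED_GROUP : Mode.SUBSCRIBE_RANDOM_GROUPS;
        if (!groupSpecified)
            return Mode.ASSIGN_RANDOM_GROUPS;
        // A single consumer may still assign explicit partitions in a named group.
        return consumerCount > 1 ? Mode.UNSUPPORTED : Mode.ASSIGN_RANDOM_GROUPS;
    }
}
```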

I believe "targetMessagesPerSec" and "maxMessages" should apply to each consumer individually. This would also greatly simplify the implementation.

I haven't written tests yet since I want to flesh out the design first. Any feedback is appreciated.

while (messagesConsumed < maxMessages) {
-    ConsumerRecords<byte[], byte[]> records = consumer.poll();
+    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
Contributor Author

This raises an InterruptException and I can't figure out why. Is there something obvious that I'm missing?

[2018-11-05 17:47:05,859] WARN ConsumeRecords caught an exception:  (org.apache.kafka.trogdor.workload.ConsumeBenchWorker)
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:493)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
	at org.apache.kafka.trogdor.workload.ConsumeBenchWorker$Consumer.poll(ConsumeBenchWorker.java:503)
	at org.apache.kafka.trogdor.workload.ConsumeBenchWorker$ConsumeMessages.call(ConsumeBenchWorker.java:243)
	at org.apache.kafka.trogdor.workload.ConsumeBenchWorker$ConsumeMessages.call(ConsumeBenchWorker.java:198)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.InterruptedException
	... 14 more

Contributor

Most likely something else failed, and your thread was interrupted as part of tearing down the worker.

Contributor Author

I'm not sure about the cause, since I haven't been able to reproduce it. I had changed the parameters in trogdor-run-consume-bench.sh; it might have had something to do with old consumer group state after re-running it a couple of times.

throttle.increment();
}
startBatchMs = Time.SYSTEM.milliseconds();
}
} catch (Exception e) {
// TODO: Should we close task on consumer failure?
Contributor Author

This is an open question. I'm not sure which is better: aborting the whole task if one consumer fails, or continuing to run until the last one fails.

Contributor

Please abort the whole task if the consumer fails. That's how the rest of the tests work.

In general, the consumer should never fail-- if it does, there is a serious problem.

Contributor Author

Makes sense. I have logic implemented that does not fail the whole task if the StatusUpdater of one consumer fails; the task is only aborted when all StatusUpdaters fail. Thoughts on that?

Contributor

The status updaters should not fail, right? Any failure should abort the worker.

Contributor Author

Seems reasonable. I reverted my commit.

@cmccabe
Contributor

cmccabe commented Nov 5, 2018

Please let's use "thread count" instead of "consumer count." We may eventually want to extend this job to be able to spawn multiple workers (if it isn't already).

I think the questions above about how max messages per sec, etc. work are resolved, right? They apply on a per-thread basis. We should add some JavaDoc specifying this.

@stanislavkozlovski
Contributor Author

Thanks for the review @cmccabe. The questions on maxMessages etc. are resolved, yes. I have already added JavaDoc explaining this in ConsumeBenchSpec.java - please take a look to see whether it is adequate.

@@ -100,6 +108,7 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("threadCount") Integer threadCount,
Contributor

Can we call this "threadsPerWorker" instead? We will probably want to support multiple workers in the future.

@@ -138,6 +148,11 @@ public int maxMessages() {
return maxMessages;
}

@JsonProperty
public int consumerCount() {
return consumerCount;
Contributor

We should make this consistent with the name of the variable in the spec.

* It is worth noting that the "targetMessagesPerSec", "maxMessages" and "activeTopics" fields apply for every consumer individually.
* If a consumer group is not specified, every consumer is assigned a different, random group. When specified, all consumers use the same group.
* Specifying partitions, a consumer group and multiple consumers will result in an #{@link ConfigException} and the task will abort.
*
Contributor

It would probably be good to have a separate paragraph to describe the group configuration.

We probably want to mention that if individual partitions are specified, only a single consumer is supported, and subscribe is not used.

this.status = status;
spec.consumerCount() + 2, // 1 thread for all the ConsumeStatusUpdater and 1 for the StatusUpdater
ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
this.statusUpdaterFuture = executor.scheduleAtFixedRate(this.statusUpdater, 2, 1, TimeUnit.MINUTES);
Contributor

Hmm. Why change the initial delay from 1 minute to 2?

Contributor Author

Whoops, I initially thought this was in seconds. My reasoning was that since more threads are spawned and a rebalance happens, it would make sense to wait a bit longer before exposing the status.
Anyway, reverting this back to 1 minute.

boolean toUseGroupPartitionAssignment = partitionsByTopic.values().stream().allMatch(List::isEmpty);

if (!toUseGroupPartitionAssignment && !toUseRandomConsumeGroup() && consumerCount > 1)
throw new ConfigException(String.format("Will not split partitions across consumers from the %s group", consumerGroup));
Contributor

This exception seems a bit confusing.

Maybe something like "You may not specify an explicit partition assignment when using multiple consumers in the same group. Please either leave the consumer group unset, or specify topics instead of partitions."
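A minimal sketch of that check with the suggested message (illustrative only; `IllegalArgumentException` stands in for Kafka's `ConfigException` so the snippet is self-contained, and the parameter names are assumptions):

```java
public class ConsumePartitionValidation {
    // Throws when an explicit partition assignment is combined with a shared
    // group and multiple consumers (case 4 from the PR description).
    static void validate(boolean explicitPartitions, boolean groupSpecified, int consumerCount) {
        if (explicitPartitions && groupSpecified && consumerCount > 1) {
            throw new IllegalArgumentException(
                "You may not specify an explicit partition assignment when using multiple " +
                "consumers in the same group. Please either leave the consumer group unset, " +
                "or specify topics instead of partitions.");
        }
    }
}
```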


String clientId = clientId(0);
consumer = consumer(consumerGroup, clientId);
if (!toUseGroupPartitionAssignment) {
Contributor

Since we're handling both the case where this is true, and where it is false, we might as well test for (toUseGroupPartitionAssignment) rather than (!toUseGroupPartitionAssignment)

Contributor Author

True, that's more readable.

Properties props = new Properties();
String clientId = String.format("consumer.%s-%s", id, idx);
Contributor

This seems to duplicate the code in the clientId(int idx) function?

private String generateConsumerGroup() {
    return "consume-bench-" + UUID.randomUUID().toString();
}

private boolean toUseRandomConsumeGroup() {
    return spec.consumerGroup().equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP);
Contributor

You should be able to just say spec.consumerGroup().isEmpty()

}
new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer), 1, 1, TimeUnit.MINUTES);
int perPeriod;
if (spec.targetMessagesPerSec() == 0)
Contributor

Suggest using <= 0
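The `<= 0` suggestion could look like this (an illustrative sketch; the real worker sizes per-period quotas for its throttle, but the period count and method names here are assumptions):

```java
public class ThrottleSizing {
    // Messages allowed per throttle period; targetMessagesPerSec <= 0 means unthrottled.
    static int perPeriod(int targetMessagesPerSec, int periodsPerSecond) {
        if (targetMessagesPerSec <= 0)      // reviewer's suggestion: treat 0 and negatives alike
            return Integer.MAX_VALUE;
        // Never round down to zero, or a low target would permit no messages at all.
        return Math.max(1, targetMessagesPerSec / periodsPerSecond);
    }
}
```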

@@ -202,6 +257,9 @@ public Void call() throws Exception {
latencyHistogram.add(elapsedBatchMs);
messageSizeHistogram.add(messageBytes);
bytesConsumed += messageBytes;
if (messagesConsumed >= maxMessages)
Contributor

Why is this needed? The loop condition is the same.

Contributor Author

The default max.poll.records is 500. Without this check, the consumer would always consume at least 500 records per batch (provided the topic had them), not respecting cases where maxMessages is less than that.
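That overshoot can be shown with a self-contained simulation (hypothetical; `batchSize` stands in for the number of records a single poll() returns, up to max.poll.records = 500 by default):

```java
public class MaxMessagesLoop {
    // Counts how many records get consumed when every poll returns batchSize records.
    static long consume(int batchSize, long maxMessages, boolean breakMidBatch) {
        long messagesConsumed = 0;
        while (messagesConsumed < maxMessages) {
            for (int i = 0; i < batchSize; i++) {      // iterate one polled batch
                messagesConsumed++;
                if (breakMidBatch && messagesConsumed >= maxMessages)
                    break;                             // the mid-batch check the PR adds
            }
        }
        return messagesConsumed;
    }
}
```

With the check, consumption stops exactly at maxMessages; without it, a full batch is always drained first.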

/**
* A thread-safe KafkaConsumer wrapper
*/
private static class ThreadSafeConsumer {
Contributor

This is not needed. Only one thread at a time should be accessing the consumer.

Contributor Author

The ConsumeStatusUpdater calls KafkaConsumer#assignment() dynamically to get the latest assignments for a consumer.
My reasoning was that there might be edge cases where these assignments change during a benchmark run (e.g. one consumer finishes early or starts late).
That might not be needed in practice, though. WDYT?

ConsumerRecords<byte[], byte[]> poll() {
this.consumerLock.lock();
try {
return consumer.poll(Duration.ofMillis(50));
Contributor

50 ms is way too short for a poll interval. We don't want to use up so much CPU. If there's nothing to consume it should wait for at least a minute.

Contributor Author

That's how it was before, but good point. If we remove the lock, we should go with this.

Better use of clientId() method
Rename threadCount to threadsPerWorker
Improve ConsumeBenchSpec javadoc
@@ -21,7 +21,7 @@
class ConsumeBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
target_messages_per_sec, max_messages, active_topics,
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None):
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None, consumer_count=1):
Contributor

Should be "threadsPerWorker"

@@ -120,13 +143,62 @@ def test_consume_group_bench(self):
consumer_conf={},
admin_client_conf={},
common_client_conf={},
consumer_count=2,
Contributor

Same (should be threads_per_worker)

@cmccabe
Contributor

cmccabe commented Nov 9, 2018

Thanks @stanislavkozlovski . Looks good. Should be ready to commit once the variable name is fixed in the python code.

@stanislavkozlovski
Contributor Author

@cmccabe the JDK 8 tests passed and the JDK11 failures look like unrelated flaky tests

@cmccabe cmccabe merged commit 8259fda into apache:trunk Nov 13, 2018
Pengxiaolong pushed a commit to Pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Add threads with separate consumers to ConsumeBenchWorker.  Update the Trogdor test scripts and documentation with the new functionality.

Reviewers: Colin McCabe <cmccabe@apache.org>