KAFKA-4843: More efficient round-robin scheduler #2643

Closed (wants to merge 37 commits)

Changes from 32 of 37 commits

Commits:
c3f9a17
More efficient round robin
Mar 5, 2017
138a491
Tighter loop
Mar 5, 2017
caba483
Increased records further
Mar 5, 2017
187f9b6
Temporarily use only 2 relevant tests for branch builder
Mar 5, 2017
87f06a8
Undo previous
Mar 5, 2017
aaa14d1
Temporary reduce number of tests for quick branch builder turnaround
Mar 5, 2017
6c616ad
Re-enable full tests
Mar 5, 2017
6414962
Increase timeout to match increased records
Mar 5, 2017
c1c47ef
Speed up tests by running with scale 1 and 3 only
Mar 5, 2017
978f925
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 6, 2017
b78545e
Damian's comments
Mar 6, 2017
57856b0
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 6, 2017
b34122b
Round robin with more per-task latency fairness
Mar 7, 2017
5086dd7
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 7, 2017
09097ef
Refactor and simplify
Mar 8, 2017
8ec5678
Minor refactoring
Mar 8, 2017
467b11a
Guard against process time being larger than commit time
Mar 9, 2017
6c851cd
Addressed comments
Mar 9, 2017
40394f3
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 9, 2017
1ee01c1
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 11, 2017
160766b
Merge remote-tracking branch 'origin/trunk' into minor-schedule-round…
Mar 11, 2017
0d30ba4
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
enothereska Mar 13, 2017
7dae290
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 14, 2017
c658198
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 16, 2017
7252e62
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
enothereska Mar 17, 2017
4eb3670
Merge branch 'trunk' of github.com:enothereska/kafka into trunk
enothereska Mar 17, 2017
c989871
Merge remote-tracking branch 'apache-kafka/trunk' into trunk
Mar 21, 2017
88a16e7
Merge branch 'trunk' of github.com:enothereska/kafka into trunk
Mar 21, 2017
a125039
Merge remote-tracking branch 'origin/trunk' into minor-schedule-round…
Mar 21, 2017
69369ef
Merge with trunk
Mar 23, 2017
4a80d24
Merge remote-tracking branch 'origin/trunk' into minor-schedule-round…
Mar 25, 2017
d93c4c3
Guozhang's comments
Mar 26, 2017
732f4db
Merge remote-tracking branch 'origin/trunk' into minor-schedule-round…
Mar 26, 2017
1b0b168
Int->Boolean
Mar 27, 2017
06e3380
Merge remote-tracking branch 'origin/trunk' into minor-schedule-round…
Mar 27, 2017
7b98e22
Merge remote-tracking branch 'origin/trunk' into minor-schedule-round…
Mar 28, 2017
d4c3ae8
Merge remote-tracking branch 'origin/trunk' into minor-schedule-round…
Mar 28, 2017
@@ -168,10 +168,17 @@ public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[],
return newQueueSize - oldQueueSize;
}

/**
* @return The number of records left in the buffer of this task's partition group
*/
public int numBuffered() {
return partitionGroup.numBuffered();
}

/**
* Process one record
*
* @return number of records left in the buffer of this task's partition group after the processing is done
* @return number of records processed
Contributor:

In this case could we just return a boolean?

Contributor (Author):

Ok

*/
@SuppressWarnings("unchecked")
public int process() {
@@ -224,7 +231,7 @@ public int process() {
processorContext.setCurrentNode(null);
}

return partitionGroup.numBuffered();
return 1;
}

private void updateProcessorContext(final ProcessorRecordContext recordContext, final ProcessorNode currNode) {
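The exchange above, together with the later "Int->Boolean" commit (1b0b168), which falls outside the 32 commits shown in this diff, suggests process() ends up reporting a boolean. As a toy, self-contained sketch (not Kafka's StreamTask) of how the round-robin caller reads with that signature:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a task: process() answers "did I process a record?"
// instead of returning a buffered-record count.
public class RoundRobinSketch {

    static class Task {
        final Queue<String> buffer = new ArrayDeque<>();

        Task(final String... records) {
            buffer.addAll(Arrays.asList(records));
        }

        // Process at most one buffered record; report whether one was processed.
        boolean process() {
            return buffer.poll() != null;
        }
    }

    public static void main(final String[] args) {
        final List<Task> tasks = Arrays.asList(new Task("a", "b", "c"), new Task("x"));

        long totalProcessed = 0;
        long processedEachRound;
        // Round-robin scheduling: take one record from each task per round
        // until no task has any records left.
        do {
            processedEachRound = 0;
            for (final Task task : tasks) {
                if (task.process()) {          // was: if (task.process() > 0)
                    processedEachRound++;
                    totalProcessed++;
                }
            }
        } while (processedEachRound != 0);

        System.out.println(totalProcessed);    // prints 4
    }
}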
@@ -223,6 +223,7 @@ private synchronized void setStateWhenNotInPendingShutdown(final State newState)
private final TaskCreator taskCreator = new TaskCreator();

final ConsumerRebalanceListener rebalanceListener;
private final static int UNLIMITED_RECORDS = -1;

public synchronized boolean isInitialized() {
return state == State.RUNNING;
@@ -519,107 +520,168 @@ private long computeLatency() {
return Math.max(this.timerStartedMs - previousTimeMs, 0);
}

private void runLoop() {
int totalNumBuffered = 0;
boolean requiresPoll = true;
boolean polledRecords = false;

consumer.subscribe(sourceTopicPattern, rebalanceListener);

while (stillRunning()) {
this.timerStartedMs = time.milliseconds();

// try to fetch some records if necessary
if (requiresPoll) {
requiresPoll = false;

boolean longPoll = totalNumBuffered == 0;
/**
* Get the next batch of records by polling.
* @return Next batch of records or null if no records available.
*/
private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
ConsumerRecords<byte[], byte[]> records = null;

ConsumerRecords<byte[], byte[]> records = null;
try {
records = consumer.poll(pollTimeMs);
} catch (NoOffsetForPartitionException ex) {
TopicPartition partition = ex.partition();
if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
consumer.seekToBeginning(ex.partitions());
} else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
consumer.seekToEnd(ex.partitions());
log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
} else {

try {
records = consumer.poll(longPoll ? this.pollTimeMs : 0);
} catch (NoOffsetForPartitionException ex) {
TopicPartition partition = ex.partition();
if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
consumer.seekToBeginning(ex.partitions());
} else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
consumer.seekToEnd(ex.partitions());
log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
} else {
if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
setState(State.PENDING_SHUTDOWN);
String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
" You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
"policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
}

if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
setState(State.PENDING_SHUTDOWN);
String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
" You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
"policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
}
if (originalReset.equals("earliest")) {
consumer.seekToBeginning(ex.partitions());
} else if (originalReset.equals("latest")) {
consumer.seekToEnd(ex.partitions());
}
log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
}

if (originalReset.equals("earliest")) {
consumer.seekToBeginning(ex.partitions());
} else if (originalReset.equals("latest")) {
consumer.seekToEnd(ex.partitions());
}
log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
}
}

}
if (rebalanceException != null)
throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);

if (rebalanceException != null)
throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
return records;
}

if (records != null && !records.isEmpty()) {
int numAddedRecords = 0;
/**
* Take records and add them to each respective task
* @param records Records, can be null
*/
private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
if (records != null && !records.isEmpty()) {
int numAddedRecords = 0;

for (TopicPartition partition : records.partitions()) {
StreamTask task = activeTasksByPartition.get(partition);
numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
polledRecords = true;
} else {
polledRecords = false;
}
for (TopicPartition partition : records.partitions()) {
StreamTask task = activeTasksByPartition.get(partition);
numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
}
}

// only record poll latency if long poll is required
if (longPoll) {
streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
/**
* Schedule the processing of records by selecting which record is processed next. Commits may
* happen as records are processed.
* @param tasks The tasks that have records.
* @param recordsProcessedBeforeCommit Number of records to be processed before commit is called;
*                                     if UNLIMITED_RECORDS, then commit is never called.
* @return Number of records processed since last commit.
*/
private long processAndPunctuate(final Map<TaskId, StreamTask> tasks,
final long recordsProcessedBeforeCommit) {

long totalProcessedEachRound;
long totalProcessedSinceLastMaybeCommit = 0;
// Round-robin scheduling by taking one record from each task repeatedly
// until no task has any records left
do {
totalProcessedEachRound = 0;
for (StreamTask task : tasks.values()) {
// we processed one record;
// more records may still be buffered, waiting for the next round
if (task.process() > 0) {
totalProcessedEachRound++;
totalProcessedSinceLastMaybeCommit++;
}
}
if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS &&
totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) {
totalProcessedSinceLastMaybeCommit = 0;
long processLatency = computeLatency();
streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessedSinceLastMaybeCommit,
timerStartedMs);
maybeCommit(this.timerStartedMs);
Contributor:

One meta question: since we only check for a commit at the round boundary anyway (by round boundary I mean: with, say, 5 tasks, we will not check for a commit after processing only tasks 1, 2, 3), and the check inside maybeCommit itself is minimal, could we just trigger maybeCommit every time we have done a round of one-record-per-task?

Also, I'm wondering if we should update this.timerStartedMs once we have successfully executed the commit, since otherwise we will always commit once every round, right?

Contributor (Author):

  1. (the first question): that would add a timestamp calculation. In the case where there is one task, that would mean one call to time.milliseconds() for each record, which is expensive.
  2. this.timerStartedMs is actually updated in the computeLatency() call above, so we are ok.

Contributor (Author):

Also, to add to the first question: we now calculate whether we are overshooting the commit interval and adjust the number of records accordingly, so we don't overshoot the commit interval anymore.

Contributor:

OK, makes sense.
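For context on point 2: the diff only shows the tail of computeLatency(). A minimal, hypothetical reconstruction, assuming nothing beyond the visible return statement and the reply that the call updates this.timerStartedMs (System.currentTimeMillis() stands in for Kafka's Time abstraction):

// Hypothetical sketch, not copied from the PR: computeLatency() both measures the
// elapsed time and restarts the timer, so no extra time.milliseconds() call is
// needed after maybeCommit().
public class LatencyTimerSketch {

    private long timerStartedMs = System.currentTimeMillis();

    long computeLatency() {
        final long previousTimeMs = this.timerStartedMs;
        this.timerStartedMs = System.currentTimeMillis();          // side effect: restart the timer
        return Math.max(this.timerStartedMs - previousTimeMs, 0);  // matches the return shown in the diff
    }

    public static void main(final String[] args) throws InterruptedException {
        final LatencyTimerSketch timer = new LatencyTimerSketch();
        Thread.sleep(50);
        System.out.println(timer.computeLatency()); // roughly 50 ms; the timer now starts from "now"
        System.out.println(timer.computeLatency()); // roughly 0 ms, measured from the previous call
    }
}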

}
} while (totalProcessedEachRound != 0);

// try to process one fetch record from each task via the topology, and also trigger punctuate
// functions if necessary, which may result in more records going through the topology in this loop
if (totalNumBuffered > 0 || polledRecords) {
totalNumBuffered = 0;

if (!activeTasks.isEmpty()) {
for (StreamTask task : activeTasks.values()) {
// go over the tasks again to punctuate or commit
for (StreamTask task : tasks.values()) {
maybePunctuate(task);
if (task.commitNeeded())
commitOne(task);
}

totalNumBuffered += task.process();
return totalProcessedSinceLastMaybeCommit;
}

requiresPoll = requiresPoll || task.requiresPoll();
/**
* Adjust the number of records that should be processed by scheduler. This avoids
* scenarios where the processing time is higher than the commit time.
* @param prevRecordsProcessedBeforeCommit Previous number of records processed by scheduler.
* @param totalProcessed Total number of records processed in this last round.
* @param processLatency Total processing latency in ms for this last round.
* @param commitTime Desired commit time in ms.
* @return An adjusted number of records to be processed in the next round.
*/
private long adjustRecordsProcessedBeforeCommit(final long prevRecordsProcessedBeforeCommit, final long totalProcessed,
Contributor:

I think this method could be reduced to something like:

if (processLatency > 0) {
  return Math.max(1, (commitTime * totalProcessed) / processLatency);
}
return UNLIMITED_RECORDS;

Contributor (Author):

I'd only want to make this adjustment if processLatency > commitTime, though. In most cases that would mean no adjustment is needed at all, and all records that are polled are processed. E.g., with the default commit time of 30 seconds, I currently don't want to make any adjustments at all.

Contributor:

OK, yep. I see: if it was at the default, i.e., UNLIMITED, and processLatency < commitTime, then don't adjust. Makes sense.

final long processLatency, final long commitTime) {
long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
// check if the process latency is larger than the commit time
// note that once we set recordsProcessedBeforeCommit, it will never be UNLIMITED_RECORDS again, so
// we will never process all records again. This might be an issue if the initial measurement
// was off due to a slow start.
if (processLatency > commitTime) {
// push down
recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}",
logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
} else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) {
// push up
recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
log.debug("{} processing latency {} > commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}",
logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
}

streamsMetrics.processTimeSensor.record(computeLatency(), timerStartedMs);
return recordsProcessedBeforeCommit;
}
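A standalone sketch of the adjustment arithmetic above, with made-up numbers (not taken from the PR): with a 30 s commit interval, a batch of 1000 records that took 40 s to process shrinks the cap to 30000 * 1000 / 40000 = 750, while a faster-than-commit-interval batch with no existing cap leaves it unlimited.

// Illustration only; the names mirror the PR, the values are assumptions.
public class RecordsBeforeCommitSketch {

    static final long UNLIMITED_RECORDS = -1;

    static long adjust(final long prevRecordsProcessedBeforeCommit,
                       final long totalProcessed,
                       final long processLatency,
                       final long commitTime) {
        if (processLatency > commitTime) {
            // processing overshot the commit interval: shrink the batch
            return Math.max(1, (commitTime * totalProcessed) / processLatency);
        } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) {
            // a cap is already in place and there is headroom: push it back up
            return Math.max(1, (commitTime * totalProcessed) / processLatency);
        }
        return UNLIMITED_RECORDS;
    }

    public static void main(final String[] args) {
        System.out.println(adjust(UNLIMITED_RECORDS, 1000, 40_000, 30_000)); // 750: adjust down
        System.out.println(adjust(UNLIMITED_RECORDS, 1000, 10_000, 30_000)); // -1: stay unlimited
        System.out.println(adjust(750, 500, 10_000, 30_000));                // 1500: adjust up
    }
}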

maybePunctuate(task);
/**
* Main event loop for polling, and processing records through topologies.
*/
private void runLoop() {
long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
consumer.subscribe(sourceTopicPattern, rebalanceListener);

if (task.commitNeeded())
commitOne(task);
}
while (stillRunning()) {
this.timerStartedMs = time.milliseconds();

} else {
// even when no task is assigned, we must poll to get a task.
requiresPoll = true;
// try to fetch some records if necessary
ConsumerRecords<byte[], byte[]> records = pollRequests(this.pollTimeMs);
if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
addRecordsToTasks(records);
final long totalProcessed = processAndPunctuate(activeTasks, recordsProcessedBeforeCommit);
if (totalProcessed > 0) {
final long processLatency = computeLatency();
streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
Contributor:

Do we still need to record processTimeSensor again, since it is already recorded inside processAndPunctuate?

Contributor (Author):

We still need it. For example, if processAndPunctuate is told to only process 5 records before committing (i.e., recordsProcessedBeforeCommit = 5), and we have 7 records in total, then processAndPunctuate would process 5, then commit, then process the 2 left and return 2. Then the above code in line 677 would record the latency of the last 2 records and attempt to commit if necessary.

Contributor:

Thanks for clarifying.

timerStartedMs);
recordsProcessedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,
processLatency, commitTimeMs);
}

} else {
requiresPoll = true;
}

maybeCommit(timerStartedMs);
maybeUpdateStandbyTasks();

maybeClean(timerStartedMs);
}
log.info("{} Shutting down at user request", logPrefix);
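To make the 7-records, cap-of-5 example from the review thread above concrete, a simplified, hypothetical walk-through (not Kafka code) of why the caller still has work to account for after processAndPunctuate returns:

import java.util.ArrayDeque;
import java.util.Queue;

// The inner loop records latency, commits, and resets its counter each time the cap
// is reached; the leftover count is returned to the caller, which records the latency
// of those trailing records and calls maybeCommit for them separately.
public class CommitCapWalkthrough {

    public static void main(final String[] args) {
        final Queue<Integer> buffered = new ArrayDeque<>();
        for (int i = 1; i <= 7; i++) {
            buffered.add(i);
        }

        final long recordsProcessedBeforeCommit = 5;
        long processedSinceLastCommit = 0;

        while (!buffered.isEmpty()) {
            buffered.poll();                   // "process" one record
            processedSinceLastCommit++;
            if (processedSinceLastCommit >= recordsProcessedBeforeCommit) {
                System.out.println("inner loop: record latency and commit after "
                        + processedSinceLastCommit + " records");
                processedSinceLastCommit = 0;  // counter resets after the commit
            }
        }

        System.out.println("returned to the caller: " + processedSinceLastCommit + " records");
        // Output:
        //   inner loop: record latency and commit after 5 records
        //   returned to the caller: 2 records
    }
}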
@@ -692,8 +754,9 @@ private void maybePunctuate(StreamTask task) {
protected void maybeCommit(final long now) {

if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed",
logPrefix, commitTimeMs, activeTasks, standbyTasks);

log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed by {}ms",
logPrefix, activeTasks, standbyTasks, commitTimeMs, now - lastCommitMs);

commitAll();
lastCommitMs = now;