
KAFKA-6688. The Trogdor coordinator should track task statuses #4737

Merged
1 commit merged into apache:trunk on Apr 8, 2018

Conversation

@cmccabe (Contributor) commented Mar 20, 2018

No description provided.

@cmccabe (Contributor, Author) commented Mar 20, 2018

The test failure is in kafka.api.ConsumerBounceTest.testCloseDuringRebalance, which is not related to this change.

@@ -192,6 +191,9 @@ public void run() {
// agents going down?
return;
}
if (log.isTraceEnabled()) {
Contributor:
Do we need this if, since we are not doing any calculations for the arguments we pass to log.trace?

cmccabe (Contributor, Author):
agentStatus#toString is pretty heavy. If you have a dozen workers running, it will serialize information about all of them into a string.
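
For illustration, a minimal sketch of the guard pattern under discussion; the class, method, and log message below are stand-ins rather than the actual Trogdor code:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceGuardSketch {
    private static final Logger log = LoggerFactory.getLogger(TraceGuardSketch.class);

    // Stand-in for the per-agent status object: its toString() walks every
    // worker, so building the string is relatively expensive.
    static final class AgentStatus {
        @Override
        public String toString() {
            StringBuilder bld = new StringBuilder();
            for (int worker = 0; worker < 12; worker++) {
                bld.append("worker-").append(worker).append("=RUNNING ");
            }
            return bld.toString().trim();
        }
    }

    static void logStatus(String nodeName, AgentStatus agentStatus) {
        // With the guard, AgentStatus#toString is never invoked unless trace
        // logging is actually enabled for this logger.
        if (log.isTraceEnabled()) {
            log.trace("{}: got agent status {}", nodeName, agentStatus);
        }
    }
}
```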

}
// Notify the TaskManager if the worker state has changed.
if (!worker.state.equals(state)) {
log.info("{}: WATEREMLON: worker state changed to {}", node.name(), state);
Contributor:
What's "WATEREMLON"?

cmccabe (Contributor, Author) commented Mar 21, 2018:
Sorry, I forgot to take that out. Let me fix up these log messages.

Contributor:
@cmccabe Looks like the log entry hasn't been updated.

cmccabe (Contributor, Author):
Fixed. I wasn't able to search for it because it was misspelled 😞

StatusData update() {
Histogram.Summary summary = histogram.summarize(percentiles);
StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
summary.percentiles().get(0).value(),
Contributor:
It would be great to make this a bit more robust w.r.t. adding/removing percentiles from the Histogram. But I see we already do this in other places; I can add a JIRA to improve that?

cmccabe (Contributor, Author):
Hmm, Histogram is internally synchronized, so there should be no conflict. Maybe I misunderstood the question.

Contributor:
Oh, I meant that we explicitly get the first three values from percentiles.

apovzner (Contributor) commented Apr 5, 2018:
Actually, it's fine, but I think we should put a comment in the StatusUpdater constructor noting that anyone changing the percentiles also needs to make sure they stay consistent with the JSON properties in StatusData and with how we construct it. The reason is that I just noticed we actually create these percentiles: this.percentiles = new float[] {0.50f, 0.95f, 0.99f}; but StatusData has @JsonProperty("p90LatencyMs") int p90latencyMs instead of p95.

cmccabe (Contributor, Author):
Good catch, @apovzner. I fixed the discrepancy. I also moved the array into the StatusUpdater class and added a comment.
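
A minimal sketch of the consistency that the added comment is meant to protect, assuming the Jackson-style JSON binding quoted above; the class layout and field names here are illustrative rather than the actual StatusUpdater/StatusData code:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class StatusUpdaterSketch {
    // If these percentiles change, the StatusData fields and JSON property
    // names below (p50/p95/p99) must be updated to match, along with the code
    // that constructs StatusData from the histogram summary.
    private static final float[] PERCENTILES = new float[] {0.50f, 0.95f, 0.99f};

    public static class StatusData {
        private final long numSamples;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;

        @JsonCreator
        public StatusData(@JsonProperty("numSamples") long numSamples,
                          @JsonProperty("p50LatencyMs") int p50LatencyMs,
                          @JsonProperty("p95LatencyMs") int p95LatencyMs,
                          @JsonProperty("p99LatencyMs") int p99LatencyMs) {
            this.numSamples = numSamples;
            this.p50LatencyMs = p50LatencyMs;
            this.p95LatencyMs = p95LatencyMs;
            this.p99LatencyMs = p99LatencyMs;
        }

        @JsonProperty
        public long numSamples() { return numSamples; }

        @JsonProperty
        public int p50LatencyMs() { return p50LatencyMs; }

        @JsonProperty
        public int p95LatencyMs() { return p95LatencyMs; }

        @JsonProperty
        public int p99LatencyMs() { return p99LatencyMs; }
    }
}
```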

@cmccabe (Contributor, Author) commented Mar 22, 2018

retest this please

@rajinisivaram (Contributor) left a comment:
@cmccabe Thanks for the PR. Looks good. Left a few minor comments. There is also an outstanding question from @apovzner about the histogram. Once they are addressed, I can merge this.

worker.state = state;
taskManager.updateWorkerState(node.name(), worker.id, state);
} else {
log.info("{}: WATEREMLON: worker state was {}, is now {}", node.name(), worker.state, state);
Contributor:
Same as before, update the log line?

task.error.isEmpty() ? "(none)" : task.error);
} else if (task.state == ManagedTaskState.RUNNING) {
TreeSet<String> activeWorkers = task.activeWorkers();
log.info("Node {} stopped. Stopping task {} on worker(s): {}",
Contributor:
Missing nodeName in the log line?

for (String workerName : activeWorkers) {
nodeManagers.get(workerName).stopWorker(task.id);
}
}
Contributor:
It looks like before moving this code into a separate method, we handled the PENDING ManagedTaskState. Or is this state not possible when we are in this method? Maybe we should check and throw an exception in that case? Also, what if the task is in the STOPPING state?

cmccabe (Contributor, Author):
> It looks like before moving this code into a separate method, we handled the PENDING ManagedTaskState. Or is this state not possible when we are in this method?

Yes, PENDING is impossible here because at that point the worker hasn't been started.

> Also, what if the task is in the STOPPING state?

There's no additional action needed in that case.

The only transition here is that if one worker fails while a task is RUNNING, the task will transition into STOPPING and the other workers will be stopped. There is nothing to do if the task isn't RUNNING.

Actually, come to think of it, we should probably not start stopping the other tasks unless the first worker stopped with an error.
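
A compact sketch of the transition described in this exchange, with assumed enum values and method names rather than the actual Coordinator code: only a RUNNING task needs action when one of its workers stops, PENDING should be impossible at that point, and STOPPING or DONE tasks need nothing further:

```java
public class TaskStopSketch {
    enum ManagedTaskState { PENDING, RUNNING, STOPPING, DONE }

    // Returns the task's next state after one of its workers has stopped.
    static ManagedTaskState handleWorkerStopped(ManagedTaskState state) {
        switch (state) {
            case RUNNING:
                // Ask the remaining active workers to stop and mark the task
                // as STOPPING. (Per the follow-up above, this could be limited
                // to the case where the worker stopped with an error.)
                return ManagedTaskState.STOPPING;
            case STOPPING:
            case DONE:
                // No additional action is needed.
                return state;
            case PENDING:
            default:
                // PENDING should be impossible here: no worker has been
                // started yet for a PENDING task.
                throw new IllegalStateException("Unexpected task state " + state);
        }
    }
}
```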

@cmccabe force-pushed the KAFKA-6688 branch 2 times, most recently from 6c8c9cc to b30161a on April 6, 2018 05:21
log.info("Node {} stopped. Stopping task {} on worker(s): {}",
task.id, Utils.join(activeWorkers, ", "));
log.info("Node {} stopped with error {}. Stopping task {} on worker(s): {}",
nodeName, task.id, Utils.join(activeWorkers, ", "));
Contributor:
Do we want task.error in the log entry?

@apovzner (Contributor) commented Apr 6, 2018

Looks like my changes conflicted with yours, but there is also a unit test failure (before the conflict happened):

17:02:00 org.apache.kafka.trogdor.agent.AgentTest > testWorkerCompletions FAILED
17:02:00     java.lang.AssertionError: Condition not met within timeout 15000. Timed out waiting for expected workers {"bar":{"id":"bar","workerState":{"state":"RUNNING","spec":{"class":"org.apache.kafka.trogdor.task.SampleTaskSpec","startMs":0,"durationMs":900000,"nodeToExitMs":{"node01":2},"error":"baz"},"startedMs":0,"status":"started"}},"foo":{"id":"foo","workerState":{"state":"DONE","spec":{"class":"org.apache.kafka.trogdor.task.SampleTaskSpec","startMs":0,"durationMs":900000,"nodeToExitMs":{"node01":1}},"startedMs":0,"doneMs":1,"status":"halted"}}}
17:02:00         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
17:02:00         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
17:02:00         at org.apache.kafka.trogdor.common.ExpectedTasks.waitFor(ExpectedTasks.java:176)
17:02:00         at org.apache.kafka.trogdor.agent.AgentTest.testWorkerCompletions(AgentTest.java:239)
17:02:00 

@cmccabe (Contributor, Author) commented Apr 6, 2018

Rebased and fixed the failing test.

@rajinisivaram (Contributor) left a comment:

@cmccabe Thanks for the updates, LGTM. Will merge after builds complete.

@apovzner (Contributor) commented Apr 7, 2018

LGTM

@rajinisivaram (Contributor):

@cmccabe Thanks for the PR. Build failure is unrelated, merging to trunk.

@rajinisivaram merged commit 40183e3 into apache:trunk on Apr 8, 2018.
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request on Jul 6, 2018 (…e#4737)

Reviewers: Anna Povzner <anna@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
@cmccabe deleted the KAFKA-6688 branch on May 20, 2019.