Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5111: Improve internal Task APIs #2895

Closed
wants to merge 5 commits into from

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Apr 22, 2017

Refactors Task with proper interface methods init(), resume(), commit(), suspend(), and close(). All other methods for task handling are internal now. This allows to simplify StreamThread code, avoid code duplication and allows for easier reasoning of control flow.

@mjsax
Copy link
Member Author

mjsax commented Apr 22, 2017

Call for review @enothereska @dguy @guozhangwang

I did talk about this with @guozhangwang and he basically agrees that this refactoring make sense. It also allows to simplify the EoS code (for example, I could remove closeProducer()) and exception handling. Note, that this is 3 commits, while the second commit contains the actual refactoring. The other two are code cleanups only.

Please review extremely critically, as this is on the hot code path and we must get this correct to not make a step backwards. Nevertheless, I think the refactoring is a good idea to keep the code maintainable long term.

@asfbot
Copy link

asfbot commented Apr 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3112/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3117/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3113/
Test PASSed (JDK 8 and Scala 2.12).

Copy link
Contributor

@enothereska enothereska left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this cleanup blocks EoS. A lot of code has moved around, perhaps in a different style.

return Collections.emptyMap();
}

protected void initializeOffsetLimits() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has stayed the same as before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Just moved some methods in the third commit to "cluster" them a little bit in the code -- right now it's all scattered around the whole file and I have always are hard time to keep an overview (maybe it's just me...)

stateMgr.flush();
}

void initializeStateStores() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what has changed here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just moved.

* @throws ProcessorStateException if there is an error while closing the state manager
* @param writeCheckpoint boolean indicating if a checkpoint file should be written
*/
void closeStateManager(final boolean writeCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what has changed here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just moved.

public int numBuffered() {
return partitionGroup.numBuffered();
@Override
public void resume() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does resume mean? It looks like it's initing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it does. It's just to get a clean API -- tasks are "resumed" when they are re-assigned (ie, reused after the got suspended). I though it's helpful to have an extra method for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that

  1. init include both initializing the topology (i.e. processorNode.init) as well as the state stores;
  2. and suspend does not close state stores, only flush them.
  3. and resume only re-init the topology but not state stores.

cc @mjsax ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.
suspend already works like this.

log.trace("{} Committing", logPrefix);
metrics.metrics.measureLatencyNs(
time,
new Runnable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't immediately see why this cleanup is needed here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class is only used once -- and this change keep code local -- when reading code before, it was unclear what's actually happening without stepping into the class that is on a completely different place in the file. Now you can read it at one glance.

I guess it's a personal thing and I revert if you don't like it.

* This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and
* overrides one of its functions for efficiency
*/
private class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what has changed here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just moved.

stateMgr.flush();
}

void initializeStateStores() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can still be protected function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"package private" is stronger than protected and there is no reason to make it more open than required. But I am open to make it protected, too. Just don't see why we would need it?

* @throws ProcessorStateException if there is an error while closing the state manager
* @param writeCheckpoint boolean indicating if a checkpoint file should be written
*/
void closeStateManager(final boolean writeCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protected?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

@@ -281,6 +281,7 @@ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
// write the checkpoint
@Override
public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
log.trace("{} Writing checkpoint", logPrefix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: including the acked offsets to checkpoint as well.

try {
commit();
closeStateManager(true);
} catch (final RuntimeException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are already capturing IOException internally, what other types of RTE could be thrown here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there are none -- but isn't a general guard a good idea?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just afraid that capturing any RTE that we have not thought about and re-close the state managers may hide some issues or even subsequently trigger some other issues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: just realizing that we are re-throwing the exception anyways after re-closing the state managers. So this should be fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this pattern is a bit weird. As @guozhangwang initially said, it's as likely to introduce issues than fix issues. closeStateManager should just do the right thing. If we need to change the catch blocks, we should do it there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is your concern about commit throwing an exception? If that is the case, then the try/catch should just be around that method IMO.

// expected
}
assertTrue(processor.closed);
assertTrue(source1.closed);
assertTrue(source2.closed);
}

@Test(expected = IllegalStateException.class)
public void shouldThrowWhenClosingProducerForNonEoS() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this deletion intentional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. task.closeProducer() get's removed by this PR.

@@ -1138,7 +1129,7 @@ protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPar
}

@Test
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringCloseTopologyWhenSuspendingState() throws Exception {
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringSuspendWhenSuspendingState() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could this be further simplified to

shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension

@@ -71,6 +71,7 @@

private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax This file has been a bit hard to review right now, with all the actual changes mixed with code style cleanups and function move-arounds. Could you only maintain the actual change for this PR and with the function move-arounds / cleanups in another PR?

@@ -142,39 +142,31 @@ public StreamTask(final TaskId id,
log.info("{} Initializing state stores", logPrefix);
initializeStateStores();
stateMgr.registerGlobalStateStores(topology.globalStateStores());
initTopology();
init();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we move the initializeStateStores as well as global state store registration inside init functions for both streamtask and standby task, since we are closing them inside close() as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

public int numBuffered() {
return partitionGroup.numBuffered();
@Override
public void resume() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that

  1. init include both initializing the topology (i.e. processorNode.init) as well as the state stores;
  2. and suspend does not close state stores, only flush them.
  3. and resume only re-init the topology but not state stores.

cc @mjsax ?

@mjsax
Copy link
Member Author

mjsax commented Apr 24, 2017

I will remove the last commit and extract it into an own PR, too.

@mjsax mjsax force-pushed the kafka-5111-cleanup-task-code branch from 45bbc17 to ed3e7ed Compare April 24, 2017 20:59
@mjsax
Copy link
Member Author

mjsax commented Apr 24, 2017

Updated this. (removed one cleanup commit)

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More comments left. Otherwise lgtm.

@@ -193,7 +190,7 @@ public String toString(final String indent) {
/**
* Flush all state stores owned by this task
*/
public void flushState() {
protected void flushState() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be no modifier?


return partition == null ? taskId.partition : partition.partition();
}

void registerGlobalStateStores(final List<StateStore> stateStores) {
for (StateStore stateStore : stateStores) {
log.info("{} Register global stores", logPrefix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: print store names?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same though be did not to it, because it will be the same log message for all processors... quite repetitive. Can add it though.

return checkpointedOffsets;
}

public Collection<TopicPartition> changeLogPartitions() {
Collection<TopicPartition> changeLogPartitions() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems only used in unit test now. In that case could we just call checkpointedOffsets().keySet()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing I realized while reviewing this PR: updateStandByTaskMaps is called in two different places but that seems duplicated: first in the middle of addStandbyTasks, then at the end of addStandbyTasks by new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start); which then calls this function.

Would like you to double check if I'm right, if yes we can file a simple PR after this to remove one of its caller.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not good code but also not a bug atm. It would be sufficient to call updateStandByTaskMaps only for tasks we do resume -- if we don't have a task to resume, updateStandByTaskMaps will not so anything as task parameter is null. I guess I can just make this update within this PR -- why do you think we need an extra PR?

log.debug("standby-task [{}] Updating standby replicas of its state store for partition [{}]", id(), partition);
return stateMgr.updateStandbyStates(partition, records);
}

@Override
public void init() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if standbyTask does not ever call init() then why do we want it to be an abstract class? I felt that we can either just remove abstract init and just let StreamTask has its own init function that is also called in resume; or if we do want to have a init abstract logic, we can also move

log.info("standby-task [{}] Initializing state stores", id());
initializeStateStores();
checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
processorContext.initialized();

At let constructor call this init.

Personally I like the first option since we are already overriding the constructors anyways which always covers the init part, and just the stream task needs a reusable init part that is to be used in resume. But it's your call.

public void init() {}

@Override
public void resume() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should re-initializeOffsetLimits here as its committed offsets could have been updated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How could the offsets have changed? When we enter rebalance, we commit and keep the offset in the suspended standby task -- if we get the standby task reassigned its up-to-date -- otherwise, we drop it anyway.

public void suspend() {
log.info("{} Suspending task", logPrefix);
closeTopology();
log.debug("{} Committing offsets", logPrefix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this log as it duplicates with line 263 above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log level in 263 is trace -- but we can remove, as we get info log level in 288.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the trace on 263 is good enough, covering line 290. For line 288 it should be kept as is.

try {
commit();
closeStateManager(true);
} catch (final RuntimeException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: just realizing that we are re-throwing the exception anyways after re-closing the state managers. So this should be fine.

metrics.removeSensor(taskCommitTimeSensor);
}
}

@Override
public void flushState() {
protected void flushState() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since now stream thread does not directly call this function anymore, could it be no modifier?

@@ -110,24 +110,21 @@ public final ThreadCache cache() {
return cache;
}

public abstract void init();
public abstract void resume();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init and resume are not used in standby tasks at all, so maybe we can just remove them from the abstract tasks (see my comments below)?


if (task != null) {
log.debug("{} recycling old standby task {}", logPrefix, taskId);
suspendedStandbyTasks.remove(taskId);
task.initTopology();
task.init();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For standby tasks init is no-op, so we can just remove it? PS: logically we should call resume not init here, but the former is also no-op anyways..

@mjsax
Copy link
Member Author

mjsax commented Apr 25, 2017

Updated this.

Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -110,15 +110,12 @@ public final ThreadCache cache() {
return cache;
}

public abstract void init();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should create an actual interface for this?

final Sensor taskCommitTimeSensor;


public TaskMetrics(final StreamsMetrics metrics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove public

@@ -111,23 +111,18 @@ public final ThreadCache cache() {
}

public abstract void commit();

public abstract void suspend();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should introduce an interface, Task? Doesn't necessarily need to be done in this PR, though

@@ -1196,23 +1102,23 @@ public String toString() {
* @return A string representation of the StreamThread instance.
*/
public String toString(final String indent) {
final StringBuilder sb = new StringBuilder(indent + "StreamsThread appId: " + this.applicationId + "\n");
final StringBuilder sb = new StringBuilder(indent + "StreamsThread appId: " + applicationId + "\n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I know this was here before, but why use a StringBuilder and then use + ?

@asfbot
Copy link

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3165/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3160/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3163/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3165/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3169/
Test PASSed (JDK 8 and Scala 2.11).

@@ -947,7 +947,7 @@ private void addStreamTasks(final Collection<TopicPartition> assignment, final l
try {
final StreamTask task = findMatchingSuspendedTask(taskId, partitions);
if (task != null) {
log.debug("{} recycling old task {}", logPrefix, taskId);
log.debug("{} resuming old task {}", logPrefix, taskId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is not introduced in this PR but, other places capitalize the first letter after log prefix.

@asfbot
Copy link

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3165/
Test PASSed (JDK 8 and Scala 2.12).

@mjsax mjsax force-pushed the kafka-5111-cleanup-task-code branch from f8481fe to aea709e Compare April 25, 2017 19:22
@mjsax
Copy link
Member Author

mjsax commented Apr 25, 2017

Updated this. Will so a follow up PR to address some more logging issues and other minor comment (like StringBuilder).

@mjsax
Copy link
Member Author

mjsax commented Apr 25, 2017

Retest this please.

2 similar comments
@mjsax
Copy link
Member Author

mjsax commented Apr 25, 2017

Retest this please.

@mjsax
Copy link
Member Author

mjsax commented Apr 26, 2017

Retest this please.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged to trunk. A minor comment that can be incorporated in the follow-up PR.

} catch (final IOException e) {
throw new ProcessorStateException("Error while closing the state manager", e);
}
stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add -- going to do a follow up cleanup PR anyway.

@asfbot
Copy link

asfbot commented Apr 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3176/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3179/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3176/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3178/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3186/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3179/
Test PASSed (JDK 7 and Scala 2.10).

@mjsax mjsax deleted the kafka-5111-cleanup-task-code branch April 28, 2017 00:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants