
KAFKA-4923: Add Exactly-Once Semantics to Streams #2945

Closed
wants to merge 11 commits

Conversation

mjsax
Member

@mjsax mjsax commented Apr 30, 2017

No description provided.

@mjsax
Member Author

mjsax commented Apr 30, 2017

Call for review @dguy @enothereska @guozhangwang
(it's again one cleanup and one actual commit)

@mjsax
Member Author

mjsax commented Apr 30, 2017

We need to add more tests, of course...

@asfbot

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3310/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3301/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3305/
Test FAILed (JDK 7 and Scala 2.10).

Files.delete(file.toPath());
try {
Files.delete(file.toPath());
} catch (final NoSuchFileException e) { }
Contributor

This change triggers a findBugs warning, which causes the build to fail. Swallowing the exception this way also seems suspect.

Member Author

It's correct to swallow this exception... Alternatively, we could check if the file exists (right now we just call "delete" blindly) before trying to delete it -- to prevent the exception in the first place. I see no reason to log this or do anything else. What else could I do to fix findBugs? Would a comment do?

Contributor

If the file not existing is an expected scenario, you should use Files.deleteIfExists. Swallowing the exception does capture the intent, but is verbose, while checking if the file exists as a separate step is unsafe if there is concurrency.
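For illustration, a minimal sketch of the suggested approach (the helper name is hypothetical):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper illustrating the suggestion above: Files.deleteIfExists
// encodes "a missing file is fine" atomically -- no exception to swallow, and
// no separate exists() check that could race with a concurrent delete.
final class FileDeletionSketch {
    static void deleteIgnoringMissing(final Path path) throws IOException {
        Files.deleteIfExists(path); // returns false, rather than throwing, if the file is absent
    }
}
```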

assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
}

@Test
public void shouldAcceptAtLestOnce() {
Contributor

Nit: typo

&& eosEnabled
// use EOS default value only if no user config is set
&& !originals().containsKey(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)) {
return EOS_DEFAULT_COMMIT_INTERVAL_MS;
Contributor

This approach is a potential source of confusion because if you iterate over the entries of the config, the override won't be there. I think you either want to do the override in the place where this value is used (like it's done in the producer) or it could be done by mutating the config itself. It may also be a good idea to log the override (like the producer does).

Member Author

I can change AbstractConfig#values from private to protected and mutate it in the constructor. However, values would not be immutable anymore (that is why I did not do this originally). Doing the overwrite where the value is used seems error prone -- it should be done in a single place, not each time the value is retrieved from the config... A user of StreamsConfig should not need to think about this.

I also think logging is not required, as it's just two different default values -- we never overwrite a user-specified value (and it's documented in COMMIT_INTERVAL_MS_DOC that we have two different default values).

Contributor

I personally prefer to update the config, but it would be nice to do that by hooking into the mechanism that chooses a default instead of mutating the underlying map afterwards.

Contributor

Also, if the config has the right value, it will be part of what gets logged by default and it's then less important to log the fact that we are picking a different default. Still, these things make it harder for support people so it's not a clear call. Maybe a debug log would still be useful.

You refer to the fact that this is documented, but every support person knows that a significant number of people don't read the docs.

Member Author

I personally prefer to update the config, but it would be nice to do that by hooking into the mechanism that chooses a default instead of mutating the underlying map afterwards.

This sounds like a major rework on ConfigDef allowing for "conditional defaults" -- do we really want to have this?

Member Author

@ijuma @enothereska Any further thought on this? If not, I will just stick with changing AbstractConfig#values from private to protected

Contributor

I'm ok either way.

Contributor

A really simple change would be to add a protected method to AbstractConfig: processParsedConfigs. This would give a hook for updating the parsed configs but with a clear scope (i.e. during construction only).
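For illustration, a hedged sketch of this hook idea. Class names and the "eos.enabled" key are assumptions for this sketch (the hook that eventually landed in AbstractConfig is named postProcessParsedConfig, and a real config would check the user-supplied originals rather than the parsed map):

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch only: the real AbstractConfig is more involved. The
// "eos.enabled" key and the 100ms value are illustrative assumptions.
abstract class AbstractConfigSketch {
    private final Map<String, Object> values;

    AbstractConfigSketch(final Map<String, Object> parsed) {
        // subclasses get exactly one chance to adjust parsed values: during construction
        this.values = processParsedConfigs(new HashMap<>(parsed));
    }

    protected Map<String, Object> processParsedConfigs(final Map<String, Object> parsed) {
        return parsed; // default: no changes
    }

    public Object get(final String key) {
        return values.get(key);
    }
}

final class StreamsConfigSketch extends AbstractConfigSketch {
    static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;

    StreamsConfigSketch(final Map<String, Object> parsed) {
        super(parsed);
    }

    @Override
    protected Map<String, Object> processParsedConfigs(final Map<String, Object> parsed) {
        // use the EOS default only if the user did not set the config explicitly
        if (Boolean.TRUE.equals(parsed.get("eos.enabled"))
                && !parsed.containsKey("commit.interval.ms")) {
            parsed.put("commit.interval.ms", EOS_DEFAULT_COMMIT_INTERVAL_MS);
        }
        return parsed;
    }
}
```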

@asfbot

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3310/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3315/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3306/
Test PASSed (JDK 8 and Scala 2.12).

@mjsax
Member Author

mjsax commented May 1, 2017

values.put(COMMIT_INTERVAL_MS_CONFIG, EOS_DEFAULT_COMMIT_INTERVAL_MS);
}

if (eosEnabled && getInt(NUM_STANDBY_REPLICAS_CONFIG) > 0) {
Contributor

Why is this? So with EOS we cannot have standby replicas?

Member Author

I guess it was too late in the night when I added this... (the thought was that we cannot keep the replicas in sync with regard to transactional boundaries -- but of course we can... we read committed data only and replay to "end-of-log" before resuming).

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

if (clientProvidedProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
Contributor

This is actually a big deal. Should have been in streams KIP. Will impact performance a lot.

Contributor

Are there some numbers on this? It would be useful to know.

Member Author

This setting is part of KIP-98 -- we just follow what the producer client dictates if we want to use idempotency (and transactions).

I think there are numbers (\cc @apurvam can you point to some benchmark results?)

Contributor

The numbers are here: https://docs.google.com/spreadsheets/d/1dHY6M7qCiX-NFvsgvaE0YoVdNq26uA8608XIh_DUpI4/edit#gid=1276994626

As you can see, setting in-flight requests to 1 from the default of 5 does have a perf impact of 20% on those tests. But also note that compression was enabled and batch sizes were small.

I think that if you tune your batches to be larger, the impact will be ameliorated. For instance, if you commit transactions every 500ms, your linger.ms could be 500ms as well for maximum batching. With larger batches, you don't get much benefit from multiple in-flight requests.

Finally, we can't guarantee in-order delivery without this setting, even before KIP-98. Are all streams operators currently symmetric? Otherwise they would already be buggy.
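For illustration only, a hedged sketch of the tuning described above (values are examples, not measured recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustrative, not a recommendation: with transactions committed every 500ms,
// a matching linger.ms builds large batches, which offsets most of the cost of
// capping in-flight requests at 1 (required for idempotence per KIP-98).
public class EosProducerTuningSketch {
    public static Properties tunedProducerProps() {
        final Properties props = new Properties();
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);        // align with the commit interval
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024); // let batches fill
        return props;
    }
}
```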

Contributor

this should be set only when we are in eos mode right?

Contributor

@mjsax could we run the streams benchmarks to see impact? Thanks.

Contributor

To clarify, I am aware of the benchmarks that we did for KIP-98. I was asking if Eno had quantified the performance impact of this config in the context of Streams benchmarks (Eno did some work on that recently).

Contributor

I haven't quantified that. Now is a good time to do that :)

Contributor

Do we need to override MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION here, since it is already taken care of inside KafkaProducer#configureInflightRequests: if idempotence is enabled, max.in.flight.requests must be 1, or otherwise an exception is thrown?

Member Author

Even if the producer does check this config, it would be better if Streams did this check too and, if we go with the "overwrite and log" approach, set the value to 1.

Btw: if we go with the "throw exception" approach, I would keep this check in Streams, too. We should move all checks into the constructor to fail faster. That will simplify debugging for users.
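A minimal sketch of the fail-fast constructor check described here (simplified; the real StreamsConfig wiring differs):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;

// Sketch of the fail-fast idea above: reject EOS-incompatible user overrides
// at construction time, instead of waiting for the producer to reject them.
final class EosConfigValidationSketch {
    static void validateProducerOverrides(final Map<String, Object> userProducerProps) {
        if (userProducerProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
            throw new ConfigException("Unexpected user-specified producer config "
                + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
                + "; with exactly-once semantics enabled it must be 1");
        }
    }
}
```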

if (time1 < time2) return -1;
if (time1 > time2) return 1;
public int compare(final RecordQueue queue1, final RecordQueue queue2) {
final long time1 = queue1.timestamp();
Contributor

I really don't like final for primitive variables used like this. I think we're going overboard with final. It's useful when passed to methods, but in other cases it just clutters up the code. cc @ijuma @dguy. What are we doing in the rest of the code about this, e.g., clients etc.? There is a risk we keep rewriting streams code to add or remove final.

Contributor

It should be used everywhere a local, field, or param is immutable. It is better for readability and can help avoid bugs. Granted, it adds 5 extra characters to a declaration, but there's not much we can do about that.

Contributor

@ijuma ijuma May 3, 2017

Interesting, so you have seen cases where there were bugs because local variables or method parameters were not final? I don't remember that ever happening, so I'm interested in your experience.

Unlike fields, final on method params and local variables makes no difference at runtime, doesn't affect the memory model, and is used in a much smaller scope. In Java 8, they even introduced effectively final local variables so that one can use them in lambdas and inner classes without having to add the final keyword. So, the readability point is arguable (and the Java language designers seem to think that the cost is perhaps not worth the benefit).

Contributor

I don't think you can argue with the readability. It is much easier to grok the code if you know that a param or local is final. It tells the reader that it is immutable without having to think any further -- how is that not better for readability/understanding of the code?

As for bugs, yes, I've seen cases where people have accidentally changed/re-used a variable when they shouldn't have. Params probably not so much, but locals definitely.
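A contrived sketch of that bug class: marking a local final makes accidental reassignment a compile error.

```java
// Contrived illustration only: with final, accidental reassignment of a
// local (the bug class described above) is caught by the compiler.
public class FinalLocalSketch {
    public static int sum(final int[] values) {
        int total = 0;                  // mutated on purpose, so not final
        for (final int value : values) {
            // value = 0;               // would not compile: cannot assign to final variable
            total += value;
        }
        return total;
    }
}
```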

Contributor

Of course one can argue! :) Readability is subjective and the sweet spot between the amount of information presented versus conciseness varies from person to person (and it often changes over time for the same person). Verbosity is additional noise and makes it both harder to understand what the code is doing and makes it easier to miss important details.

To give a concrete example, I (and all the other reviewers) missed a bug introduced in a recent Streams PR because a removed line got lost in the noise of other clean-ups.

Finally, final only says that the variable doesn't change. In Java, collections are usually mutable, so mutability is ever present.

Member Author

To give a concrete example, I (and all the other reviewers) missed a bug introduced in a recent Streams PR because a removed line got lost in the noise of other clean-ups.

But that's not a problem of adding final -- it's a problem in our process -- we started to separate commits to avoid this in the future.

And for collections, you are right. Where is my C/C++ const keyword? :(
(Btw: you would still use an unmodifiable collection for this case)

}

/**
* Get the next record and queue
*
* @return StampedRecord
*/
public StampedRecord nextRecord(RecordInfo info) {
StampedRecord nextRecord(final RecordInfo info) {
Contributor

Good use of final.


int oldSize = recordQueue.size();
int newSize = recordQueue.addRawRecords(rawRecords);
final int oldSize = recordQueue.size();
Contributor

Not good use of final

* - {@link #commit(boolean) commit(noNewTransaction)}
* - close state
* <pre>
* @param clean shut down cleanly if {@code true} -- otherwise, just clean up open resources
Contributor

The comment is not clear, perhaps say a bit more.

public abstract void commit();

/**
* @param startNewTransaction {@link Producer#beginTransaction() start a new transaction} after successful commit
Contributor

I didn't find this API change in the KIP. Is it in the design doc?

Member Author

It's not a public API change -- it's package internal -- no need to cover this in a KIP.

*/
@Override
public void resume() {
log.debug("{} Resuming", logPrefix);
if (exactlyOnceEnabled) {
producer.beginTransaction();
Contributor

Do we need to check if a transaction is already running?

Member Author

It is not possible to check -- and at this point, it should never be the case. Producer would throw an exception if there is an open transaction (and this would indicate a bug in our code -- thus, we should not catch this exception but fail IMHO).


if (exactlyOnceEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
producer.commitTransaction();
Contributor

Can this fail? What if a transaction fails?

Member Author

This can only fail with a ProducerFencedException, which is handled by the caller (i.e., StreamThread).
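A simplified sketch of this commit path (not the actual StreamTask code; class and method names are illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;

// Simplified sketch of the commit path discussed in this thread. A
// ProducerFencedException from any of these calls is deliberately left to
// propagate to the caller (the stream thread).
final class EosCommitSketch {
    static void commitTransactionally(final Producer<byte[], byte[]> producer,
                                      final Map<TopicPartition, OffsetAndMetadata> offsets,
                                      final String applicationId) {
        producer.sendOffsetsToTransaction(offsets, applicationId); // offsets commit inside the transaction
        producer.commitTransaction();
        producer.beginTransaction(); // start the next transaction after a successful commit
    }
}
```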

partitionGroup.close();
metrics.removeAllSensors();
} catch (final RuntimeException e) {
closeStateManager(false);
throw e;
} finally {
if (exactlyOnceEnabled) {
if (!clean) {
try {
producer.abortTransaction();
Contributor

So the idea is that we cancel the transaction rather than waiting for it to complete?

Member Author

In case of unclean shutdown, we need to roll back and should not commit the transaction.

@asfbot

asfbot commented May 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3331/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented May 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3326/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented May 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3322/
Test PASSed (JDK 8 and Scala 2.12).

@enothereska
Contributor

I think in general this LGTM; I would like to see results from the Streams benchmarks to see the impact of the changes with EoS enabled, as well as Streams system tests with EoS enabled. Thanks.

@@ -136,20 +136,13 @@ public void flush() {
checkForException();
Contributor

@mjsax there are lots of places in this file where we check for exceptions and throw, sometimes at unexpected times. There is a JIRA open for this at https://issues.apache.org/jira/browse/KAFKA-5006. With EoS, these kinds of exceptions probably don't make sense to be thrown this way. Also with EoS, I guess we'll need to roll back the transaction. My actual question:

  • does it make sense to fix that JIRA as part of this PR?

Contributor

Alternatively I can fix it by not checking for exceptions like that as suggested in the JIRA, but could you double check that's ok? Thanks.

Member Author

I guess this can be fixed independently, and it would be cleaner not to have it in this PR, but just to do it with KAFKA-5006. Thanks!

@mjsax
Member Author

mjsax commented May 2, 2017

We can't do end-to-end tests yet, as the broker code is not ready.

Contributor

@dguy dguy left a comment

I don't really like that we have checks like if(eosEnabled) sprinkled around various places in the code base. IMO this is a bit of a smell. Ideally the check would be done once, and we'd construct implementations of interfaces at that one point -- then polymorphism FTW! However, I understand that is not possible without a major refactor. Anyway, just making it known.

@@ -502,6 +517,13 @@ public StreamsConfig(final Map<?, ?> props) {
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
// remove deprecate ZK config
consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG);
if (eosEnabled) {
if (clientProvidedProps.containsKey(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) {
Contributor

Why throw an exception? Can't we just log it and remove it from the config?

Member Author

Because this is the current default behavior. I agree we can log, and we do have a JIRA and PR for this already (https://issues.apache.org/jira/browse/KAFKA-5096). I did not update this code accordingly yet...

Contributor

What is the benefit of logging a warning instead of throwing an exception? If the user explicitly specified a config and we override it, it makes sense to throw an exception so that the user tests show the issue (i.e. fail fast). The user can then figure out if the config was a mistake or if they misunderstood some aspect and need an alternative solution.

This is what the other clients (e.g. producer) do.

Member Author

The point is that the user cannot set this config; there is only one valid value. If there were multiple valid values, it would make sense to throw (as we should not "randomly" pick one) -- but as there is only one, we can just set it and run (and log). This gives a better out-of-the-box experience IMHO -- it's more "resilient" and "self-healing" if you wish (fewer exceptions are always better if the exceptions are not really needed, because there is only one fix).

Contributor

It would be nice if we had consistent behaviour across clients unless there's a good reason to deviate. Here's a very similar example in the producer where we throw an exception (fail-fast):

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L394

Maybe you can sync with @apurvam and @hachikuji on this. Either way, it should not block this PR as it can be tweaked in a follow-up along with other changes that are planned.

Member Author

IMHO, Streams is more than a client. Thus, it seems ok to behave differently. \cc @guozhangwang @dguy @enothereska @miguno @sriramsub

Contributor

Yes - my opinion in this case is to log it and move on. I don't see it as exceptional, but whatever. I'm happy either way

Contributor

I would suggest we throw an exception here: this is a case where users are not allowed to specify these configs (e.g. this one and if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))); if they do specify them, we should fail fast to notify them clearly.

Member Author

We had a discussion about this, and agreed to not throw but overwrite and log. We also created a JIRA for this change: https://issues.apache.org/jira/browse/KAFKA-5096 (with an open PR).

We can discuss again of course. Waiting for further feedback.
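A hedged sketch of the agreed "overwrite and log" behavior (the actual change tracked in KAFKA-5096 differs in detail; the class name here is illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of "overwrite and log": instead of throwing on an EOS-incompatible
// user override, warn and force the single valid value.
final class EosConsumerOverrideSketch {
    private static final Logger log = LoggerFactory.getLogger(EosConsumerOverrideSketch.class);

    static void applyEosOverrides(final Map<String, Object> consumerProps) {
        final Object userValue = consumerProps.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
        if (userValue != null && !"read_committed".equals(userValue)) {
            log.warn("Ignoring user-specified {}={}; exactly-once requires read_committed",
                     ConsumerConfig.ISOLATION_LEVEL_CONFIG, userValue);
        }
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    }
}
```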

// generate producer configs from original properties and overridden maps
final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()));

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
if (eosEnabled) {
if (clientProvidedProps.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
Contributor

Same here?

log.trace("{} Committing", logPrefix);
metrics.metrics.measureLatencyNs(
time,
new Runnable() {
@Override
public void run() {
flushState();
stateMgr.checkpoint(recordCollectorOffsets());
commitOffsets();
if (!eosEnabled) {
Contributor

it appears we have both eosEnabled and exactlyOnceEnabled in the same class

Member Author

Good catch! I moved eosEnabled to the base class and forgot to remove the exactlyOnceEnabled var here.

}

@Test(expected = ConfigException.class)
public void shouldThrowExceptionBootstrapServersIsNotSet() {
Contributor

nit: shouldThrowExceptionIfBootstrapServersIsNotSet

}

@Test(expected = ConfigException.class)
public void shouldThrowExceptionIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled() {
Contributor

ditto

@@ -202,6 +206,8 @@ public void testProcessOrder() throws Exception {
assertEquals(0, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(3, source2.numReceived);

cleanRun = true;
Contributor

I don't get why we care about cleanRun everywhere. It seems it is only used in the cleanup, so why does it matter? How is it going to impact the test if we just pass true or false all the time? The way it is done seems very odd and adds noise.

Member Author

The idea was to close the task in @After according to the test result -- if the test fails, we close(false); on a clean test run, we close(true). I agree it's noisy, but I thought it good to have. How should we call close in @After without the var? true or false?

Contributor

I guess my question is: does it matter how we close it in @After? If it doesn't change the test result and cleanly shuts down, then I don't see why we can't just always pass true. If it does impact it, then keep it.

Member Author

Need to double check. Will reply here or just update if we can simplify the code.
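A minimal sketch of the pattern under discussion (the TaskFixture type and the test body are hypothetical stand-ins for the real fixtures):

```java
import static org.junit.Assert.assertEquals;

import org.junit.After;
import org.junit.Test;

// Minimal illustration of the cleanRun pattern discussed above: @After mirrors
// the actual test outcome, calling close(false) unless the body ran to the end.
public class CleanRunPatternSketch {

    interface TaskFixture { void close(boolean clean); }

    private final TaskFixture task = clean -> { /* no-op stand-in */ };
    private boolean cleanRun = false;

    @After
    public void cleanup() {
        task.close(cleanRun); // close(false) if the test threw before setting the flag
    }

    @Test
    public void someScenario() {
        assertEquals(2, 1 + 1); // ... real assertions ...
        cleanRun = true;        // only reached if nothing above threw
    }
}
```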

@@ -438,17 +499,17 @@ public void testMetrics() throws Exception {
assertNotNull(metrics.getSensor(defaultPrefix + ".task-closed"));
assertNotNull(metrics.getSensor(defaultPrefix + ".skipped-records"));

assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit mockTime in ms", defaultTags)));
Contributor

I guess you did a rename of time -> mockTime and it included strings?

Member Author

Stupid Intellij...

@@ -1013,43 +1118,174 @@ protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPar
}

@Test
public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFenceWhileProcessing() throws Exception {
Contributor

shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing

}

@Test
public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFenceAtBeginTransactionWhenTaskIsResumed() throws Exception {
Contributor

typo => shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed

@mjsax
Member Author

mjsax commented May 3, 2017

@eno we could do a test run, just with max.in.flight.requests.per.connection=1 -- but we know from the producer tests that there is a 20% hit. Do you think we can gain any insight from running the Streams tests with this setting?

@sriramsub
Contributor

I agree with Damian about all the isEosEnabled checks. We should file a follow-up PR to tackle this and make the code cleaner, more maintainable, and easier to debug.

@mjsax
Member Author

mjsax commented May 3, 2017

@sriramsub Do you mean "cleanRun"? This affects only test code atm. We can also fix it right away in this PR.

@sriramsub
Contributor

I meant that all the different code paths with the eosEnabled checks make the code hard to understand, hard to maintain, and not easy to debug. It would be good to think about restructuring the code after this release to make this better.

@asfbot

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3446/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3440/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3437/
Test PASSed (JDK 8 and Scala 2.12).

@mjsax
Member Author

mjsax commented May 4, 2017

@sriramsub I would be happy to do it differently, but the overall architecture makes it hard to refactor IMHO. The whole task life cycle with rebalancing etc. is tricky (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture). Btw: I think keeping this architectural picture in mind helps a lot in understanding the code (it needs some updates, as we already did some task refactoring to make the EOS code simpler).

Contributor

@dguy dguy left a comment

Overall LGTM. I've left a couple of replies to previous comments, but nothing that should stop this from going in

@mjsax
Member Author

mjsax commented May 5, 2017

Updated this to address the latest comments/discussions

@guozhangwang
Contributor

Just to give an example of what I said in the previous comment: inside RecordCollectorImpl#send() we call producer.send(), which could throw the following exceptions (assuming EOS is enabled):

  1. TimeoutException: fetching metadata timed out. This is a fatal error, since the topic may not exist at all. We should capture it higher in the code hierarchy, in StreamTask#process(): try-catch currNode.process() and abort the transaction / close the task. We also need to rethrow this exception so that users can be notified and close the whole instance (we can re-consider the global exception handling in another JIRA).

  2. SerializationException: this should never happen since we are using byte[], byte[]; hence, it would be a bug if it really happens. We should fail-fast the whole instance directly, i.e. try-catch at the even higher level StreamThread#run() and shut down the whole instance.

  3. ProducerFencedException: a rebalance has happened and the thread's consumer has fallen out of the group. We should try-catch it at the lower level, in StreamTask#process()'s currNode.process(), and abort the transaction / close the task. However, we do not need to rethrow the exception in this case; the next consumer.poll() will rejoin the group, with the callbacks revoking / reassigning tasks.

So we can see that it's better to list all these exceptions and consider capturing them at different levels of the hierarchy to handle them differently. In the future, when there is a bug, it will then be easier to traverse the code path and locate its root cause.
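Schematically, the layering described above might look like this (method shapes and the Task interface are placeholders, not the real StreamTask/StreamThread implementations):

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

// Schematic sketch of the layered exception handling described above.
final class ExceptionLayeringSketch {

    interface Task {
        void closeAsZombie();  // abort transaction, close task
        void abortAndClose();  // abort transaction, close task
    }

    // lower level: per-record processing (cf. StreamTask#process())
    static void process(final Runnable currNodeProcess, final Task task) {
        try {
            currNodeProcess.run();
        } catch (final ProducerFencedException e) {
            // rebalance happened; close the task, no rethrow --
            // the next consumer.poll() rejoins the group and tasks get reassigned
            task.closeAsZombie();
        } catch (final TimeoutException e) {
            // fatal: abort, then surface to the thread so the instance shuts down
            task.abortAndClose();
            throw e;
        }
    }

    // higher level: the stream thread's run loop (cf. StreamThread#run())
    static void run(final Runnable loopBody) {
        try {
            loopBody.run();
        } catch (final KafkaException fatal) {
            // e.g. SerializationException: indicates a bug -- fail fast and
            // shut down the whole instance here
        }
    }
}
```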

@mjsax mjsax force-pushed the kafka-4923-add-eos-to-streams branch from 342c3cb to bded830 May 11, 2017 00:54
@asfbot

asfbot commented May 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3735/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented May 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3731/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3741/
Test PASSed (JDK 8 and Scala 2.11).

@guozhangwang
Contributor

@mjsax could you rebase?

@asfbot

asfbot commented May 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4008/
Test PASSed (JDK 7 and Scala 2.11).

@mjsax mjsax force-pushed the kafka-4923-add-eos-to-streams branch from bded830 to 2ae1caa May 16, 2017 20:32
@mjsax
Member Author

mjsax commented May 16, 2017

@guozhangwang Rebased -- also updated StreamsConfig for setting the producer/consumer EOS default values.

@asfbot

asfbot commented May 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4018/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4004/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4012/
Test FAILed (JDK 8 and Scala 2.12).

@mjsax
Member Author

mjsax commented May 16, 2017

I could reproduce the failing test. It's a race condition on MockConsumer, which is not synchronized.

@mjsax
Member Author

mjsax commented May 16, 2017

@asfbot

asfbot commented May 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4027/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4013/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor

LGTM. Merged to trunk.

@asfgit asfgit closed this in ebc7f7c May 17, 2017
@mjsax mjsax deleted the kafka-4923-add-eos-to-streams branch May 17, 2017 00:41