KAFKA-4916: test streams with brokers failing #2719

Closed · wants to merge 42 commits (changes shown from 13 commits)

Commits (42)
187f9b6  Temporarily use only 2 relevant tests for branch builder (Mar 5, 2017)
87f06a8  Undo previous (Mar 5, 2017)
978f925  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (Mar 6, 2017)
57856b0  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (Mar 6, 2017)
5086dd7  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (Mar 7, 2017)
40394f3  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (Mar 9, 2017)
1ee01c1  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (Mar 11, 2017)
0d30ba4  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (enothereska, Mar 13, 2017)
7dae290  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (Mar 14, 2017)
7252e62  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (enothereska, Mar 17, 2017)
4eb3670  Merge branch 'trunk' of github.com:enothereska/kafka into trunk (enothereska, Mar 17, 2017)
9df1010  Initial test (Mar 21, 2017)
829f57a  Added more tests (Mar 22, 2017)
f702616  Tighten test options to minimize data loss or duplicates (Mar 22, 2017)
7f68745  Added tests with failing more than one broker (Mar 22, 2017)
10f40a3  Merge remote-tracking branch 'origin/trunk' into KAFKA-4916-broker-bo… (Mar 23, 2017)
fc1cabc  Checkpoint (Mar 25, 2017)
5e045e7  Merge branch 'trunk' of github.com:enothereska/kafka into trunk (enothereska, Mar 27, 2017)
b34d6d3  Checkpoint (Mar 27, 2017)
05324ce  Revert simple benchmark (Mar 27, 2017)
87b4ade  Cleanup (Mar 27, 2017)
6a42955  Cleanup (Mar 27, 2017)
cd274dc  Checkstyle (Mar 27, 2017)
247c7ea  Fix state lock issue (Mar 27, 2017)
6defb84  Catch and translate network exceptions (Mar 27, 2017)
213b892  Addressed StreamThread comments (Mar 28, 2017)
19d502f  Merge branch 'trunk' of github.com:enothereska/kafka into trunk (enothereska, Mar 28, 2017)
4ac627a  Removed part that requires KIP (Mar 28, 2017)
90d077a  Adjust timeout (Mar 28, 2017)
878d3b6  Remove line (Mar 28, 2017)
090877e  Fix test (Mar 29, 2017)
595566b  Merge remote-tracking branch 'origin/trunk' into KAFKA-4916-broker-bo… (Mar 29, 2017)
17029e7  Merge with trunk (Mar 30, 2017)
86edfe2  Addressed Damian's comments (Mar 30, 2017)
a67575e  Matthias' comment (Mar 30, 2017)
0b5743c  Merge remote-tracking branch 'origin/trunk' into KAFKA-4916-broker-bo… (Mar 31, 2017)
ad1848f  Addressed comments (Mar 31, 2017)
f66bcd3  Merge branch 'trunk' of github.com:enothereska/kafka into trunk (enothereska, Apr 3, 2017)
186d7d0  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (enothereska, Apr 3, 2017)
bc68fe8  Merge remote-tracking branch 'apache-kafka/trunk' into trunk (enothereska, Apr 3, 2017)
bca3a69  Merge with trunk (enothereska, Apr 3, 2017)
d690183  Remove unnecessary call (Apr 4, 2017)
@@ -240,8 +240,8 @@ public class StreamsConfig extends AbstractConfig {
STATE_DIR_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
- 1,
- Importance.MEDIUM,
+ 3,
Member:

I would not change this default value -- it's a hassle for anyone who wants to run a demo with a local single-broker setup.

Contributor:

Maybe a more dynamic default here, like with acks (where all = -1)?

e.g. set replicationFactor=-1 => actualReplicationFactor=min(3, brokers.size())

Contributor Author:

The code actually uses min(#brokers, REPLICATION_FACTOR), and prints a warning, but it still runs with, say, 1 broker.

Member:

I think this dynamic change is quite dangerous -- if I specify replication 3 and cannot get it, I want an exception... Thus, I would leave the replication factor at 1 for demo purposes -- if anyone goes to production she can set it to whatever value is suitable -- or we make the parameter non-optional.

I think it would be a hassle to have a default value of 3 and override it with 1 in each example we do...

Contributor:

Yeah, my suggestion was to make the dynamism configurable.

REPLICATION_FACTOR_CONFIG = N where N > 0 => I know what I want; Streams should fail if it can't meet this contract.
REPLICATION_FACTOR_CONFIG = -1 => Streams is smart and can figure it out for me.

This allows me to be as strict as I want, but lets the defaults work for demos/tests/etc.
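
A minimal sketch of the contract proposed here, assuming a hypothetical -1 sentinel and resolver (neither exists in StreamsConfig):

    import org.apache.kafka.streams.errors.StreamsException;

    // Hypothetical helper, illustrative only -- not part of this PR.
    final class ReplicationFactorResolver {
        static int resolve(final int configured, final int brokerCount) {
            if (configured == -1) {
                // "streams is smart": pick a safe value, capped by the cluster size
                return Math.min(3, brokerCount);
            }
            if (configured > brokerCount) {
                // an explicit contract that cannot be met: fail fast, don't degrade
                throw new StreamsException("Replication factor " + configured
                    + " exceeds the number of available brokers (" + brokerCount + ").");
            }
            return configured;
        }
    }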

Member:

> REPLICATION_FACTOR_CONFIG = -1 => Streams is smart and can figure it out for me.

What should this be then? Would we need another parameter, "default_replication_factor", so that Streams can choose between 1 and that value? Not sure -- this might be too confusing.

Contributor:

My suggestion above was min(3, brokers.size()).

I don't like it because it seems like magic, but it also addresses most people's issues.

Contributor Author:

This will now require a KIP. Will do in a separate PR.

+ Importance.HIGH,
REPLICATION_FACTOR_DOC)
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
@@ -391,6 +391,8 @@ public class StreamsConfig extends AbstractConfig {
static {
final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+ tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+ tempProducerDefaultOverrides.put(ProducerConfig.ACKS_CONFIG, "all");
Member:

We need a KIP if we change any default values...

Contributor Author:

Do we, @guozhangwang? I thought we needed a KIP to add new config values, not each time we tune them.

Contributor:

Let's write a quick KIP for this (also covering the default replication factor of 3 above)? I think these are mostly bug fixes, but it would be better to make them well known in the community as well.

Contributor Author:

I think a KIP unnecessarily slows things down. Why do we need to do a KIP to correct an internal flaw? Users are already expecting internal topics to be robust. I'd argue we're fixing a bug here.

Member:

I am happy without a KIP :) Makes life easier for us. It's @guozhangwang's call. Or any other committer's.

Contributor Author:

cc @ijuma

Contributor:

We typically do KIPs for config changes that impact users. KIP-106 is one such example. If you can make the case that this is an internal bug fix and has no compatibility impact, then no KIP is needed. The replication factor one would seemingly have a compatibility impact.

Contributor Author:

This will now require a KIP and will be done in a separate PR.


PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
}
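
For context, these entries are only defaults: when the producer configs are assembled, a value the user sets explicitly still wins. A hedged illustration (the property values are arbitrary):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    // Assuming the usual StreamsConfig override behavior: an explicit user
    // setting takes precedence over PRODUCER_DEFAULT_OVERRIDES.
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 10);  // overrides the Integer.MAX_VALUE default above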
@@ -55,16 +55,29 @@ public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final i
* If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
*/
public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
+ int actualReplicationFactor = replicationFactor;
for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
try {
final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
Contributor:

I think validateTopicPartitions should also validate replicationFactor.
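
A hedged sketch of what that extra check could look like (the method is hypothetical; the MetadataResponse accessors are assumed from the surrounding code):

    // Hypothetical addition: verify the replication factor of existing internal
    // topics, mirroring how partition counts are validated today.
    private void validateReplicationFactor(final MetadataResponse metadata,
                                           final Set<String> internalTopics,
                                           final int expectedReplication) {
        for (final MetadataResponse.TopicMetadata topic : metadata.topicMetadata()) {
            if (!internalTopics.contains(topic.topic())) {
                continue;
            }
            for (final MetadataResponse.PartitionMetadata partition : topic.partitionMetadata()) {
                if (partition.replicas().size() != expectedReplication) {
                    throw new StreamsException("Internal topic " + topic.topic()
                        + " has replication factor " + partition.replicas().size()
                        + ", expected " + expectedReplication + ".");
                }
            }
        }
    }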

- streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
+ if (metadata.brokers().size() > 0 && metadata.brokers().size() < replicationFactor) {
Contributor:

We currently don't do anything about min.insync.replicas - should we?

Contributor Author:

Yes we should. @norwood any insights on what you have found useful for that? Thanks.

Contributor (@norwood, Mar 27, 2017):

Something like min(2, replicationFactor) should be a good default.

I'd also be concerned about this override when brokers.size() < replicationFactor. I think I'd prefer we fail here rather than get into a misconfigured state. We have run into issues where a user brings up a cluster and, while Kafka is doing its thing, also brings up their streams app. So during startup we see 1 broker, then some time down the line brokers 2...N. This caused us to pre-create a bunch of our streams topics incorrectly (when we only saw one broker), and then on restart we would try to verify topics against actual configs and fail.
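
A hedged sketch combining the two suggestions -- fail fast when the cluster is too small, and derive min.insync.replicas as min(2, replicationFactor); the topicConfig map is hypothetical:

    // Not the code in this PR: fail instead of shrinking the replication
    // factor, and pick a conservative min.insync.replicas default.
    final int brokerCount = metadata.brokers().size();
    if (brokerCount < replicationFactor) {
        throw new StreamsException("Only " + brokerCount + " brokers available, but replication.factor is "
            + replicationFactor + ". Decrease replication.factor or add brokers.");
    }
    // min(2, replicationFactor): tolerate one replica being down without
    // requiring more in-sync replicas than the topic has.
    topicConfig.put("min.insync.replicas", String.valueOf(Math.min(2, replicationFactor)));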

Contributor Author:

This will now be done in a KIP and a different PR.

log.warn("The number of available brokers {} is less than the desired replication " +
"factor for streams internal topics {}. If running in production, consider " +
"increasing the number of available brokers.",
metadata.brokers().size(), replicationFactor);
actualReplicationFactor = metadata.brokers().size();
Member:

Do we really want to do this? I would strongly prefer to throw an exception to the user!

Contributor Author:

This is consistent with how things like the schema registry, proactive support, etc. handle cases where the number of brokers is less than the replication factor.

Contributor Author:

The goal here is to do the right thing when there are enough brokers, not to provide magic when there just aren't enough brokers (e.g., in a test environment). Currently we do the wrong thing when there are enough brokers.

Contributor:

I'd suggest throwing an exception here, with motivation similar to @norwood's above. We have seen similar issues with the offsets topic's num.partitions, where we do this "min(broker.size, required num.brokers)" trick, and it introduces much more confusion than user-friendly benefit. For unit tests we should just always override these configs.

Contributor Author:

@guozhangwang @mjsax if I understand you right, you want the default to be 3, with the option for a user to set it to 1, right?
Or do you want no change at all to what we currently have (default 1, user can set it higher)? I don't like the current option since it leads to trouble in production. I'm OK with the first option.

Member:

I would prefer to keep the current default of 1 and mark the parameter importance as "high", indicating that one most likely wants to change the default when going to production. Default values must not be "production ready" settings IMHO (cf. state.dir). Default values should give the best "out-of-the-box" experience when getting started with your first "word count" -- i.e., a local single-broker setting.

Contributor Author:

OK, I cannot keep the default at 1. This is what led to several bugs. It's not great to expect users to set a parameter that Streams should be maintaining correctly.

Contributor:

@enothereska, as you know, we have changed the behaviour for the offsets topic so that we default to the safe production setting and fail otherwise. That is based on experience, as @guozhangwang said, and seems more relevant than some of the other examples given.

The question then is how to make it easy for development. For the offsets topic, we set the value to 1 in the server.properties that is used in the quickstarts, etc. That won't work here. There are a few possible solutions that will be helpful for this and many other configs:

  1. Have a config where users can define whether the environment is prod or dev and change the defaults based on that.
  2. Provide methods so that a user can get a prod or dev config. For example, StreamsConfigs.production() or StreamConfigs.development().
  3. Add an enum to the constructor of StreamConfigs where users can define if the environment is production or development.

I think I like 3 best. In any case, we don't need to block this PR on the long-term solution. Still, it may be worth figuring out the end state and then a plan on how to get there.
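
A hedged sketch of option 3 (neither the enum nor this constructor exists in StreamsConfig; the names are illustrative):

    // Hypothetical API sketch, not part of this PR.
    public enum Environment { DEVELOPMENT, PRODUCTION }

    public StreamsConfig(final Map<?, ?> props, final Environment env) {
        this(withEnvironmentDefaults(props, env));
    }

    private static Map<Object, Object> withEnvironmentDefaults(final Map<?, ?> props,
                                                               final Environment env) {
        final Map<Object, Object> merged = new HashMap<>(props);
        if (!merged.containsKey(REPLICATION_FACTOR_CONFIG)) {
            // production gets the safe default; development keeps the demo-friendly 1
            merged.put(REPLICATION_FACTOR_CONFIG, env == Environment.PRODUCTION ? 3 : 1);
        }
        return merged;
    }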

Contributor Author:

This will now require a KIP and will be done in a separate PR.

+ }
+ streamsKafkaClient.createTopics(topicsToBeCreated, actualReplicationFactor, windowChangeLogAdditionalRetention, metadata);
return;
} catch (StreamsException ex) {
log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i);
}
try {
Thread.sleep(1000L);
Contributor:

It would be better to pass in an implementation of Time and use time.sleep(1000L) here.
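
A minimal sketch of that suggestion, assuming the constructor were extended to accept Kafka's Time abstraction (it currently is not):

    import org.apache.kafka.common.utils.Time;

    // Hypothetical: inject Time so tests can pass MockTime and avoid real sleeps.
    class RetryBackoff {
        private final Time time;

        RetryBackoff(final Time time) {
            this.time = time;  // Time.SYSTEM in production, MockTime in tests
        }

        void backoff() {
            time.sleep(1000L);  // replaces Thread.sleep(1000L)
        }
    }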

Contributor:

Also, I feel 1 second may be too long in production? In practice brokers should be up and running much earlier than streams apps. For @norwood's cases I still think it's better to fail fast and educate users to retry creating their apps after the broker is fully up than to wait for, say, 5 seconds and hope it will succeed.

Contributor Author:

I'm reducing the time, but passing in Time is a bit of a pain here and other calls also use Thread.sleep. Would prefer to do a cleanup pass later.

} catch (InterruptedException e) {
Member:

nit: add final

// ignore
}
}
throw new StreamsException("Could not create internal topics.");
}
@@ -400,17 +400,21 @@ private RuntimeException unAssignChangeLogPartitions() {

@SuppressWarnings("ThrowableNotThrown")
private void shutdownTasksAndState() {
log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix,
activeTasks.keySet(), standbyTasks.keySet());
log.debug("{} shutdownTasksAndState: shutting down all active tasks {} " +
"and standby tasks {} and suspended tasks {} and suspended standby tasks {}", logPrefix,
Contributor:

nit: "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}"

+     activeTasks.keySet(), standbyTasks.keySet(),
+     suspendedTasks.keySet(), suspendedStandbyTasks.keySet());

final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// Close all processors in topology order
firstException.compareAndSet(null, closeAllTasks());
+ firstException.compareAndSet(null, closeAllSuspendedTasks());
Member:

Oops. We really missed closing suspended tasks. Really bad :( Great catch, Eno!

Contributor Author:

Yeah this was fun :)

Member:

We really need a bug-fix release for this! cc @guozhangwang

// flush state
firstException.compareAndSet(null, flushAllState());
// Close all task state managers. Don't need to set exception as all
// state would have been flushed above
closeAllStateManagers(firstException.get() == null);
+ closeAllSuspendedStateManagers(firstException.get() == null);
// only commit under clean exit
if (cleanRun && firstException.get() == null) {
firstException.set(commitOffsets());
@@ -475,6 +479,29 @@ private RuntimeException performOnAllTasks(final AbstractTaskAction action,
return firstException;
}

private RuntimeException performOnAllSuspendedTasks(final AbstractTaskAction action,
Contributor:

These two functions are very similar: could we collapse them into one function, performOnTasks, and pass in a List&lt;AbstractTask&gt; as an additional parameter? (See the sketch after this method.)

final String exceptionMessage) {
RuntimeException firstException = null;
final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(suspendedTasks.values());
allTasks.addAll(suspendedStandbyTasks.values());
for (final AbstractTask task : allTasks) {
try {
action.apply(task);
} catch (RuntimeException t) {
log.error("{} Failed while executing {} {} due to {}: ",
StreamThread.this.logPrefix,
task.getClass().getSimpleName(),
task.id(),
exceptionMessage,
t);
if (firstException == null) {
firstException = t;
}
}
}
return firstException;
}
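
A hedged sketch of the collapse suggested in the comment above (an illustration, not code from this PR):

    // Hypothetical refactoring: one helper that takes the tasks to act on,
    // so performOnAllTasks and performOnAllSuspendedTasks become thin wrappers.
    private RuntimeException performOnTasks(final List<AbstractTask> tasks,
                                            final AbstractTaskAction action,
                                            final String exceptionMessage) {
        RuntimeException firstException = null;
        for (final AbstractTask task : tasks) {
            try {
                action.apply(task);
            } catch (final RuntimeException t) {
                log.error("{} Failed while executing {} {} due to {}: ",
                    StreamThread.this.logPrefix, task.getClass().getSimpleName(),
                    task.id(), exceptionMessage, t);
                if (firstException == null) {
                    firstException = t;
                }
            }
        }
        return firstException;
    }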

private Throwable closeAllStateManagers(final boolean writeCheckpoint) {
return performOnAllTasks(new AbstractTaskAction() {
@Override
@@ -485,6 +512,16 @@ public void apply(final AbstractTask task) {
}, "close state manager");
}

private Throwable closeAllSuspendedStateManagers(final boolean writeCheckpoint) {
return performOnAllSuspendedTasks(new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id());
task.closeStateManager(writeCheckpoint);
}
}, "close state manager");
}

private RuntimeException commitOffsets() {
// Exceptions should not prevent this call from going through all shutdown steps
return performOnAllTasks(new AbstractTaskAction() {
@@ -575,6 +612,10 @@ private void runLoop() {

for (TopicPartition partition : records.partitions()) {
StreamTask task = activeTasksByPartition.get(partition);
+ if (task == null) {
+     log.warn("No active tasks for partition " + partition);
+     continue;
+ }
numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1020,6 +1061,17 @@ public void apply(final AbstractTask task) {
}, "close");
}

private RuntimeException closeAllSuspendedTasks() {
Contributor:

Ditto above.

return performOnAllSuspendedTasks(new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
task.close();
streamsMetrics.tasksClosedSensor.record();
}
}, "close");
}

private RuntimeException closeAllTasksTopologies() {
return performOnAllTasks(new AbstractTaskAction() {
@Override
@@ -203,7 +203,11 @@ private String ensureOneNodeIsReady(final List<Node> nodes) {
break;
}
}
- kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ try {
+     kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ } catch (Exception e) {
Member:

nit: add final

Contributor Author:

@mjsax I didn't get this, what should be final?

Member:

-> } catch (final Exception e) {

throw new StreamsException("Could not poll.");
Contributor:

new StreamsException("Could not poll.", e)

Contributor:

Why do we want to wrap even an RTE as a StreamsException here?

Contributor Author:

This class should hide all underlying network exceptions and wrap them in a StreamsException, IMO. This is consistent with other examples in this class. Otherwise the upper layers would need to know all the details of the underlying classes.

+ }
}
if (brokerId == null) {
throw new StreamsException("Could not find any available broker.");
@@ -268,7 +272,12 @@ public MetadataResponse fetchMetadata() {
new MetadataRequest.Builder(null),
Time.SYSTEM.milliseconds(),
true);
- final ClientResponse clientResponse = sendRequest(clientRequest);
+ ClientResponse clientResponse;
Member:

nit: add final

+ try {
+     clientResponse = sendRequest(clientRequest);
+ } catch (Exception e) {
Member:

nit: add final

Contributor:

Same here.

Contributor Author:

What exactly? I don't get this.

throw new StreamsException("Failed to send request");
Contributor:

Apart from a StreamsException, I think the only other exception poll is going to throw is IllegalStateException -- should we just handle this in sendRequest and leave this as it was? Even if there are more exceptions, I think it would be better to handle them in sendRequest and throw a StreamsException from there.
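
A sketch of that alternative -- wrap once inside sendRequest so call sites stay unchanged (doSendRequest is a hypothetical name for the current body):

    // Hypothetical: centralize the wrapping so fetchMetadata() and friends
    // need no try/catch of their own.
    private ClientResponse sendRequest(final ClientRequest clientRequest) {
        try {
            return doSendRequest(clientRequest);  // the existing send logic
        } catch (final RuntimeException e) {
            throw new StreamsException("Failed to send request.", e);
        }
    }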

+ }
if (!clientResponse.hasResponse()) {
throw new StreamsException("Empty response for client request.");
}
@@ -39,6 +39,7 @@ public class SmokeTestClient extends SmokeTestUtil {
private final File stateDir;
private KafkaStreams streams;
private Thread thread;
+ private boolean uncaughtException = false;

public SmokeTestClient(File stateDir, String kafka) {
super();
@@ -51,10 +52,19 @@ public void start() {
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
+ uncaughtException = true;
e.printStackTrace();
}
});

+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+     @Override
+     public void run() {
+         close();
+     }
+ }));

thread = new Thread() {
public void run() {
streams.start();
@@ -64,10 +74,16 @@ public void run() {
}

public void close() {
- streams.close();
+ streams.close(5, TimeUnit.SECONDS);
+ // do not remove these printouts since they are needed for health scripts
+ if (!uncaughtException) {
+     System.out.println("SMOKE-TEST-CLIENT-CLOSED");
+ }
try {
thread.join();
} catch (Exception ex) {
+     // do not remove these printouts since they are needed for health scripts
+     System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
// ignore
}
}
@@ -77,19 +93,17 @@ private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
// TODO: set number of threads back to 3 once
Contributor:

I guess you can remove this now?

// https://issues.apache.org/jira/browse/KAFKA-3758 is solved
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");

source.to(stringSerde, intSerde, "echo");

KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
Expand Down
Expand Up @@ -136,6 +136,12 @@ public static Map<String, Set<Integer>> generate(String kafka, final int numKeys
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ // the next 4 config values make sure that all records are produced with no loss and
+ // no duplicates
+ props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Contributor Author:

@ijuma do these parameters make sense for a config that should not lose data? We are OK with duplicates. Do I need to do anything with unclean.leader.election?

Contributor:

You can remove this now as you've set it as the default in StreamsConfig

Contributor Author:

Not quite, since in StreamsConfig it is the internal streams producer. Here it's another producer.

Member:

Rename props to producerProps ;)

+ props.put(ProducerConfig.ACKS_CONFIG, "all");
Contributor:

Same with this one

+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60 * 1000);

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
