
[FLINK-4280][kafka-connector] Explicit start position configuration for Kafka Consumer #2509

Closed
wants to merge 6 commits

Conversation

@tzulitai (Contributor) commented Sep 17, 2016

This is the first PR in a series of new explicit setter configurations for the Kafka Consumer (see also FLINK-3398 for details on opting out of offset committing).

It adds the following new explicit setter methods to configure the starting position for the Kafka Consumer connector:

FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08(...) // or 09
kafka.setStartFromEarliest(); // start from earliest without respecting any committed offsets
kafka.setStartFromLatest(); // start from latest without respecting any committed offsets
kafka.setStartFromGroupOffsets(); // respects committed offsets in ZK / Kafka as starting points

The default is to start from group offsets, so we won't be breaking existing user code.

One thing to note is that this PR also includes some refactoring to consolidate all start-offset assignment logic for partitions within the fetcher. For example, in the 0.8 version, with this change the SimpleConsumerThread no longer decides where a partition needs to start from; all partitions should already be assigned starting offsets by the fetcher, and the thread simply needs to start consuming them. This is preparation for transparent partition discovery for the Kafka consumers in FLINK-4022.

I suggest reviewing this PR after #2369, to reduce the effort of getting the 0.10 Kafka consumer in first. Tests for the new functions will be added in follow-up commits.

@gyfora (Contributor) commented Sep 26, 2016

Hi,

I like the proposed changes. Do you think it would make sense to also allow setting specific offsets on a per-partition basis?

kafka.setStartOffsets(Map<Integer, Long> partitionOffsets)

I think this is extremely useful in production.

@tzulitai (Contributor Author) commented Sep 26, 2016

Hi @gyfora, thanks for the review.
Yes, it is absolutely possible to add that. There's actually a JIRA for that feature too (FLINK-3123), so I'd say we can add it on top of the proposed changes here, as a separate follow-up PR after this one?

One note though: the API for that feature would need to be able to specify offsets for partitions of different topics, since the Kafka consumers can subscribe to multiple topics. So Map<Integer, Long> wouldn't fit this case; we'd probably be better off having a new user-facing class as the argument to define the offsets.
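
To illustrate, such an API might look like this (a sketch only; KafkaTopicPartition here stands for that new user-facing class, and the setter name is illustrative, to be settled in FLINK-3123):

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic-a", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic-a", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("topic-b", 0), 43L);

kafka.setStartFromSpecificOffsets(specificStartOffsets);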

@tzulitai (Contributor Author)

I'll rebase this PR soon; I'll probably also wait for the Kafka 0.10 connector to be merged.

@rmetzger (Contributor)

Thank you for working on this. I gave #2369 some love today to speed things up ;)

@gyfora (Contributor) commented Sep 26, 2016

@tzulitai makes sense! As for the Map<Int, Long>, you are right, the multiple-topic case slipped my mind :)

/**
 * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
 * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception
 */
private static long getInvalidOffsetBehavior(Properties config) {


Look out for https://issues.apache.org/jira/browse/KAFKA-3370 if you aren't already aware. Right now allowing none and catching the exception only on startup is the best workaround I've seen.
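
For reference, the eager check being discussed could look roughly like this (a sketch; the method name and the 0.8-style 'smallest'/'largest' values are assumptions):

private static void validateAutoOffsetResetValue(Properties config) {
    final String val = config.getProperty("auto.offset.reset", "largest");
    if (!(val.equals("smallest") || val.equals("largest"))) {
        // fail eagerly at job submission time; 'none' is rejected here as well,
        // matching the Javadoc above
        throw new IllegalArgumentException("Invalid value '" + val
            + "' for 'auto.offset.reset'; allowed values are 'smallest' and 'largest'.");
    }
}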

Contributor:


Thank you for the pointer.
We are discussing this issue here https://issues.apache.org/jira/browse/FLINK-4280 and here https://issues.apache.org/jira/browse/FLINK-3037


Thanks, FLINK-3037 describes what I'm more concerned about, but I hadn't seen that ticket yet.

Contributor Author:


Thanks for pointing this out. To keep things simple for now, I propose to fix https://issues.apache.org/jira/browse/FLINK-3037 as a separate PR.

@tzulitai (Contributor Author) commented Oct 13, 2016

Rebasing this now ...

@tzulitai (Contributor Author) commented Oct 13, 2016

@rmetzger @gyfora @koeninger Rebased this on the Kafka 0.10 connector and some other recent changes. This is ready for review now ;) I'd like to add tests for this after #2580, because #2580 adds a KafkaOffsetHandler to the Kafka test environment in the IT tests, which will come in handy when writing tests for this PR.

I'll also open a separate PR based on this one for FLINK-3123 (set specific offsets for startup).

kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
// don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
}
} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
Contributor Author:


@rmetzger after looking at https://issues.apache.org/jira/browse/FLINK-3037, do you think the proposed changes here (in the Kafka09Fetcher, overall) actually fix that issue?

@tzulitai (Contributor Author) commented Oct 24, 2016

Rebased on recent Kafka consumer changes, fixed failing Kafka 0.10 exactly-once tests, and added integration tests (testStartFromEarliestOffsets, testStartFromLatestOffsets, and testStartFromGroupOffsets) for the new explicit startup modes.

However, I'm bumping into Kafka ConfigExceptions when instantiating the KafkaConsumer in the KafkaOffsetHandler for the testStartFromEarliestOffsets test in versions 0.9 and 0.10. I'm still investigating the issue, so testStartFromEarliestOffsets is deliberately commented out in the 0.9 and 0.10 IT tests until I figure out how to fix it, to allow some early reviews.

@rmetzger (Contributor) left a comment


Thanks a lot for opening a pull request for this feature. It's something many users have requested and will like!

Sorry for the big delay with the review.

I have some comments on the proposed changes that need to be addressed before the PR is ready.

@@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
schema,
new Properties(),
0L,
StartupMode.GROUP_OFFSETS,
Contributor:


Looks like the indentation of the added lines is correct, but the indentation of the file is wrong. Could you fix that with the PR?

@tzulitai (Contributor Author), Nov 23, 2016:


Will do! I think it's actually that the other argument lines are indented with spaces instead of tabs.

@@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}

// --- startup mode ---

// TODO not passing due to Kafka Consumer config error
Contributor:


This is easy to fix, right? You just have to put serializer classes into the standardProps.

Contributor Author:


Hmm, if I recall correctly, that's what I did in the first place, but that caused some other issues. I'll definitely give this another look and make sure the test is runnable.

import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.*;
Contributor:


Star imports are something we try to avoid in Flink.

Contributor Author:


Ah, this was an IDE auto-complete. The style checks don't cover the test code, right? I'll revert this.

Contributor:


No, we are currently not checking the tests.

for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
if (!part.isOffsetDefined()) {
throw new RuntimeException("SimpleConsumerThread received a partition with undefined starting offset");
Contributor:


Since we use this in an argument check, we should throw an IllegalArgumentException here.

// @Test(timeout = 60000)
// public void testStartFromEarliestOffsets() throws Exception {
// runStartFromEarliestOffsets();
// }
Contributor:


We should fix that as well.

}
}
}
} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
Contributor:


I think this case can currently never happen, because on restore we only add the partitions that are part of the restored state.

Contributor Author:


I was adding this as preparation for the Kafka partition discovery task, but it'd probably make sense to remove it from this PR to avoid confusion.

List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
{
// collect which partitions we should fetch offsets for
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();
Contributor:


I don't see why you need to copy the partitions into these lists.
I think you can just go over the list and call getLastOffsetFromKafka with part.getOffset() as the last argument.

Contributor Author:


Good point, didn't think of that. I'll call getLastOffsetFromKafka if getOffset() returns OffsetRequest.EarliestTime() or OffsetRequest.LatestTime().
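
Roughly like this, i.e. without the extra lists (a sketch; the helper signatures are assumed from this class):

for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
    final long sentinel = part.getOffset();
    if (sentinel == OffsetRequest.EarliestTime() || sentinel == OffsetRequest.LatestTime()) {
        // resolve the earliest/latest sentinel into a concrete starting offset
        // by asking the lead broker directly
        getLastOffsetFromKafka(consumer, part, sentinel);
    }
}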

protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
emitRecord(record, partition, offset, Long.MIN_VALUE);
Contributor:


This will probably break as well when rebasing

case EARLIEST:
LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);

seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
Contributor:


It's a bit inefficient to convert the subscribedPartitions() to an ArrayList, and then in seekPartitionsToBeginning convert the List back into an array. I think we can save the ArrayList step and create an array immediately.

@tzulitai (Contributor Author), Nov 26, 2016:


The problem with this one is that the seekToBeginning method broke compatibility from 0.9 to 0.10.
In 0.9, it's seekToBeginning(TopicPartition...), while in 0.10+ it's seekToBeginning(Collection<TopicPartition>).

I'll integrate these seek methods into the KafkaConsumerCallBridge introduced in a recent PR. The Array -> List conversion is unavoidable, because our subscribedPartitions is an array while the 0.10+ methods take a collection. For the 0.9 methods, instead of converting the list back to an array, I'll change this to just iterate over the list and call seekToBeginning for each partition.
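
The bridge would then look roughly like this, with each class living in its version-specific module (a sketch; the method name is illustrative):

public class KafkaConsumerCallBridge {
    // compiled against Kafka 0.9, whose API is seekToBeginning(TopicPartition...)
    public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
        consumer.seekToBeginning(partition);
    }
}

public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
    // compiled against Kafka 0.10+, whose API is seekToBeginning(Collection<TopicPartition>)
    @Override
    public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
        consumer.seekToBeginning(Collections.singletonList(partition));
    }
}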

case LATEST:
LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);

seekPartitionsToEnd(consumer, convertKafkaPartitions(subscribedPartitions()));
Contributor:


Same here

@tzulitai (Contributor Author) commented Nov 23, 2016

Thanks for the review @rmetzger :)
I'll aim to address your comments and rebase by the end of this week (will tag you for another review once it's ready).

@tzulitai (Contributor Author)

Hi @rmetzger, I've addressed all the comments. I'll leave inline comments in the code on the parts that address your bigger points, to help with the second-pass review.

@@ -128,6 +129,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> thisSubtaskPartitions,
HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
@tzulitai (Contributor Author), Nov 28, 2016:


To make the startup mode logic cleaner, I've changed the AbstractFetcher life cycle a bit.
Now, restored state is provided when constructing the AbstractFetcher, instead of explicitly calling AbstractFetcher#restoreOffsets() as a separate call.

This allows the AbstractFetcher to have a final isRestored flag that version-specific implementations can use once they are created.

The startup offset configuring logic is much simpler now with this flag:

if (isRestored) {
  // all subscribed partition states should have defined offset
  // setup the KafkaConsumer client we're using to respect these restored offsets
} else {
  // all subscribed partition states have no defined offset
  // (1) set offsets depending on whether startup mode is EARLIEST, LATEST, or GROUP_OFFSET
  // (2) use the fetched offsets from Kafka to set the initial partition states we use in Flink.
}

// runStartFromEarliestOffsets();
// }
@Test(timeout = 60000)
public void testStartFromEarliestOffsets() throws Exception {
Contributor Author:


These tests pass now with no problem. You're right, setting the key/value deserializer keys did the trick.

*
* @param consumer The consumer connected to lead broker
* @param partitions The list of partitions we need offsets for
* @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
*/
private static void getLastOffsetFromKafka(
private static void requestAndSetSpecificTimeOffsetsFromKafka(
Contributor Author:


Refactored the utility Kafka request methods in this class to avoid creating redundant lists.

@@ -482,12 +476,39 @@ public void runStartFromEarliestOffsets() throws Exception {
* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
*/
public void runStartFromLatestOffsets() throws Exception {
Contributor Author:


To make this test easier without having to sleep, the test now does this:

  1. First write 50 records to each partition (these shouldn't be read).
  2. Set some offsets in Kafka (should be ignored).
  3. Start a latest-reading consuming job. This job throws an exception if it reads any of the first 50 records (see the sketch below).
  4. Wait until the consuming job has fully started (added a util method to JobManagerCommunicationUtils for this).
  5. Write 200 extra records to each partition.
  6. Once the writing finishes, cancel the consuming job.
  7. Check whether the consuming job threw any test errors.
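
The guard in step 3 looks roughly like this (a sketch; the Tuple2 record schema and the DiscardingSink are assumptions based on the surrounding test utilities):

env.addSource(latestReadingConsumer)
    .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
            if (value.f1 < 50) {
                // one of the initial 50 records leaked through, so the consumer
                // did not actually start from the latest offset
                throw new RuntimeException("Read a record that was written before the consumer started");
            }
            return value;
        }
    })
    .addSink(new DiscardingSink<Tuple2<Integer, Integer>>());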

@tzulitai (Contributor Author) commented Dec 8, 2016

Rebased on the "flink-connectors" change.

@rmetzger (Contributor) commented Dec 8, 2016

Thanks a lot! I have your Kafka pull requests on my todo list. I hope I get to it soon. I'm really sorry.

@tzulitai force-pushed the FLINK-4280 branch 2 times, most recently from 9a703f8 to 0a84d0a on February 4, 2017
@tzulitai (Contributor Author) commented Feb 4, 2017

Note about the 3rd commit: it fixes the failing FlinkKafkaConsumerBaseMigrationTest after the rebase.

The tests were failing due to the removal of the AbstractFetcher#restoreOffsets(...) method as part of this PR's refactoring of offset restoration. Beyond that, the previous tests in FlinkKafkaConsumerBaseMigrationTest were too tightly coupled to how the connector was implemented, e.g. they tested how the AbstractFetcher methods are called and whether a MAX_VALUE watermark was emitted (which will likely change as features are added to the connector), even though the actual purpose of the tests was simply to verify that state was restored correctly.

The 3rd commit therefore attempts to simplify FlinkKafkaConsumerBaseMigrationTest to only test legacy state restore behaviour. The deleted parts, IMHO, are already covered in other tests.

@rmetzger (Contributor) left a comment


This is a huge change :) I've quickly scanned the changes again and didn't spot anything critical.
I had some questions here and there, but other than that it looks good.

I'll now test the code a bit locally to see whether everything behaves as expected, whether the logging is done properly, etc.
Once I'm done with that and my questions are answered, we can probably merge this change.


All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position. When
configured to start from the earliest or latest record by calling either `setStartFromEarliest()` or `setStartFromLatest()`,
the consumer will ignore any committed group offsets in Kafka when determining the start position for partitions.
Contributor:


Maybe we should add a note that this setting does NOT affect the start position when restoring from a savepoint or checkpoint.

Contributor Author:


Good point, will add.

@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
Contributor:


Why is this needed now? If we have this in the default props, we cannot ensure that users don't need to set it manually.

@tzulitai (Contributor Author), Feb 14, 2017:


Somehow, without these settings, the new tests for the setStartFromXXXX methods fail, complaining that the property config does not specify settings for the deserializers.

I guess it's because those tests have Kafka clients that are used only for offset committing and fetching, in which case the clients cannot infer the types to use for the deserializers?

Contributor Author:


Perhaps I should move this out of the standardProps and set them in those tests only.

Contributor:


Yes, move these settings out of the standard properties

Contributor:


Probably it's the KafkaOffsetHandlerImpl. In that case, yes, just add the additional properties when creating the instance.

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
Contributor:


Does the "the default behaviour" also mean that we only respect the "auto.offset.reset" configs in that case?

@tzulitai (Contributor Author), Feb 14, 2017:


Yes. If the consumer group does not contain offsets for a partition, the "auto.offset.reset" property is used for that partition. I think this is the behaviour of Kafka's high-level consumer (and also our connector's original behaviour), so I intended to design it that way.
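
A minimal sketch of that interaction for the docs (broker address, topic, and schema are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
// consulted only for partitions that have no committed offset in the group
props.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer09<String> myConsumer =
    new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
myConsumer.setStartFromGroupOffsets(); // the default; shown here for clarity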

Contributor:


Okay, cool. Can you add that to the docs as well?

env.execute(consumeExtraRecordsJobName);
} catch (Throwable t) {
if (!(t.getCause() instanceof JobCancellationException))
error.set(t);
Contributor:


As per the unwritten Flink style guide, we always use {} after an if().

Contributor Author:


Nice catch .. sloppy styling :/


@Override
public void close() {
}
Contributor:


Looks like you've removed a lot of code from this test here. I guess that the DummyFlinkKafkaConsumer covers everything the deleted code did?

Contributor:


I see. That's what this comment is for: #2509 (comment)

@kl0u I think you've implemented most of the migration tests. Can you take a look at the changes @tzulitai is proposing?

@rmetzger (Contributor)

I've tested the change locally, with great success. So once my comments are addressed, the change is good to be merged.

@tzulitai (Contributor Author) commented Feb 15, 2017

Thanks for the review @rmetzger! The final 2 commits have addressed all your comments.

I'll also wait for @kl0u to have a look at the changes in FlinkKafkaConsumerBaseMigrationTest.
After that and a final run on Travis, I'll merge this to master.

Properties offsetHandlerProps = new Properties();
offsetHandlerProps.putAll(standardProps);
offsetHandlerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
offsetHandlerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
Contributor:


Doesn't it make sense to set these properties once in the KafkaOffsetHandlerImpl, instead of in all the locations where the offset handler is created?

@tzulitai (Contributor Author), Feb 15, 2017:


I didn't do that because, ideally, the constructor of KafkaOffsetHandlerImpl has no knowledge of whether the provided Properties should be manipulated or not (users of KafkaOffsetHandlerImpl provide the properties).

However, it would make sense to do what you suggested if KafkaOffsetHandlerImpl always just used standardProps instead of provided properties. In our case it would be completely fine to always use standardProps (plus the deserializer settings), so I'll change it as proposed.
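
That consolidation would look roughly like this (a sketch; the constructor shape of KafkaOffsetHandlerImpl is assumed):

private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {

    private final KafkaConsumer<byte[], byte[]> offsetClient;

    public KafkaOffsetHandlerImpl() {
        Properties props = new Properties();
        props.putAll(standardProps);
        // this client only commits/fetches offsets, so the deserializer settings
        // are an internal detail and no longer need to live in standardProps
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        offsetClient = new KafkaConsumer<>(props);
    }
}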

@tzulitai (Contributor Author) commented Feb 15, 2017

The last commit consolidates the deserializer settings for KafkaOffsetHandlerImpl.

@kl0u (Contributor) commented Feb 15, 2017

Hi @tzulitai and @rmetzger. I did not have time so far to look into it. I hope I will be able to do it by the end of the week. Is this OK?

@rmetzger (Contributor)

@tzulitai how does this fit your timeline? Are there PRs depending on this, or is this PR blocking you in any way?
If so, I would propose that we merge it right away.

@tzulitai (Contributor Author)

Thanks for letting us know @kl0u!

Yes, there are other pending PRs based on this.
I just double-checked the changes in FlinkKafkaConsumerBaseMigrationTest myself, and I think they are reasonable.

@kl0u (Contributor) commented Feb 15, 2017

Thanks @tzulitai and @rmetzger ! Of course, feel free to proceed with this.

@tzulitai (Contributor Author)

Thanks! If you do happen to find inappropriate changes in FlinkKafkaConsumerBaseMigrationTest, please let me know; I'll be happy to discuss and fix them :-)

Merging this to master now ..

@kl0u (Contributor) commented Feb 15, 2017

Perfect!
