Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics #9780

Merged
merged 4 commits into from
Feb 9, 2021

Conversation

rhauch
Copy link
Contributor

@rhauch rhauch commented Dec 22, 2020

The existing Kafka*BackingStore classes used by Connect all use KafkaBasedLog, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. KafkaBasedLog uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the KafkaBasedLog used in the Kafka*BackingStore classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s fetch.max.wait.ms setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to KafkaBasedLog.readToEnd() may block for some period of time even though it’s already caught up to the end.

Instead, we want the KafkaBasedLog.readToEnd() to always return quickly when the log is already caught up. The best way to do this is to have the KafkaBackingStore use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same ListOffset broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the Kafka*BackingStore classes, which means we have three instances of KafkaBasedLog. We don't want three instances of the admin client, and should have all three instances of the KafkaBasedLog share a single admin client instance. In fact, each Kafka*BackingStore instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change Kafka*BackingStores to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that KafkaBasedLog has been used by projects outside of Apache Kafka. While KafkaBasedLog is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:

  1. Add a KafkaBasedLog constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because KafkaBasedLog is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the KafkaBasedLog. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
  2. Add to the Kafka*BackingStore classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its KafkaBasedLog instance, it would pass the admin supplier and pass an init function that takes an admin instance.
  3. Create a new SharedTopicAdmin that lazily creates the TopicAdmin (and underlying Admin client) when required, and closes the admin objects when the SharedTopicAdmin is closed.
  4. Modify the existing TopicAdmin (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in KafkaBasedLog mentioned in Switch to using scala 2.9.2 #1 above. Doing this also makes it easier to test that logic.
  5. Change ConnectDistributed to create a SharedTopicAdmin instance (that is AutoCloseable) before creating the Kafka*BackingStore instances, passing the SharedTopicAdmin (which is an admin supplier) to all three Kafka*BackingStore objects, and finally always closing the SharedTopicAdmin upon termination. (Shutdown of the worker occurs outside of the ConnectDistributed code, so modify DistributedHerder to take in its constructor additional AutoCloseable objects that should be closed when the herder is closed, and then modify ConnectDistributed to pass the SharedTopicAdmin as one of those AutoCloseable instances.)
  6. Change MirrorMaker similarly to ConnectDistributed.
  7. Change existing unit tests to no longer use deprecated constructors.
  8. Add unit tests for new functionality.

This change should be backported to fix the bug in recent releases.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@rhauch rhauch added the connect label Dec 22, 2020
@rhauch rhauch force-pushed the kafka-10021 branch 2 times, most recently from d0fa4da to 1f09822 Compare December 23, 2020 00:10
@rhauch
Copy link
Contributor Author

rhauch commented Dec 23, 2020

Rebased on trunk, and then had to correct tests based on recent changes in Admin API related classes.

@rhauch
Copy link
Contributor Author

rhauch commented Feb 4, 2021

Rebased on trunk to correct conflicts.

Copy link
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rhauch
This looks pretty close already.
Left a few comments and questions.

public DistributedHerder(DistributedConfig config,
Time time,
Worker worker,
String kafkaClusterId,
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
String restUrl,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
AutoCloseable... uponShutdown) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to avoid a variadic argument here.
Parameters tend to get added with new features in such constructors. And if a new parameter is required that is also a list, then we'll have a mix of list args with a variadic in the end.

Since we transform to list I'd suggest using this type here and pass the single argument with Collections.singletonList in the caller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I used a variadic array here was to avoid having to create a new connector when no AutoCloseable instances are supplied. If we use a List, then we can change the usage in Connect runtime and in MirrorMaker 2, but anywhere else will break without keeping the old signature. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can always keep a constructor with the old signature along with the new if we wanted not to break classes that use DistributedHerder. I'm fine with the change here as a short term workaround. I guess it saves us one constructor but we can use it only once.

@@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config,
String restUrl,
ConnectMetrics metrics,
Time time,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
AutoCloseable... uponShutdown) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above


// Default to the old protocol unless specified otherwise
connectProtocolVersion = CONNECT_PROTOCOL_V0;

herder = PowerMock.createPartialMock(DistributedHerder.class,
new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
new AutoCloseable[]{uponShutdown});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above. We'll avoid the array static init too. Tests here but still that's where array lists come handy


@Mock private TopicAdmin mockTopicAdmin;
private SharedTopicAdmin sharedAdmin;
private int created = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific action on the mock we wish or can verify here instead of implicitly using a aux variable for that?
Replay, expectation and verify should help us verify the action or its absence. I'd have to check closer what such action could be, if there's any. Maybe you can see that more easily.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really I'm just using that to be able to test that the new topicAdmin() method is returning the correct instance, even after repeated calls. It was an easy way to verify that the TopicAdmin matches what the factory function returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know. It's just that we already use a mocking framework and we could use something like:

EasyMock.expect(factory.apply(EasyMock.anyObject())).andReturn(mockTopicAdmin).anyTimes();

if we also defined factory to be a mock as well. That could allow us to evaluate expectations on the mock more accurately (e.g. with a capture if we had to). But sure, if we need something quick and easy we can go with that. It's just that I noticed a mixed use of mocks with this variable that simulates what the mocking framework offers already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vaguely remember struggling with getting EasyMock to work properly when I originally created this PR, but I tried it again just to be sure.

Unfortunately, the SharedTopicAdmin constructor takes a method reference, and apparently EasyMock provides no way to effectively mock the behavior of a method passed via a method reference. See this still-open EasyMock issue for details.

So, while I agree using mocks to verify the # of times the factor is invoked, because EasyMock can't really mock method references means we're stuck with the current code using the counter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, Mockito FTW! I've rewritten the SharedTopicAdminTest class to use Mockito instead of PowerMock and EasyMock, and was able to use mocks to assert the correct number of times an admin instance was created and closed.

* @throws ConnectException if this object has already been closed
*/
public TopicAdmin topicAdmin() {
return admin.updateAndGet(this::createAdmin);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not immediately obvious to me what's the advantage compared to a synchronized topicAdmin or even get.

I see that the value can go back to null. But we already have the guard with the closed to atomically decide whether this is closed or not. Lmk if I'm missing something, but I feel the points of indirection might be a few more than they have to be here (we have no gains in locking, since we'll need to atomically updateAndGet in every get and the advantage vs synchronized should be negligible here).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure there is much advantage either way, considering these methods are not called frequently and synchronized would indeed work. I personally like the simplicity of using AtomicReference, which to me seemed natural and straightforward, avoided having to synchronize the entire methods, and needed no if-checks in this method.

Copy link
Contributor

@kkonstantine kkonstantine Feb 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to leave it as an example of the pattern that demonstrates how to apply updateAndGet.

I just didn't feel that the two or three levels of indirection were worth to write the singleton pattern differently.

Copy link
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rhauch
I replied to the open questions. But they are not blockers so LGTM regardless.


@Mock private TopicAdmin mockTopicAdmin;
private SharedTopicAdmin sharedAdmin;
private int created = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know. It's just that we already use a mocking framework and we could use something like:

EasyMock.expect(factory.apply(EasyMock.anyObject())).andReturn(mockTopicAdmin).anyTimes();

if we also defined factory to be a mock as well. That could allow us to evaluate expectations on the mock more accurately (e.g. with a capture if we had to). But sure, if we need something quick and easy we can go with that. It's just that I noticed a mixed use of mocks with this variable that simulates what the mocking framework offers already.

…to get end offsets and create topics

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in apache#1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.
@rhauch
Copy link
Contributor Author

rhauch commented Feb 8, 2021

Added a commit to incorporate @kkonstantine's suggestions (per thumbs-up above), and rebased to correct a merge conflict from trunk.

@rhauch
Copy link
Contributor Author

rhauch commented Feb 8, 2021

Added another commit that uses Mockito in the new SharedTopicAdminTest class to do what @kkonstantine had suggested and which was not possible with EasyMock.

Mockito FTW!

Copy link
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great attention to the finest details. Thanks @rhauch !

Final changes LGTM

I assume the build will pass the relevant tests.

@rhauch rhauch merged commit 982ea2f into apache:trunk Feb 9, 2021
rhauch added a commit that referenced this pull request Feb 9, 2021
…to get end offsets and create topics (#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
rhauch added a commit that referenced this pull request Feb 9, 2021
…to get end offsets and create topics (#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
rhauch added a commit that referenced this pull request Feb 10, 2021
…to get end offsets and create topics (#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
rhauch added a commit that referenced this pull request Feb 10, 2021
… client to get end offsets and create topics (#9780)"

This reverts commit 36ebed0.
rhauch added a commit that referenced this pull request Feb 10, 2021
…to get end offsets and create topics (#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
rhauch added a commit that referenced this pull request Feb 14, 2021
…to get end offsets and create topics (#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
chia7712 pushed a commit that referenced this pull request May 17, 2024
…re classes (#15865)

- These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics #9780.
- While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect.
- The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API #13434.
- This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request May 24, 2024
…re classes (apache#15865)

- These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics apache#9780.
- While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect.
- The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API apache#13434.
- This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
…re classes (apache#15865)

- These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics apache#9780.
- While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect.
- The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API apache#13434.
- This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
…re classes (apache#15865)

- These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics apache#9780.
- While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect.
- The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API apache#13434.
- This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants