KAFKA-2372: Add Kafka-backed storage of Copycat configs. #241

Closed

Conversation

ewencp
Contributor

@ewencp ewencp commented Sep 25, 2015

This also adds some other needed infrastructure for distributed Copycat, most
importantly the DistributedHerder, and refactors some code for handling
Kafka-backed logs into KafkaBasedLog since this is shared between offset and
config storage.

@ewencp
Contributor Author

ewencp commented Sep 25, 2015

@gwenshap Since you so kindly merged #202, we can now move on to this patch. Sorry for the large patch, but each of these requires filling in a bit more of the surrounding code to support the distributed version. Also note that the DistributedHerder right now is largely the same as the StandaloneHerder, so won't be that interesting to review here aside from minor changes to how the connector addition/deletion works with KafkaConfigStorage (same goes for the corresponding test). It'll change much more with KAFKA-2371.

@wushujames care to take a look as well since you were looking over the other CC patches?

@asfbot

asfbot commented Sep 25, 2015

kafka-trunk-git-pr #548 FAILURE
Looks like there's a problem with this pull request

@wushujames
Contributor

"Sure thing", James says naively, before realizing what he's getting into.

Let me check if I understand the overall story (haven't read the patch yet).

In distributed mode, someone submits the "job" into the copycat system, and then the nodes will respond to that and start running the job. So the "someone" will write it to Kafka, and then the nodes will pull it from Kafka. Is that the design?

@ewencp
Contributor Author

ewencp commented Sep 25, 2015

@wushujames Yup, that's right. The job submission isn't hooked up yet, but this patch does add some more of the infrastructure that will lead to it. Check the javadoc on the KafkaConfigStorage class; it gives a bunch more detail that may be helpful before you start reading through the rest of the patch. Things get tricky in the face of failures because we don't have transactional producers yet.
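
To make that flow concrete, here is a minimal, hypothetical sketch using the plain Kafka producer/consumer clients. The topic name, key format, and JSON values are invented for illustration and are not the actual KafkaConfigStorage format; the real class has its own serialization and failure handling.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class ConfigTopicSketch {
    static final String CONFIG_TOPIC = "copycat-configs";   // hypothetical topic name

    public static void main(String[] args) {
        // 1. "Someone" submits a job by writing the connector config, keyed by connector name.
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<String, String>(CONFIG_TOPIC, "connector-foo",
                    "{\"connector.class\":\"FileStreamSource\",\"tasks.max\":\"2\"}"));
        }

        // 2. Every worker replays the (single-partition) config topic from the beginning
        //    and keeps the last value seen per key, giving it the current set of configs.
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Map<String, String> latest = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            TopicPartition tp = new TopicPartition(CONFIG_TOPIC, 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            // Simplified: a real implementation polls until it reaches the end offset.
            for (ConsumerRecord<String, String> record : consumer.poll(1000))
                latest.put(record.key(), record.value());   // last write per key wins
        }
        System.out.println(latest);
    }
}
```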

@@ -153,8 +153,9 @@ public void unsubscribe() {
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
if (!subscriptions.isPaused(entry.getKey())) {
List<ConsumerRecord<K, V>> recs = entry.getValue();
if (!recs.isEmpty())
if (!recs.isEmpty()) {
Contributor

Is the Kafka coding convention to use braces around single-line if statements? I've noticed them being skipped before, but I didn't speak up because I didn't know what your conventions are.

Contributor Author

You're right, we generally skip them. Confluent's code follows different conventions, so I get them mixed up sometimes, thanks for catching that!
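
For reference, here are the two styles under discussion as a self-contained example; the class and method names are made up, only the brace placement matters.

```java
import java.util.Collections;
import java.util.List;

class BraceStyleExample {
    // Kafka convention: braces are usually omitted for a single-statement if body.
    static int sizeKafkaStyle(List<String> recs) {
        if (!recs.isEmpty())
            return recs.size();
        return 0;
    }

    // The style accidentally introduced in the hunk above: braces around the single statement.
    static int sizeBracedStyle(List<String> recs) {
        if (!recs.isEmpty()) {
            return recs.size();
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(sizeKafkaStyle(Collections.<String>emptyList()));  // prints 0
    }
}
```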

@@ -67,7 +68,7 @@ public ConnectorConfig() {
this(new Properties());
}

public ConnectorConfig(Properties props) {
public ConnectorConfig(Map<?, ?> props) {
Contributor

Isn't it Map&lt;String, String&gt;?

Contributor Author

Fixed

@gwenshap
Contributor

I had a few questions, but overall this is really awesome.


Set<TopicPartition> assignment = consumer.assignment();

// This approach to getting the current end offset is hacky until we have an API for looking these up directly
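
For context, the "hacky" approach that comment refers to is presumably something like the sketch below: seek to the end of every assigned partition and read back the resulting position. This is illustrative only, not the patch's exact code; later consumer versions added Consumer#endOffsets for exactly this lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class EndOffsetSketch {
    // Find the current end offset of every assigned partition without a direct API:
    // seek to the end, then ask for the position, which forces the offset lookup.
    static Map<TopicPartition, Long> endOffsetsViaSeek(Consumer<?, ?> consumer) {
        Set<TopicPartition> assignment = consumer.assignment();
        consumer.seekToEnd(assignment);
        Map<TopicPartition, Long> end = new HashMap<>();
        for (TopicPartition tp : assignment)
            end.put(tp, consumer.position(tp));
        return end;
    }
}
```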

@wushujames
Contributor

Sorry I haven't been able to do a review of this (beyond the typos you've already fixed). I'll see if I can find some time this week. @gwenshap's the main reviewer, though, so let me know if you want to proceed without me. I don't want to hold things up.

@ewencp
Contributor Author

ewencp commented Oct 12, 2015

@wushujames No worries. We're going to push this through, but further review after the fact is still useful as well -- whenever you have some time to look it over, it would be appreciated!

@ewencp
Contributor Author

ewencp commented Oct 12, 2015

@gwenshap Updated the patch. I also added some code (and relevant checks in the tests) to expose the offsets as I mentioned in a previous comment. I'm not sure about splitting up the ephemeral/non-ephemeral configs. It does seem cleaner conceptually to separate them since they can be kept separate, but on the other hand it's more complicated in that it requires multiple topics (we already have 2 per cluster, one for offsets & one for configs), the implementation has to track & consume multiple topics, testing needs to take into account multiple topics, etc. Not sure if the conceptual cleanliness is worth it.

@ewencp
Contributor Author

ewencp commented Oct 12, 2015

@gwenshap One other thing I thought of re: how the data is managed. Technically we don't need a root key. We could actually handle each connector's tasks independently, e.g. after writing the updated configs, just have a connector-tasks-commit type record. So if we had a connector called foo, instead of rewriting the root record, we could just write connector-tasks-foo that just contains the # of tasks that should now be used.

I think it's a bit messier to keep track of and you don't have one place you can look to see what the current state of tasks should be, but it's a bit simpler to understand and decouples certain updates (where we need to make sure we have consistent configs for all connector tasks). Thoughts? I don't think the code would need to change that much, so I'm open to either approach.
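
To make the proposal concrete, a hypothetical record sequence for reconfiguring a connector "foo" with two tasks might look like the sketch below. The key names and JSON payloads are illustrative, not the actual serialization format; the point is that the final per-connector commit record says how many task configs are now active, replacing the single root record.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;

class ConnectorCommitSketch {
    // Records written to the config topic when connector "foo" is (re)configured
    // with two tasks. Readers treat the task configs as active only once they see
    // the commit record, which also tells them how many tasks to expect.
    static List<ProducerRecord<String, String>> reconfigureFoo(String topic) {
        return Arrays.asList(
                new ProducerRecord<String, String>(topic, "connector-foo", "{...connector config...}"),
                new ProducerRecord<String, String>(topic, "task-foo-0", "{...task 0 config...}"),
                new ProducerRecord<String, String>(topic, "task-foo-1", "{...task 1 config...}"),
                new ProducerRecord<String, String>(topic, "commit-foo", "{\"tasks\":2}"));
    }
}
```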

* limitations under the License.
**/

package org.apache.kafka.copycat.runtime.distributed;
Contributor

Shouldn't this be under o.a.k.cc.runtime.storage?

@gwenshap
Contributor

I like the idea of "commit" per connector-task configs vs. global.

@ewencp
Contributor Author

ewencp commented Oct 13, 2015

@gwenshap I think I've updated with all the recommended changes, including the change from "root" config -> per-connector task commit messages. Turned out not to be that big a change.

Also, system tests for copycat are still looking good:

SESSION REPORT (ALL TESTS)
session_id: 2015-10-12--002
run time:   5 minutes 44.996 seconds
tests run:  4
passed:     4
failed:     0
===========================================================================================================================================================================================================================================================================================================================
test_id:    2015-10-12--002.kafkatest.tests.copycat_distributed_test.CopycatDistributedFileTest.test_file_source_and_sink
status:     PASS
run time:   1 minute 22.696 seconds
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_id:    2015-10-12--002.kafkatest.tests.copycat_test.CopycatStandaloneFileTest.test_file_source_and_sink.converter=org.apache.kafka.copycat.json.JsonConverter.schemas=True
status:     PASS
run time:   1 minute 28.027 seconds
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_id:    2015-10-12--002.kafkatest.tests.copycat_test.CopycatStandaloneFileTest.test_file_source_and_sink.converter=org.apache.kafka.copycat.json.JsonConverter.schemas=False
status:     PASS
run time:   1 minute 27.506 seconds
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_id:    2015-10-12--002.kafkatest.tests.copycat_test.CopycatStandaloneFileTest.test_file_source_and_sink.converter=org.apache.kafka.copycat.storage.StringConverter.schemas=None
status:     PASS
run time:   1 minute 26.759 seconds
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

@gwenshap
Contributor

I really like the new revision :)

I had a minor API comment, but this can be done in a follow-up JIRA (or simply when/if we need the API).

LGTM!

@asfgit asfgit closed this in 36d4469 Oct 13, 2015
C0urante pushed a commit to C0urante/kafka that referenced this pull request Dec 3, 2019
omkreddy added a commit to omkreddy/kafka that referenced this pull request Jan 7, 2020
soondenana pushed a commit to soondenana/kafka that referenced this pull request Jan 22, 2020
wyuka added a commit to wyuka/kafka that referenced this pull request Jan 25, 2022
* [LI-HOTFIX] Add safety check to controlled shutdown (apache#103)

The controller will not allow a broker to initiate controlled shutdown
unless the shutdown is unlikely to lead to a topic-partition ISR
shrinking below the min.insync.replicas setting for that topic.

This new functionality is optional and disabled by default. The
following new configs have been added:

* controlled.shutdown.safety.check.enable: Enable this safety check
  for any ControlledShutdownRequest handled by the controller.
* controlled.shutdown.safety.check.redundancy.factor: Additional
  replicas above min.insync.replicas that are required for any
  topic-partition to allow shutdown of a broker. Default is 2, so if a
  topic has min.insync.replicas=2, then the ISR must have 4 replicas
  to allow shutdown.

By default a broker attempting shutdown will only send
ControlledShutdownRequest to the controller 3 times with a 5 second
sleep in between. If it is desired to never skip controlled shutdown,
controlled.shutdown.max.retries can be increased to a much larger
value than 3. It can be set to 2147483647 (Integer.MAX_VALUE) to
effectively retry forever.
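
For reference, under these assumptions the broker settings described above would look roughly like this; the property names are taken from this description, and the values just restate its examples.

```
# Enable the ISR-based controlled shutdown safety check (disabled by default)
controlled.shutdown.safety.check.enable=true

# Require min.insync.replicas + this many replicas in the ISR to allow shutdown
controlled.shutdown.safety.check.redundancy.factor=2

# Retry controlled shutdown (effectively) forever instead of giving up after 3 attempts
controlled.shutdown.max.retries=2147483647
```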

A controller integration test has been added which exercises this code
with a simulated three node cluster with a topic configured with
min.insync.replicas=2, allowing only one broker to shut down at a
time.

In ControllerContext, instead of maintaining a list of broker ids that
have begun shutting down (called shuttingDownBrokers), we now maintain a
map of brokerId -> brokerEpoch. This will make it easier to keep this
map and broker presence notifications (via zookeeper) in sync.

When a ControlledShutdown request is allowed to begin, it is recorded
in zookeeper under /brokers/shutdown/<broker-id>, with the value of
the broker epoch. On controller initialization this is loaded into
controllerContext.shuttingDownBrokers. When a broker has restarted, it
will come back with a new broker epoch which will take precedence over
the epoch in controllerContext.shuttingDownBrokers.

The controller now needs to know the min.insync.replicas of every
topic. This is done by a combination of DynamicConfigManager,
a full scan of all topic configs on controller failover, and a /topics
znode watcher. All update a map in ControllerContext.

In order to allow skipping the safety check, this patch adds
LiControlledShutdownSkipSafetyCheckRequest. It takes a brokerId
and brokerEpoch. When sent to the controller, the controller
will record it in a map in memory.

After handling that request, the controller will ignore the results of
the safety check for any request with that brokerId and epoch <= the
epoch provided via LiControlledShutdownSkipSafetyCheckRequest.

This is not persisted to zookeeper since this is an emergency
action. If the controller moves then an admin can just send the
request again.

This request is authorized as a cluster action.

Since LiControlledShutdownSkipSafetyCheck is not submitted to
upstream, it has been added with an ApiKey of 1000. The largest ApiKey
in upstream is 47. This new API is only meant to be used by
AdminClient and is not meant to be an inter-broker request. So this
sets the precedent that if we have an admin-only API we want to add
that isn't upstream yet, we can give it an ApiKey of 1000+. Then if
this internal-only admin API does make it upstream, migrating our
tools to it will be easy since usage should be minimal.

This required fixing a couple of tests that made assumptions about
ApiKeys, namely that the array of api keys now has gaps in it. Now
ApiKeys#hasKeys returns true if the key is within range *and* the
entry in the ID_TO_TYPE array is not null.

* Fix build.

* Fix integ test.

* [LI-HOTFIX] Skip shutdown safety check at epoch already checked (apache#157)

TICKET = N/A
LI_DESCRIPTION =
This makes a couple of changes to address some deficiencies
discovered with this safety check when using it in a large, busy
cluster.

* Skip the check if already approved for the requested epoch
* Define redundancy factor property as the required result of
  a broker shutdown, not the required state before shutdown,
  allowing the shutdown of lagging replicas.
* Change default redundancy factor from 2 => 1

In c337e47 (PR apache#103) a shutdown safety check was introduced. In
testing this in a large cluster, we found a problem. If the controller
sees that brokerId 1 is safe to shut down at brokerEpoch 99, it will
add that broker to `controllerContext.shuttingDownBrokerIds`. But it
might still not allow shutdown because some leadership transitions
must happen first.

In the process of performing those leadership transitions, brokerId 1
may receive StopReplica, which would cause it to leave the
ISR. Subsequent ControlledShutdownRequest attempts would then fail the
`safeToShutdown` safety check, so the broker would never be allowed to
shut down.

This patch adjusts the implementation of `safeToShutdown`
slightly. Now, if the controller has said that a broker is safe to
shut down at a given epoch, it will always allow that shutdown for
that epoch without performing any additional checks. This allows the
full controlled shutdown process to complete which in a large, busy
cluster may take several rounds of `ControlledShutdownRequest`.

In addition, Lucas pointed out that if a broker which is attempting to
shutdown is already out of ISR, then it is probably pretty harmless to
allow it to shut down if the safety check would have allowed it to when
there are no under-replicated partitions. So we changed the logic a
little further.

ControlledShutdownSafetyCheckEnableProp is now defined as,
 "Shutdown will be allowed if removing the candidate broker will leave
min.insync.replicas + ControlledShutdownSafetyCheckEnableProp replicas
in the ISR."

The default redundancy factor is changed to 1.

EXIT_CRITERIA = N/A

* [LI-HOTFIX] Upgrade log level to INFO to show which partition is blocking the controlled shutdown (apache#164)

TICKET = N/A
LI_DESCRIPTION = 
The partition info is quite useful and we want to show it by default.
Thus I'm upgrading the log level to INFO.
EXIT_CRITERIA = N/A

* [LI-HOTFIX] Topics being deleted shouldn't block the controlled shutdown (apache#181)

TICKET = LIKAFKA-36940
LI_DESCRIPTION = In clusters with ongoing topic deletions, we found that the controlled shutdown can
be blocked by topics that are being deleted. This PR skips the check for topics being deleted so
that they don't block the shutdown.

EXIT_CRITERIA = Never

* [LI-HOTFIX] Ensure that the controller has fresh values for min.insync.replicas after topic-level configs are dropped. (apache#207)

TICKET = CM-79574
LI_DESCRIPTION =
The following series of events causes the Controller to keep stale config value for min.insync.replicas config.
This can cause a broker shutdown to be blocked when ISR-based shutdown logic is adopted.

1. Given a topic T, set its topic-level min.insync.replicas config value to XXX.
2. Then remove the topic-level min.insync.replicas config value of topic T

Currently, step 1 will cause the controller to update its "controllerContext.topicMinIsrConfig" value for topic T to XXX.
However, step 2 will not remove the config value XXX from "controllerContext.topicMinIsrConfig". As a result, the controller
will cache a stale config value of XXX for topic T. The expected behavior is for the topic to fall back to the default config
value after its topic-level config is removed.

* The above events can cause the ISR-based shutdown logic to block shutdown. For example,
	1. Set the min.insync.replicas=2 for topic T.
	2. Remove topic-level min.insync.replicas config value for topic T.
	3. Use redundancy factor of 1 for ISR-based shutdown logic.
	4. Observe that the controller will block shutdown for a partition of the topic with the following log:

	```
	T-2 has min.insync.replicas=2 and a redundancy factor of 1. Removing broker XXX will leave 2 live replicas in the ISR.
	```

EXIT_CRITERIA = When this change is merged in upstream and the corresponding change is pulled into a future release.

* Disable test where Kafka shutdown is attempted when ZK is unavailable.

* Remove incorrect filtering logic in ControllerContext::removeLiveBrokers

Co-authored-by: Kyle Ambroff-Kao <kyle@ambroffkao.com>
Co-authored-by: Lucas Wang <luwang@linkedin.com>
Co-authored-by: Adem Efe Gencer <agencer@linkedin.com>
wyuka added a commit to wyuka/kafka that referenced this pull request Mar 4, 2022
wyuka added a commit to wyuka/kafka that referenced this pull request Mar 28, 2022

* [LI-HOTFIX] Improve the testControlledShutdownRejectRequestForAvailabilityRisk test (apache#141)

TICKET = N/A
LI_DESCRIPTION =
In the test testControlledShutdownRejectRequestForAvailabilityRisk,
sometimes the epoch for the shutting down broker becomes 50. This PR improves the assertions
so that the test doesn't fail when such a condition happens.

EXIT_CRITERIA = N/A
wyuka added a commit to wyuka/kafka that referenced this pull request Jun 16, 2022
udaynpusa pushed a commit to mapr/kafka that referenced this pull request Jan 30, 2024