CONFLUENT: Sync from apache/kafka/trunk (19 March 2021)
Conflicts:
* build.gradle: keep `dependencySubstitution` Confluent addition in
`resolutionStrategy` and take upstream changes.

Commits:
* apache-github/trunk:
  KAFKA-12503: inform threads to resize their cache instead of doing so for them (apache#10356)
  KAFKA-10697: Remove ProduceResponse.responses (apache#10332)
  MINOR: Exclude KIP-500.md from rat check (apache#10354)
  MINOR: Move `configurations.all` to be a child of `allprojects` (apache#10349)
  MINOR: Remove use of `NoSuchElementException` in `KafkaMetadataLog` (apache#10344)
  MINOR: Start the broker-to-controller channel for request forwarding (apache#10340)
  KAFKA-12382: add a README for KIP-500 (apache#10227)
  MINOR: Fix BaseHashTable sizing (apache#10334)
  KAFKA-10357: Add setup method to internal topics (apache#10317)
  MINOR: remove redundant null check when testing specified type (apache#10314)
  KAFKA-12293: Remove JCenter from buildscript and delete buildscript.gradle
  KAFKA-12491: Make rocksdb an `api` dependency for `streams` (apache#10341)
  KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not  exist in current kafka cluster (apache#10304)
  KAFKA-12459; Use property testing library for raft event simulation tests (apache#10323)
  MINOR: fix failing ZooKeeper system tests (apache#10297)
  MINOR: fix client_compatibility_features_test.py (apache#10292)
ijuma committed Mar 19, 2021
2 parents bea34ef + 367eca0 commit d38bc38
Showing 42 changed files with 1,632 additions and 705 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -57,5 +57,6 @@ jmh-benchmarks/generated
jmh-benchmarks/src/main/generated
streams/src/generated
raft/src/generated
raft/.jqwik-database
core/src/generated
metadata/src/generated
160 changes: 160 additions & 0 deletions KIP-500.md
@@ -0,0 +1,160 @@
KIP-500 Early Access Release
============================

# Introduction
It is now possible to run Apache Kafka without Apache ZooKeeper! We call this mode [self-managed mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum). It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it is available for testing in the Kafka 2.8 release.

When the Kafka cluster is in self-managed mode, it does not store its metadata in ZooKeeper. In fact, you do not have to run ZooKeeper at all, because Kafka stores its metadata in a Raft quorum of controller nodes.

Self-managed mode has many benefits -- some obvious, and some not so obvious. Clearly, it is nice to manage and configure one service rather than two. In addition, you can now run a single-process Kafka cluster. Most important of all, self-managed mode is more scalable. We expect to be able to [support many more topics and partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/) in this mode.

# Quickstart

## Warning
Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.0 is released, it may not even be possible to upgrade your self-managed clusters from 2.8 to 3.0. There may be bugs, including serious ones. You should *assume that your data could be lost at any time* if you try the early access release of KIP-500.

## Generate a cluster ID
The first step is to generate an ID for your new cluster, using the kafka-storage tool:

~~~~
$ ./bin/kafka-storage.sh random-uuid
xtzWWN4bTjitpL3kfd9s5g
~~~~

## Format Storage Directories
The next step is to format your storage directories. If you are running in single-node mode, you can do this with one command:

~~~~
$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/nozk-combined.properties
Formatting /tmp/nozk-combined-logs
~~~~

If you are using multiple nodes, then you should run the format command on each node. Be sure to use the same cluster ID for each one.
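For example, on a cluster with separate controller and broker nodes, the formatting step might look like the following sketch (the properties file names here are hypothetical -- use whichever properties file each node will actually be started with):

~~~~
# Run the same command on every node, with the SAME cluster ID.
$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/nozk-controller.properties   # on each controller node
$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/nozk-broker.properties       # on each broker node
~~~~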

## Start the Kafka Server
Finally, you are ready to start the Kafka server on each node.

~~~~
$ ./bin/kafka-server-start.sh ./config/nozk-combined.properties
[2021-02-26 15:37:11,071] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-02-26 15:37:11,294] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, dir=/tmp/nozk-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-26 15:37:11,509] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2021-02-26 15:37:11,640] INFO [RaftManager nodeId=1] Completed transition to Unattached(epoch=0, voters=[1], electionTimeoutMs=9037) (org.apache.kafka.raft.QuorumState)
...
~~~~

Just like with a ZooKeeper-based broker, you can connect to port 9092 (or whatever port you configured) to perform administrative operations, or to produce and consume data.

~~~~
$ ./bin/kafka-topics.sh --create --topic foo --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
Created topic foo.
~~~~
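You can then do a quick smoke test of the new topic with the console producer and consumer (a sketch; your output will differ):

~~~~
$ echo "hello world" | ./bin/kafka-console-producer.sh --topic foo --bootstrap-server localhost:9092
$ ./bin/kafka-console-consumer.sh --topic foo --from-beginning --bootstrap-server localhost:9092
hello world
~~~~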

# Deployment

## Controller Servers
Unlike in ZooKeeper-based mode, where any server can become the controller, in self-managed mode only a small, specially selected group of servers can act as controllers. These controller servers participate in the metadata quorum. Each KIP-500 controller server is either active or a hot standby for the current active controller.

Typically you will select either 3 or 5 servers for this role, depending on the size of your cluster. Just like with ZooKeeper, you must keep a majority of the controllers alive in order to maintain availability. So if you have 3 controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 failures.

## Process Roles
Each Kafka server now has a new configuration key called `process.roles` which can have the following values:

* If `process.roles` is set to `broker`, the server acts as a self-managed broker.
* If `process.roles` is set to `controller`, the server acts as a self-managed controller.
* If `process.roles` is set to `broker,controller`, the server acts as both a self-managed broker and a self-managed controller.
* If `process.roles` is not set at all, the server is assumed to be in ZooKeeper mode. As mentioned earlier, you can't yet transition back and forth between ZooKeeper mode and self-managed mode without reformatting.

Nodes that act as both brokers and controllers are referred to as "combined" nodes. The advantage of combined nodes is that you run fewer Java Virtual Machines (JVMs), which avoids some of the fixed memory overhead associated with each JVM. The disadvantage is that the controller is less isolated from the rest of the system. For example, if activity on the broker causes an out-of-memory condition, the controller part of the server is not isolated from that OOM condition.
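As a rough sketch, a combined node's configuration might include something like the following. The hostname, ports, and node ID are placeholders, `controller.quorum.voters` is explained in the "Quorum Voters" section below, and the sample properties files shipped with the release contain additional required listener settings that are omitted here:

```
process.roles=broker,controller
node.id=1
listeners=PLAINTEXT://node1.example.com:9092,CONTROLLER://node1.example.com:9093
controller.quorum.voters=1@node1.example.com:9093
```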

## Quorum Voters
All nodes in the system must set the `controller.quorum.voters` configuration. This identifies the quorum controller servers that should be used. All of the controllers must be enumerated. This is similar to how, when using ZooKeeper, the `zookeeper.connect` configuration must contain all of the ZooKeeper servers. Unlike with the ZooKeeper configuration, however, `controller.quorum.voters` also includes an ID for each node. The format is `id1@host1:port1,id2@host2:port2`, and so on.

So if you have 10 brokers and 3 controllers named controller1, controller2, controller3, you might have the following configuration on controller1:
```
process.roles=controller
node.id=1
listeners=CONTROLLER://controller1.example.com:9093
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093
```

Each broker and each controller must set `controller.quorum.voters`. Note that the node ID supplied in the `controller.quorum.voters` configuration must match the one supplied to the server. So on controller1, `node.id` must be set to 1, and so forth. Note that there is no requirement for controller IDs to start at 0 or 1. However, the easiest and least confusing way to allocate node IDs is probably just to give each server a numeric ID, starting from 0.
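For example, one of the brokers in that cluster might have a configuration like this (the hostnames and the broker's node ID are placeholders, and only the settings discussed here are shown):

```
process.roles=broker
node.id=11
listeners=PLAINTEXT://broker1.example.com:9092
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093
```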

Note that clients never need to configure `controller.quorum.voters`; only servers do.

## Kafka Storage Tool
As described above in the Quickstart section, you must use the `kafka-storage.sh` tool to generate a cluster ID for your new cluster, and then run the format command on each node before starting it.

This is different from how Kafka has operated in the past. Previously, Kafka would format blank storage directories automatically, and also generate a new cluster UUID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. For example, under UNIX, if a data directory can't be mounted, it may show up as blank. In this case, auto-formatting would be the wrong thing to do.

This is particularly important for the metadata log maintained by the controller servers. If two out of three controllers were able to start with blank logs, a leader could be elected with nothing in its log, which would cause all metadata to be lost.

# Missing Features
We do not yet support generating or loading KIP-630 metadata snapshots. This means that after a while, the time required to restart a broker will become very large. This is a known issue and we are working on implementing snapshots for the next release.

We also don't support any kind of upgrade right now, either to or from self-managed mode. This is another big gap that we are working on.

Finally, the following Kafka features have not yet been fully implemented:

* Support for security (configuring an Authorizer, setting up SCRAM, delegation tokens, and so forth)
* Support for transactions and exactly-once semantics
* Support for adding partitions to existing topics
* Support for partition reassignment
* Support for some configurations, like enabling unclean leader election by default or dynamically changing broker endpoints
* Support for KIP-112 "JBOD" modes
* Support for KIP-631 controller metrics

We've tried to make it clear when a feature is not supported in the early access release, but you may encounter some rough edges.

# Debugging
If you encounter an issue, you might want to take a look at the metadata log.

## kafka-dump-log
One way to view the metadata log is with the `kafka-dump-log.sh` tool, like so:

~~~~
[cmccabe@zeratul kafka3]$ ./bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files /tmp/nozk-combined-logs/\@metadata-0/*.log
Dumping /tmp/nozk-combined-logs/@metadata-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: true position: 0 CreateTime: 1614382631640 size: 89 magic: 2 compresscodec: NONE crc: 1438115474 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 89 CreateTime: 1614382632329 size: 137 magic: 2 compresscodec: NONE crc: 1095855865 isvalid: true
payload: {"type":"REGISTER_BROKER_RECORD","version":0,"data":{"brokerId":1,"incarnationId":"P3UFsWoNR-erL9PK98YLsA","brokerEpoch":0,"endPoints":[{"name":"PLAINTEXT","host":"localhost","port":9092,"securityProtocol":0}],"features":[],"rack":null}}
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 226 CreateTime: 1614382632453 size: 83 magic: 2 compresscodec: NONE crc: 455187130 isvalid: true
payload: {"type":"UNFENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 309 CreateTime: 1614382634484 size: 83 magic: 2 compresscodec: NONE crc: 4055692847 isvalid: true
payload: {"type":"FENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: true position: 392 CreateTime: 1614382671857 size: 89 magic: 2 compresscodec: NONE crc: 1318571838 isvalid: true
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 481 CreateTime: 1614382672440 size: 137 magic: 2 compresscodec: NONE crc: 841144615 isvalid: true
payload: {"type":"REGISTER_BROKER_RECORD","version":0,"data":{"brokerId":1,"incarnationId":"RXRJu7cnScKRZOnWQGs86g","brokerEpoch":4,"endPoints":[{"name":"PLAINTEXT","host":"localhost","port":9092,"securityProtocol":0}],"features":[],"rack":null}}
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 618 CreateTime: 1614382672544 size: 83 magic: 2 compresscodec: NONE crc: 4155905922 isvalid: true
payload: {"type":"UNFENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":4}}
baseOffset: 7 lastOffset: 8 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 701 CreateTime: 1614382712158 size: 159 magic: 2 compresscodec: NONE crc: 3726758683 isvalid: true
payload: {"type":"TOPIC_RECORD","version":0,"data":{"name":"foo","topicId":"5zoAlv-xEh9xRANKXt1Lbg"}}
payload: {"type":"PARTITION_RECORD","version":0,"data":{"partitionId":0,"topicId":"5zoAlv-xEh9xRANKXt1Lbg","replicas":[1],"isr":[1],"removingReplicas":null,"addingReplicas":null,"leader":1,"leaderEpoch":0,"partitionEpoch":0}}
~~~~

## The Metadata Shell
Another tool for examining the metadata logs is the Kafka metadata shell. Just like the ZooKeeper shell, this allows you to inspect the metadata of the cluster.

~~~~
$ ./bin/kafka-metadata-shell.sh --snapshot /tmp/nozk-combined-logs/\@metadata-0/00000000000000000000.log
>> ls /
brokers local metadataQuorum topicIds topics
>> ls /topics
foo
>> cat /topics/foo/0/data
{
"partitionId" : 0,
"topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
"replicas" : [ 1 ],
"isr" : [ 1 ],
"removingReplicas" : null,
"addingReplicas" : null,
"leader" : 1,
"leaderEpoch" : 0,
"partitionEpoch" : 0
}
~~~~
4 changes: 4 additions & 0 deletions NOTICE
@@ -6,3 +6,7 @@ The Apache Software Foundation (https://www.apache.org/).

This distribution has a binary dependency on jersey, which is available under the CDDL
License. The source code of jersey can be found at https://github.com/jersey/jersey/.

This distribution has a binary test dependency on jqwik, which is available under
the Eclipse Public License 2.0. The source code can be found at
https://github.com/jlink/jqwik.
61 changes: 35 additions & 26 deletions build.gradle
@@ -20,9 +20,7 @@ import java.nio.charset.StandardCharsets
buildscript {
repositories {
mavenCentral()
jcenter()
}
apply from: file('gradle/buildscript.gradle'), to: buildscript
apply from: "$rootDir/gradle/dependencies.gradle"

dependencies {
@@ -90,31 +88,32 @@ allprojects {
}
}
}
configurations.all {
// zinc is the Scala incremental compiler, it has a configuration for its own dependencies
// that are unrelated to the project dependencies, we should not change them
if (name != "zinc") {
resolutionStrategy {
force(
// be explicit about the javassist dependency version instead of relying on the transitive version
libs.javassist,
// ensure we have a single version in the classpath despite transitive dependencies
libs.scalaLibrary,
libs.scalaReflect,
libs.jacksonAnnotations,
// be explicit about the Netty dependency version instead of relying on the version set by
// ZooKeeper (potentially older and containing CVEs)
libs.nettyHandler,
libs.nettyTransportNativeEpoll
)
dependencySubstitution {
substitute module("log4j:log4j:1.2.17") because "we use a custom version with security patches" with module("io.confluent:confluent-log4j:1.2.17-cp2")
}
}

configurations.all {
// zinc is the Scala incremental compiler, it has a configuration for its own dependencies
// that are unrelated to the project dependencies, we should not change them
if (name != "zinc") {
resolutionStrategy {
force(
// be explicit about the javassist dependency version instead of relying on the transitive version
libs.javassist,
// ensure we have a single version in the classpath despite transitive dependencies
libs.scalaLibrary,
libs.scalaReflect,
libs.jacksonAnnotations,
// be explicit about the Netty dependency version instead of relying on the version set by
// ZooKeeper (potentially older and containing CVEs)
libs.nettyHandler,
libs.nettyTransportNativeEpoll
)

dependencySubstitution {
substitute module("log4j:log4j:1.2.17") because "we use a custom version with security patches" with module("io.confluent:confluent-log4j:1.2.17-cp2")
}
}
}
}

}

ext {
@@ -173,6 +172,7 @@ if (file('.git').exists()) {
'gradlew',
'gradlew.bat',
'gradle/wrapper/gradle-wrapper.properties',
'KIP-500.md',
'TROGDOR.md',
'**/*README.md',
'**/id_rsa',
@@ -1279,6 +1279,7 @@ project(':raft') {
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.jqwik

testRuntimeOnly libs.slf4jlog4j
}
@@ -1332,6 +1333,12 @@ project(':raft') {
}
}

test {
useJUnitPlatform {
includeEngines 'jqwik', 'junit-jupiter'
}
}

clean.doFirst {
delete "$buildDir/kafka/"
}
@@ -1448,9 +1455,11 @@ project(':streams') {
// this transitive dependency is not used in Streams, and it breaks SBT builds
exclude module: 'javax.ws.rs-api'
}


// `org.rocksdb.Options` is part of Kafka Streams public api via `RocksDBConfigSetter`
api libs.rocksDBJni

implementation libs.slf4jApi
implementation libs.rocksDBJni
implementation libs.jacksonAnnotations
implementation libs.jacksonDatabind

@@ -1990,7 +1999,7 @@ project(':connect:transforms') {

dependencies {
api project(':connect:api')

implementation libs.slf4jApi

testImplementation libs.easymock
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -404,6 +404,7 @@
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
<allow pkg="net.jqwik"/>
</subpackage>

<subpackage name="snapshot">
@@ -115,15 +115,15 @@ public static class TopicMetadataAndConfig {
private final int replicationFactor;
private final Config config;

TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config) {
public TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config) {
this.exception = null;
this.topicId = topicId;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.config = config;
}

TopicMetadataAndConfig(ApiException exception) {
public TopicMetadataAndConfig(ApiException exception) {
this.exception = exception;
this.topicId = Uuid.ZERO_UUID;
this.numPartitions = UNKNOWN;
@@ -32,7 +32,7 @@
public class DeleteTopicsResult {
final Map<String, KafkaFuture<Void>> futures;

DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
this.futures = futures;
}

