Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache-kafka/trunk' into ak-to-ccs-merge-20230807
Browse files Browse the repository at this point in the history

* apache-kafka/trunk: (81 commits)
  MINOR: update Kafka Streams state.dir doc (apache#14155)
  KAFKA-15189: only init remote topic metrics when enabled (apache#14133)
  MINOR: improve logging for FK-join (apache#14105)
  KAFKA-15107: Support custom metadata for remote log segment (apache#13984)
  KAFKA-10199: Change to RUNNING if no pending task to recycle exist (apache#14145)
  KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (apache#13920)
  KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (apache#14143)
  MINOR: Fix debug logs to display TimeIndexOffset (apache#13935)
  KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (apache#14114)
  KAFKA-15022: [5/N] compute rack aware assignment for standby tasks (apache#14108)
  ...
  • Loading branch information
badaiaqrandista committed Aug 6, 2023
2 parents 8591599 + 7a2e11c commit 0bf1735
Show file tree
Hide file tree
Showing 378 changed files with 24,815 additions and 4,739 deletions.
9 changes: 5 additions & 4 deletions LICENSE-binary
Expand Up @@ -251,17 +251,18 @@ netty-transport-classes-epoll-4.1.94.Final
netty-transport-native-epoll-4.1.94.Final
netty-transport-native-unix-common-4.1.94.Final
plexus-utils-3.3.0
reflections-0.10.2
reload4j-1.2.25
rocksdbjni-7.1.2
scala-collection-compat_2.13-2.10.0
scala-library-2.13.11
scala-logging_2.13-3.9.4
scala-reflect-2.13.11
scala-java8-compat_2.13-1.0.2
snappy-java-1.1.10.1
snappy-java-1.1.10.3
swagger-annotations-2.2.8
zookeeper-3.6.4
zookeeper-jute-3.6.4
zookeeper-3.8.2
zookeeper-jute-3.8.2

===============================================================================
This product bundles various third-party components under other open source
Expand Down Expand Up @@ -329,4 +330,4 @@ paranamer-2.8, see: licenses/paranamer-BSD-3-clause
Do What The F*ck You Want To Public License
see: licenses/DWTFYWTPL

reflections-0.9.12
reflections-0.10.2
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -4,7 +4,7 @@ See our [web site](https://kafka.apache.org) for details on the project.

You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

We build and test Apache Kafka with Java 8, 11 and 17. We set the `release` parameter in javac and scalac
We build and test Apache Kafka with Java 8, 11, 17 and 20. We set the `release` parameter in javac and scalac
to `8` to ensure the generated binaries are compatible with Java 8 or higher (independently of the Java version
used for compilation). Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache
Kafka 4.0 (see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) for more details).
Expand Down Expand Up @@ -288,4 +288,4 @@ See [vagrant/README.md](vagrant/README.md).
Apache Kafka is interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html).

To contribute follow the instructions here:
* https://kafka.apache.org/contributing.html
* https://kafka.apache.org/contributing.html
2 changes: 1 addition & 1 deletion bin/kafka-delete-records.sh
Expand Up @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.DeleteRecordsCommand "$@"
2 changes: 1 addition & 1 deletion bin/kafka-replica-verification.sh
Expand Up @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ReplicaVerificationTool "$@"
2 changes: 1 addition & 1 deletion bin/windows/kafka-delete-records.bat
Expand Up @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

"%~dp0kafka-run-class.bat" kafka.admin.DeleteRecordsCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.DeleteRecordsCommand %*
2 changes: 1 addition & 1 deletion bin/windows/kafka-replica-verification.bat
Expand Up @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

"%~dp0kafka-run-class.bat" kafka.tools.ReplicaVerificationTool %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ReplicaVerificationTool %*
22 changes: 13 additions & 9 deletions build.gradle
Expand Up @@ -31,8 +31,9 @@ buildscript {
}

plugins {
id 'com.github.ben-manes.versions' version '0.46.0'
id 'com.github.ben-manes.versions' version '0.47.0'
id 'idea'
id 'jacoco'
id 'java-library'
id 'org.owasp.dependencycheck' version '8.2.1'
id 'org.nosphere.apache.rat' version "0.8.0"
Expand Down Expand Up @@ -773,9 +774,9 @@ subprojects {
dependsOn tasks.test
sourceSets sourceSets.main
reports {
html.enabled = true
xml.enabled = true
csv.enabled = false
html.required = true
xml.required = true
csv.required = false
}
}

Expand Down Expand Up @@ -866,10 +867,9 @@ if (userEnableTestCoverage) {
executionData.from = javaProjects.jacocoTestReport.executionData

reports {
html.enabled = true
xml.enabled = true
html.required = true
xml.required = true
}

// workaround to ignore projects that don't have any tests at all
onlyIf = { true }
doFirst {
Expand Down Expand Up @@ -951,6 +951,10 @@ project(':core') {
implementation libs.dropwizardMetrics
exclude module: 'slf4j-log4j12'
exclude module: 'log4j'
// Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to logback in v3.8.0, but Kafka relies on reload4j.
// We are removing Zookeeper's dependency on logback so we have a singular logging backend.
exclude module: 'logback-classic'
exclude module: 'logback-core'
}
// ZooKeeperMain depends on commons-cli but declares the dependency as `provided`
implementation libs.commonsCli
Expand Down Expand Up @@ -1692,6 +1696,8 @@ project(':storage:api') {

dependencies {
implementation project(':clients')
implementation project(':server-common')
implementation libs.metrics
implementation libs.slf4jApi

testImplementation project(':clients')
Expand Down Expand Up @@ -2800,8 +2806,6 @@ project(':connect:runtime') {
api project(':connect:json')
api project(':connect:transforms')

implementation project(':tools')

implementation libs.slf4jApi
implementation libs.log4j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
Expand Down
24 changes: 21 additions & 3 deletions checkstyle/import-control-metadata.xml
Expand Up @@ -63,7 +63,6 @@
</subpackage>

<subpackage name="controller">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.acl" />
Expand All @@ -73,7 +72,6 @@
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
Expand All @@ -93,13 +91,17 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.mutable" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>

<subpackage name="image">
Expand All @@ -122,6 +124,22 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="loader">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
<subpackage name="publisher">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
</subpackage>

<subpackage name="metadata">
Expand Down
12 changes: 6 additions & 6 deletions checkstyle/import-control.xml
Expand Up @@ -229,6 +229,7 @@
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.deferred" />
Expand Down Expand Up @@ -260,6 +261,10 @@
<allow pkg="org.apache.kafka.storage"/>
<subpackage name="remote">
<allow pkg="scala.collection" />
<subpackage name="storage">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>

</subpackage>
Expand Down Expand Up @@ -378,10 +383,6 @@
<allow pkg="kafka.admin" />
</subpackage>

<subpackage name="tools">
<allow pkg="org.apache.kafka.tools" />
</subpackage>

<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>
Expand Down Expand Up @@ -452,6 +453,7 @@
<subpackage name="sink">
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.connect.connector" />
<allow pkg="org.apache.kafka.connect.transforms" />
<allow pkg="org.apache.kafka.connect.storage" />
</subpackage>

Expand Down Expand Up @@ -565,7 +567,6 @@
<subpackage name="integration">
<allow pkg="org.apache.kafka.connect.util.clusters" />
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="javax.ws.rs" />
<allow pkg="org.apache.http"/>
<allow pkg="org.eclipse.jetty.util"/>
Expand All @@ -588,7 +589,6 @@

<subpackage name="tools">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="com.fasterxml.jackson" />
</subpackage>

Expand Down
19 changes: 11 additions & 8 deletions checkstyle/suppressions.xml
Expand Up @@ -39,7 +39,8 @@
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>

Expand Down Expand Up @@ -201,7 +202,7 @@
files="Murmur3.java"/>

<suppress checks="(NPathComplexity|CyclomaticComplexity)"
files="KStreamSlidingWindowAggregate.java"/>
files="(KStreamSlidingWindowAggregate|RackAwareTaskAssignor).java"/>

<!-- suppress FinalLocalVariable outside of the streams package. -->
<suppress checks="FinalLocalVariable"
Expand Down Expand Up @@ -266,7 +267,7 @@
<suppress checks="BooleanExpressionComplexity"
files="StreamsResetter.java"/>
<suppress checks="NPathComplexity"
files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier).java"/>
files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"
Expand Down Expand Up @@ -322,14 +323,16 @@
<!-- group coordinator -->
<suppress checks="CyclomaticComplexity"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="MethodLength"
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="NPathComplexity"
files="(GroupMetadataManager).java"/>
<suppress checks="ClassFanOutComplexity"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember).java"/>
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest).java"/>
files="(RecordHelpersTest|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>

<!-- storage -->
<suppress checks="CyclomaticComplexity"
Expand Down
Expand Up @@ -158,10 +158,11 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
// generation amongst
for (final TopicPartition tp : memberData.partitions) {
if (allTopics.contains(tp.topic())) {
String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
String otherConsumer = allPreviousPartitionsToOwner.get(tp);
if (otherConsumer == null) {
// this partition is not owned by other consumer in the same generation
ownedPartitions.add(tp);
allPreviousPartitionsToOwner.put(tp, consumer);
} else {
final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);

Expand Down Expand Up @@ -1172,7 +1173,7 @@ private boolean isBalanced() {
if (!currentAssignment.get(consumer).contains(topicPartition)) {
String otherConsumer = allPartitions.get(topicPartition);
int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
if (consumerPartitionCount < otherConsumerPartitionCount) {
if (consumerPartitionCount + 1 < otherConsumerPartitionCount) {
log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.",
topicPartition, otherConsumer, consumer);
return false;
Expand Down
Expand Up @@ -92,7 +92,7 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
maybeAutoCommit();
maybeAutoCommit(this.subscriptionState.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
Expand All @@ -101,7 +101,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs)));
}

private void maybeAutoCommit() {
public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (!autoCommitState.isPresent()) {
return;
}
Expand All @@ -111,8 +111,7 @@ private void maybeAutoCommit() {
return;
}

Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptionState.allConsumed();
sendAutoCommit(allConsumedOffsets);
sendAutoCommit(offsets);
autocommit.resetTimer();
autocommit.setInflightCommitStatus(true);
}
Expand Down Expand Up @@ -215,7 +214,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.groupId)
.setGenerationId(generation.generationId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values())));
Expand Down
Expand Up @@ -1361,7 +1361,7 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.rebalanceConfig.groupId)
.setGenerationId(generation.generationId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values()))
Expand Down

0 comments on commit 0bf1735

Please sign in to comment.