Sync from apache/kafka (28 September 2019)
Conflicts:
* .gitignore: addition of clients/src/generated-test was near
local additions for support-metrics.
* checkstyle/suppressions.xml: upstream refactoring of exclusions
for generator were near the local changes for support-metrics.
* gradle.properties: scala version bump caused a minor conflict
due to the kafka version change locally.
* gradle/dependencies.gradle: bcpkix version bump was near avro
additions in the local version.

* apache-github/trunk: (49 commits)
  KAFKA-8471: Replace control requests/responses with automated protocol (apache#7353)
  MINOR: Don't generate unnecessary strings for debug logging in FetchSessionHandler (apache#7394)
  MINOR: fixed typo and removed outdated variable name (apache#7402)
  KAFKA-8934: Create version file during build for Streams (apache#7397)
  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (apache#7382)
  KAFKA-6883: Add toUpperCase support to sasl.kerberos.principal.to.local rule (KIP-309)
  KAFKA-8907; Return topic configs in CreateTopics response (KIP-525) (apache#7380)
  MINOR: Address review comments for KIP-504 authorizer changes (apache#7379)
  MINOR: add versioning to request and response headers (apache#7372)
  KAFKA-7273: Extend Connect Converter to support headers (apache#6362)
  MINOR: improve the Kafka RPC code generator (apache#7340)
  MINOR: Improve the org.apache.kafka.common.protocol code (apache#7344)
  KAFKA-8880: Docs on upgrade-guide (apache#7385)
  KAFKA-8179: do not suspend standby tasks during rebalance (apache#7321)
  KAFKA-8580: Compute RocksDB metrics (apache#7263)
  KAFKA-8880: Add overloaded function of Consumer.committed (apache#7304)
  HOTFIX: fix Kafka Streams upgrade note for broker backward compatibility (apache#7363)
  KAFKA-8848; Update system tests to use new AclAuthorizer (apache#7374)
  MINOR: remove unnecessary null check (apache#7299)
  KAFKA-6958: Overload methods for group and windowed stream to allow to name operation name using the new Named class (apache#6413)
  ...
ijuma committed Sep 29, 2019
2 parents 80799be + 66183f7 commit 2715861
Showing 321 changed files with 13,927 additions and 4,439 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -53,6 +53,7 @@ kafkatest.egg-info/
 systest/
 *.swp
 clients/src/generated
+clients/src/generated-test
 support-metrics-client/src/main/generated
 support-metrics-client/src/test/generated
 support-metrics-common/src/main/generated
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
@@ -48,7 +48,7 @@ should_include_file() {
 base_dir=$(dirname $0)/..
 
 if [ -z "$SCALA_VERSION" ]; then
-  SCALA_VERSION=2.12.9
+  SCALA_VERSION=2.12.10
 fi
 
 if [ -z "$SCALA_BINARY_VERSION" ]; then
2 changes: 1 addition & 1 deletion bin/windows/kafka-run-class.bat
@@ -27,7 +27,7 @@ set BASE_DIR=%CD%
 popd
 
 IF ["%SCALA_VERSION%"] EQU [""] (
-  set SCALA_VERSION=2.12.9
+  set SCALA_VERSION=2.12.10
 )
 
 IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
63 changes: 61 additions & 2 deletions build.gradle
@@ -1038,11 +1038,23 @@ project(':clients') {
   task processMessages(type:JavaExec) {
     main = "org.apache.kafka.message.MessageGenerator"
     classpath = project(':generator').sourceSets.main.runtimeClasspath
-    args = [ "src/generated/java/org/apache/kafka/common/message", "src/main/resources/common/message" ]
+    args = [ "org.apache.kafka.common.message",
+             "src/generated/java/org/apache/kafka/common/message",
+             "src/main/resources/common/message" ]
     inputs.dir("src/main/resources/common/message")
     outputs.dir("src/generated/java/org/apache/kafka/common/message")
   }
 
+  task processTestMessages(type:JavaExec) {
+    main = "org.apache.kafka.message.MessageGenerator"
+    classpath = project(':generator').sourceSets.main.runtimeClasspath
+    args = [ "org.apache.kafka.common.message",
+             "src/generated-test/java/org/apache/kafka/common/message",
+             "src/test/resources/common/message" ]
+    inputs.dir("src/test/resources/common/message")
+    outputs.dir("src/generated-test/java/org/apache/kafka/common/message")
+  }
+
   sourceSets {
     main {
       java {
Expand All @@ -1051,13 +1063,15 @@ project(':clients') {
}
test {
java {
srcDirs = ["src/generated/java", "src/test/java"]
srcDirs = ["src/generated/java", "src/generated-test/java", "src/test/java"]
}
}
}

compileJava.dependsOn 'processMessages'

compileTestJava.dependsOn 'processTestMessages'

javadoc {
include "**/org/apache/kafka/clients/admin/*"
include "**/org/apache/kafka/clients/consumer/*"
@@ -1135,6 +1149,7 @@ project(':tools') {
 
 project(':streams') {
   archivesBaseName = "kafka-streams"
+  ext.buildStreamsVersionFileName = "kafka-streams-version.properties"
 
   dependencies {
     compile project(':clients')
@@ -1182,7 +1197,46 @@ project(':streams') {
     duplicatesStrategy 'exclude'
   }
 
+  task determineCommitId {
+    def takeFromHash = 16
+    if (commitId) {
+      commitId = commitId.take(takeFromHash)
+    } else if (file("$rootDir/.git/HEAD").exists()) {
+      def headRef = file("$rootDir/.git/HEAD").text
+      if (headRef.contains('ref: ')) {
+        headRef = headRef.replaceAll('ref: ', '').trim()
+        if (file("$rootDir/.git/$headRef").exists()) {
+          commitId = file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
+        }
+      } else {
+        commitId = headRef.trim().take(takeFromHash)
+      }
+    } else {
+      commitId = "unknown"
+    }
+  }
+
+  task createStreamsVersionFile(dependsOn: determineCommitId) {
+    ext.receiptFile = file("$buildDir/kafka/$buildStreamsVersionFileName")
+    outputs.file receiptFile
+    outputs.upToDateWhen { false }
+    doLast {
+      def data = [
+        commitId: commitId,
+        version: version,
+      ]
+
+      receiptFile.parentFile.mkdirs()
+      def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
+      receiptFile.setText(content, "ISO-8859-1")
+    }
+  }
+
   jar {
+    dependsOn 'createStreamsVersionFile'
+    from("$buildDir") {
+      include "kafka/$buildStreamsVersionFileName"
+    }
     dependsOn 'copyDependantLibs'
   }
 
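The createStreamsVersionFile task above writes a small receipt — sorted commitId=... and version=... lines — to kafka/kafka-streams-version.properties, which the jar task then bundles. A minimal sketch of reading that receipt back from the classpath (the reader class name is hypothetical; it assumes the streams jar is on the classpath):

import java.io.InputStream;
import java.util.Properties;

public class StreamsVersionReceiptReader {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The receipt is bundled into the jar under kafka/kafka-streams-version.properties.
        try (InputStream in = StreamsVersionReceiptReader.class.getClassLoader()
                .getResourceAsStream("kafka/kafka-streams-version.properties")) {
            if (in != null)
                props.load(in); // a missing file falls through to the defaults below
        }
        System.out.println("version: " + props.getProperty("version", "unknown"));
        System.out.println("commitId: " + props.getProperty("commitId", "unknown"));
    }
}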
@@ -1596,9 +1650,11 @@ project(':jmh-benchmarks') {
   }
 
   dependencies {
+    compile project(':core')
     compile project(':clients')
     compile project(':streams')
     compile libs.jmhCore
+    compile libs.mockitoCore
     annotationProcessor libs.jmhGeneratorAnnProcess
     compile libs.jmhCoreBenchmarks
   }
@@ -1609,6 +1665,9 @@ project(':jmh-benchmarks') {
     }
   }
 
+  checkstyle {
+    configProperties = checkstyleConfigProperties("import-control-jmh-benchmarks.xml")
+  }
 
   task jmh(type: JavaExec, dependsOn: [':jmh-benchmarks:clean', ':jmh-benchmarks:shadowJar']) {
 
45 changes: 45 additions & 0 deletions checkstyle/import-control-jmh-benchmarks.xml
@@ -0,0 +1,45 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->

<import-control pkg="org.apache.kafka.jmh">

<allow pkg="java"/>
<allow pkg="scala"/>
<allow pkg="javax.management"/>
<allow pkg="org.slf4j"/>
<allow pkg="org.openjdk.jmh.annotations"/>
<allow pkg="org.openjdk.jmh.runner"/>
<allow pkg="org.openjdk.jmh.infra"/>
<allow pkg="java.security"/>
<allow pkg="javax.net.ssl"/>
<allow pkg="javax.security"/>
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.clients.producer"/>
<allow pkg="kafka.cluster"/>
<allow pkg="kafka.log"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.api"/>
<allow class="kafka.utils.KafkaScheduler"/>
<allow pkg="org.mockito"/>


<subpackage name="cache">
</subpackage>
</import-control>
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -118,6 +118,7 @@
   <subpackage name="protocol">
     <allow pkg="org.apache.kafka.common.errors" />
     <allow pkg="org.apache.kafka.common.message" />
+    <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.common.protocol.types" />
     <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.requests" />
33 changes: 25 additions & 8 deletions checkstyle/suppressions.xml
@@ -8,6 +8,14 @@
 
     <!-- Note that [/\\] must be used as the path separator for cross-platform support -->
 
+    <!-- Generator -->
+    <suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
+              files="(SchemaGenerator|MessageDataGenerator).java"/>
+    <suppress checks="NPathComplexity"
+              files="(FieldSpec).java"/>
+    <suppress checks="JavaNCSS"
+              files="(ApiMessageType).java"/>
+
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
               files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin).java"/>
@@ -49,7 +57,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator).java"/>
 
     <suppress checks="JavaNCSS"
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java|KafkaAdminClient.java"/>
@@ -60,6 +68,12 @@
     <suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
               files="CoordinatorClient.java"/>
 
+    <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|MethodLength|JavaNCSS)"
+              files="clients[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
+
+    <suppress checks="NPathComplexity"
+              files="MessageTest.java"/>
+
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
               files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message)Test.java"/>
@@ -127,6 +141,9 @@
     <suppress checks="ClassDataAbstractionCoupling"
               files="(DistributedHerder|KafkaBasedLog)Test.java"/>
 
+    <suppress checks="ClassFanOutComplexity"
+              files="(WorkerSinkTask|WorkerSourceTask)Test.java"/>
+
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
               files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
@@ -155,6 +172,9 @@
     <suppress checks="NPathComplexity"
               files="(ProcessorStateManager|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
 
+    <suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
+              files="Murmur3.java"/>
+
     <!-- suppress FinalLocalVariable outside of the streams package. -->
     <suppress checks="FinalLocalVariable"
               files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/>
@@ -193,6 +213,10 @@
     <suppress checks="NPathComplexity"
               files="KStreamKStreamLeftJoinTest.java"/>
 
+    <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
+              files="Murmur3Test.java"/>
+
+
     <!-- Streams Test-Utils -->
     <suppress checks="ClassFanOutComplexity"
               files="TopologyTestDriver.java"/>
@@ -226,13 +250,6 @@
     <suppress checks="JavaNCSS"
               files="RequestResponseTest.java"/>
 
-    <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling)"
-              files="clients[\\/]src[\\/]generated[\\/].+.java$"/>
-    <suppress checks="NPathComplexity"
-              files="MessageTest.java"/>
-
-    <suppress checks="CyclomaticComplexity" files="MessageDataGenerator.java"/>
-
     <!-- proactive support -->
     <suppress checks=".+"
               files="support-metrics-client/src/main/generated/.*"/>
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
@@ -82,7 +83,14 @@ public ApiKeys apiKey() {
     }
 
     public RequestHeader makeHeader(short version) {
-        return new RequestHeader(apiKey(), version, clientId, correlationId);
+        short requestApiKey = requestBuilder.apiKey().id;
+        return new RequestHeader(
+            new RequestHeaderData().
+                setRequestApiKey(requestApiKey).
+                setRequestApiVersion(version).
+                setClientId(clientId).
+                setCorrelationId(correlationId),
+            ApiKeys.forId(requestApiKey).headerVersion(version));
     }
 
     public AbstractRequest.Builder<?> requestBuilder() {
@@ -42,9 +42,9 @@ public class CommonClientConfigs {
         + "servers (you may want more than one, though, in case a server is down).";
 
     public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
-    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
-        + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>"
-        + "<p>If the value is <code>resolve_canonical_bootstrap_servers_only</code> each entry will be resolved and expanded into a list of canonical names.</p>";
+    public static final String CLIENT_DNS_LOOKUP_DOC = "Controls how the client uses DNS lookups. If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
+        + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers."
+        + " If the value is <code>resolve_canonical_bootstrap_servers_only</code> each entry will be resolved and expanded into a list of canonical names.";
 
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
@@ -397,14 +397,15 @@ public boolean handleResponse(FetchResponse<?> response) {
                 nextMetadata = FetchMetadata.INITIAL;
                 return false;
             } else if (response.sessionId() == INVALID_SESSION_ID) {
-                log.debug("Node {} sent a full fetch response{}",
-                    node, responseDataToLogString(response));
+                if (log.isDebugEnabled())
+                    log.debug("Node {} sent a full fetch response{}", node, responseDataToLogString(response));
                 nextMetadata = FetchMetadata.INITIAL;
                 return true;
             } else {
                 // The server created a new incremental fetch session.
-                log.debug("Node {} sent a full fetch response that created a new incremental " +
-                    "fetch session {}{}", node, response.sessionId(), responseDataToLogString(response));
+                if (log.isDebugEnabled())
+                    log.debug("Node {} sent a full fetch response that created a new incremental " +
+                            "fetch session {}{}", node, response.sessionId(), responseDataToLogString(response));
                 nextMetadata = FetchMetadata.newIncremental(response.sessionId());
                 return true;
             }
@@ -416,14 +417,16 @@ public boolean handleResponse(FetchResponse<?> response) {
                 return false;
             } else if (response.sessionId() == INVALID_SESSION_ID) {
                 // The incremental fetch session was closed by the server.
-                log.debug("Node {} sent an incremental fetch response closing session {}{}",
-                    node, nextMetadata.sessionId(), responseDataToLogString(response));
+                if (log.isDebugEnabled())
+                    log.debug("Node {} sent an incremental fetch response closing session {}{}",
+                            node, nextMetadata.sessionId(), responseDataToLogString(response));
                 nextMetadata = FetchMetadata.INITIAL;
                 return true;
             } else {
                 // The incremental fetch session was continued by the server.
-                log.debug("Node {} sent an incremental fetch response for session {}{}",
-                    node, response.sessionId(), responseDataToLogString(response));
+                if (log.isDebugEnabled())
+                    log.debug("Node {} sent an incremental fetch response for session {}{}",
+                            node, response.sessionId(), responseDataToLogString(response));
                 nextMetadata = nextMetadata.nextIncremental();
                 return true;
             }
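The guards added above matter because slf4j's parameterized messages defer string formatting but not argument evaluation: responseDataToLogString(response) is computed before log.debug is ever invoked, even with DEBUG disabled. A self-contained sketch of the pattern (class and helper names hypothetical; slf4j assumed on the classpath):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedDebugLogging {
    private static final Logger log = LoggerFactory.getLogger(GuardedDebugLogging.class);

    // Stand-in for responseDataToLogString(): building the argument is the
    // expensive part, not the logging call itself.
    private static String expensiveSummary() {
        StringBuilder sb = new StringBuilder("(");
        for (int i = 0; i < 1_000; i++)
            sb.append(i).append(',');
        return sb.append(')').toString();
    }

    public static void main(String[] args) {
        // Unguarded: expensiveSummary() runs even when DEBUG is disabled,
        // because Java evaluates call arguments eagerly.
        log.debug("response data{}", expensiveSummary());

        // Guarded, as in the diff above: the summary is only built when
        // DEBUG logging is actually enabled.
        if (log.isDebugEnabled())
            log.debug("response data{}", expensiveSummary());
    }
}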
@@ -705,7 +705,7 @@ public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestH
 
     private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
                                                                     Sensor throttleTimeSensor, long now) {
-        ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
+        ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer, requestHeader.headerVersion());
         // Always expect the response version id to be the same as the request version id
         Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
         correlate(requestHeader, responseHeader);
23 changes: 23 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -837,6 +837,29 @@ default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> group
         return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
     }
 
+    /**
+     * Delete committed offsets for a set of partitions in a consumer group. This will
+     * succeed at the partition level only if the group is not actively subscribed
+     * to the corresponding topic.
+     *
+     * @param options The options to use when deleting offsets in a consumer group.
+     * @return The DeleteConsumerGroupOffsetsResult.
+     */
+    DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId,
+                                                                Set<TopicPartition> partitions,
+                                                                DeleteConsumerGroupOffsetsOptions options);
+
+    /**
+     * Delete committed offsets for a set of partitions in a consumer group with the default
+     * options. This will succeed at the partition level only if the group is not actively
+     * subscribed to the corresponding topic.
+     *
+     * @return The DeleteConsumerGroupOffsetsResult.
+     */
+    default DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions) {
+        return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
+    }
+
     /**
      * Elect the preferred replica as leader for topic partitions.
      * <p>
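A hedged usage sketch for the new deleteConsumerGroupOffsets API — the group, topic, partition, and broker address below are made up, and it assumes the Admin.create(Properties) factory from KIP-476 and that DeleteConsumerGroupOffsetsResult.all() returns a future that can be waited on:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;

public class DeleteGroupOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        try (Admin admin = Admin.create(props)) {
            // Delete the committed offset for partition 0 of "my-topic" in group "my-group".
            // Per the javadoc above, this succeeds only if the group is not
            // actively subscribed to that topic.
            DeleteConsumerGroupOffsetsResult result = admin.deleteConsumerGroupOffsets(
                    "my-group", Collections.singleton(new TopicPartition("my-topic", 0)));
            result.all().get(); // block until the broker confirms the deletion
        }
    }
}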
