Merge remote-tracking branch 'apache-github/trunk' into master
* apache-github/trunk:
  MINOR: Fix grammar in error message for InvalidRecordException (apache#8465)
  KAFKA-9868: Reduce number of transaction log partitions for embedded broker (apache#8522)
  MINOR: Adding github whitelist (apache#8523)
  MINOR: Upgrade gradle plugins and test libraries for Java 14 support (apache#8519)
  MINOR: Further reduce runtime for metrics integration tests (apache#8514)
  MINOR: Use .asf.yaml to direct github notifications to JIRA and mailing lists (apache#8521)
  MINOR: Update to Gradle 6.3 (apache#7677)
  HOTFIX: fix checkstyle error of RocksDBStoreTest and flaky RocksDBTimestampedStoreTest.shouldOpenExistingStoreInRegularMode (apache#8515)
  MINOR: cleanup RocksDBStore tests  (apache#8510)
  KAFKA-9818: Fix flaky test in RecordCollectorTest (apache#8507)
  MINOR: reduce impact of trace logging in replica hot path (apache#8468)
  KAFKA-6145: KIP-441: Add test scenarios to ensure rebalance convergence (apache#8475)
  KAFKA-9881: Convert integration test to verify measurements from RocksDB to unit test (apache#8501)
  MINOR: improve test coverage for dynamic LogConfig(s) (apache#7616)
  MINOR: Switch order of sections on tumbling and hopping windows in streams doc. Tumbling windows are defined as a "special case of hopping time windows", but hopping windows are currently only explained later in the docs. (apache#8505)
  KAFKA-9819: Fix flaky test in StoreChangelogReaderTest (apache#8488)
ijuma committed Apr 21, 2020
2 parents 02defca + e406d9d commit 48158e5
Showing 28 changed files with 892 additions and 486 deletions.
26 changes: 26 additions & 0 deletions .asf.yaml
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

notifications:
  commits: commits@kafka.apache.org
  issues: jira@kafka.apache.org
  pullrequests: jira@kafka.apache.org
  jira_options: link label

jenkins:
  github_whitelist:
    - ConcurrencyPractitioner
12 changes: 6 additions & 6 deletions README.md
@@ -93,11 +93,11 @@ You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):

### Running a task with all the scala versions enabled by default ###

Append `All` to the task name:
Invoke the `gradlewAll` script followed by the task(s):

./gradlew testAll
./gradlew jarAll
./gradlew releaseTarGzAll
./gradlewAll test
./gradlewAll jar
./gradlewAll releaseTarGz

### Running a task for a specific project ###
This is for `core`, `examples` and `clients`
@@ -119,7 +119,7 @@ build directory (`${project_dir}/bin`) clashes with Kafka's scripts directory an
to avoid known issues with this configuration.

### Publishing the jar for all version of Scala and for all projects to maven ###
./gradlew uploadArchivesAll
./gradlewAll uploadArchives

Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradle.properties` (typically, `~/.gradle/gradle.properties`) and assign the following variables

@@ -161,7 +161,7 @@ Please note for this to work you should create/update user maven settings (typic


### Installing the jars to the local Maven repository ###
./gradlew installAll
./gradlewAll install

### Building the test jar ###
./gradlew testJar
97 changes: 14 additions & 83 deletions build.gradle
@@ -36,7 +36,7 @@ buildscript {
classpath "com.github.jengelman.gradle.plugins:shadow:$versions.shadowPlugin"
classpath "org.owasp:dependency-check-gradle:$versions.owaspDepCheckPlugin"
classpath "com.diffplug.spotless:spotless-plugin-gradle:$versions.spotlessPlugin"
classpath "com.github.spotbugs:spotbugs-gradle-plugin:$versions.spotbugsPlugin"
classpath "gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:$versions.spotbugsPlugin"
classpath "org.gradle:test-retry-gradle-plugin:$versions.testRetryPlugin"
}
}
@@ -82,7 +82,7 @@ allprojects {

dependencyUpdates {
revision="release"
resolutionStrategy = {
resolutionStrategy {
componentSelection { rules ->
rules.all { ComponentSelection selection ->
boolean rejected = ['snap', 'alpha', 'beta', 'rc', 'cr', 'm'].any { qualifier ->
@@ -112,7 +112,7 @@ allprojects {
}

ext {
gradleVersion = "$versions.gradle"
gradleVersion = versions.gradle
minJavaVersion = "8"
buildVersionFileName = "kafka-version.properties"

@@ -447,6 +447,11 @@ subprojects {
}

plugins.withType(ScalaPlugin) {

scala {
zincVersion = versions.zinc
}

task scaladocJar(type:Jar) {
classifier = 'scaladoc'
from "$rootDir/LICENSE"
@@ -521,7 +526,7 @@ subprojects {
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
configProperties = checkstyleConfigProperties("import-control.xml")
toolVersion = "$versions.checkstyle"
toolVersion = versions.checkstyle
}

configure(checkstyleMain) {
@@ -537,13 +542,13 @@ subprojects {
test.dependsOn('checkstyleMain', 'checkstyleTest')

spotbugs {
toolVersion = "$versions.spotbugs"
toolVersion = versions.spotbugs
excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
ignoreFailures = false
}
test.dependsOn('spotbugsMain')

tasks.withType(com.github.spotbugs.SpotBugsTask) {
tasks.withType(com.github.spotbugs.snom.SpotBugsTask) {
reports {
// Continue supporting `xmlFindBugsReport` for compatibility
xml.enabled(project.hasProperty('xmlSpotBugsReport') || project.hasProperty('xmlFindBugsReport'))
@@ -557,7 +562,7 @@ subprojects {
apply plugin: "jacoco"

jacoco {
toolVersion = "$versions.jacoco"
toolVersion = versions.jacoco
}

// NOTE: Jacoco Gradle plugin does not support "offline instrumentation" this means that classes mocked by PowerMock
@@ -667,52 +672,6 @@ task jacocoRootReport(type: org.gradle.testing.jacoco.tasks.JacocoReport) {

task reportCoverage(dependsOn: ['jacocoRootReport', 'core:reportCoverage'])

for ( sv in availableScalaVersions ) {
String taskSuffix = sv.replaceAll("\\.", "_")

tasks.create(name: "jarScala_${taskSuffix}", type: GradleBuild) {
startParameter = project.getGradle().getStartParameter().newInstance()
startParameter.projectProperties += [scalaVersion: "${sv}"]
tasks = ['core:jar', 'streams:streams-scala:jar']
}

tasks.create(name: "testScala_${taskSuffix}", type: GradleBuild) {
startParameter = project.getGradle().getStartParameter().newInstance()
startParameter.projectProperties += [scalaVersion: "${sv}"]
tasks = ['core:test', 'streams:streams-scala:test']
}

tasks.create(name: "srcJar_${taskSuffix}", type: GradleBuild) {
startParameter = project.getGradle().getStartParameter().newInstance()
startParameter.projectProperties += [scalaVersion: "${sv}"]
tasks = ['core:srcJar', 'streams:streams-scala:srcJar']
}

tasks.create(name: "docsJar_${taskSuffix}", type: GradleBuild) {
startParameter = project.getGradle().getStartParameter().newInstance()
startParameter.projectProperties += [scalaVersion: "${sv}"]
tasks = ['core:docsJar', 'streams:streams-scala:docsJar']
}

tasks.create(name: "install_${taskSuffix}", type: GradleBuild) {
startParameter = project.getGradle().getStartParameter().newInstance()
startParameter.projectProperties += [scalaVersion: "${sv}"]
tasks = ['install']
}

tasks.create(name: "releaseTarGz_${taskSuffix}", type: GradleBuild) {
startParameter = project.getGradle().getStartParameter().newInstance()
startParameter.projectProperties += [scalaVersion: "${sv}"]
tasks = ['releaseTarGz']
}

tasks.create(name: "uploadScalaArchives_${taskSuffix}", type: GradleBuild) {
startParameter = project.getGradle().getStartParameter().newInstance()
startParameter.projectProperties += [scalaVersion: "${sv}"]
tasks = ['core:uploadArchives', 'streams:streams-scala:uploadArchives']
}
}

def connectPkgs = [
'connect:api',
'connect:basic-auth-extension',
@@ -724,37 +683,9 @@ def connectPkgs = [
'connect:mirror-client'
]

def pkgs = [
'clients',
'examples',
'log4j-appender',
'streams',
'streams:examples',
'streams:streams-scala',
'streams:test-utils',
'tools'
] + connectPkgs

/** Create one task per default Scala version */
def withDefScalaVersions(taskName) {
defaultScalaVersions.collect { taskName + '_' + it.replaceAll('\\.', '_') }
}

tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
tasks.create(name: "jarAll", dependsOn: withDefScalaVersions('jarScala') + pkgs.collect { it + ":jar" }) { }

tasks.create(name: "srcJarAll", dependsOn: withDefScalaVersions('srcJar') + pkgs.collect { it + ":srcJar" }) { }

tasks.create(name: "docsJarAll", dependsOn: withDefScalaVersions('docsJar') + pkgs.collect { it + ":docsJar" }) { }

tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
tasks.create(name: "testAll", dependsOn: withDefScalaVersions('testScala') + pkgs.collect { it + ":test" }) { }

tasks.create(name: "installAll", dependsOn: withDefScalaVersions('install') + pkgs.collect { it + ":install" }) { }

tasks.create(name: "releaseTarGzAll", dependsOn: withDefScalaVersions('releaseTarGz')) { }

tasks.create(name: "uploadArchivesAll", dependsOn: withDefScalaVersions('uploadScalaArchives') + pkgs.collect { it + ":uploadArchives" }) { }

project(':core') {
println "Building project 'core' with Scala version ${versions.scala}"
@@ -813,7 +744,7 @@ project(':core') {
}

scoverage {
scoverageVersion = "$versions.scoverage"
scoverageVersion = versions.scoverage
reportDir = file("${rootProject.buildDir}/scoverage")
highlighting = false
minimumRate = 0.0
@@ -1653,7 +1584,7 @@ project(':connect:api') {
}

javadoc {
include "**/org/apache/kafka/connect/**" // needed for the `javadocAll` task
include "**/org/apache/kafka/connect/**" // needed for the `aggregatedJavadoc` task
// The URL structure was changed to include the locale after Java 8
if (JavaVersion.current().isJava11Compatible())
options.links "https://docs.oracle.com/en/java/javase/${JavaVersion.current().majorVersion}/docs/api/"
2 changes: 1 addition & 1 deletion clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -318,7 +318,7 @@ public enum Errors {
NoReassignmentInProgressException::new),
GROUP_SUBSCRIBED_TO_TOPIC(86, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.",
GroupSubscribedToTopicException::new),
INVALID_RECORD(87, "This record has failed the validation on broker and hence be rejected.", InvalidRecordException::new),
INVALID_RECORD(87, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new),
UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);
53 changes: 53 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;

import java.util.EnumSet;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.kafka.common.KafkaException;
@@ -60,9 +61,13 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -1093,4 +1098,52 @@ public static <K1, V1, K2, V2> Map<K2, V2> transformMap(
)
);
}

/**
* A Collector that offers two kinds of convenience:
* 1. You can specify the concrete type of the returned Map
* 2. You can turn a stream of Entries directly into a Map without having to mess with a key function
* and a value function. In particular, this is handy if all you need to do is apply a filter to a Map's entries.
*
*
* One thing to be wary of: These types are too "distant" for IDE type checkers to warn you if you
* try to do something like build a TreeMap of non-Comparable elements. You'd get a runtime exception for that.
*
* @param mapSupplier The constructor for your concrete map type.
* @param <K> The Map key type
* @param <V> The Map value type
* @param <M> The type of the Map itself.
* @return new Collector<Map.Entry<K, V>, M, M>
*/
public static <K, V, M extends Map<K, V>> Collector<Map.Entry<K, V>, M, M> entriesToMap(final Supplier<M> mapSupplier) {
return new Collector<Map.Entry<K, V>, M, M>() {
@Override
public Supplier<M> supplier() {
return mapSupplier;
}

@Override
public BiConsumer<M, Map.Entry<K, V>> accumulator() {
return (map, entry) -> map.put(entry.getKey(), entry.getValue());
}

@Override
public BinaryOperator<M> combiner() {
return (map, map2) -> {
map.putAll(map2);
return map;
};
}

@Override
public Function<M, M> finisher() {
return map -> map;
}

@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH);
}
};
}
}
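
For reference, a minimal usage sketch of the new `entriesToMap` collector; the surrounding class, the sample map, and the filter threshold below are illustrative and not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.kafka.common.utils.Utils;

public class EntriesToMapExample {
    public static void main(String[] args) {
        // Illustrative input: request latencies keyed by API name.
        Map<String, Integer> latenciesMs = new HashMap<>();
        latenciesMs.put("produce", 5);
        latenciesMs.put("fetch", 9);
        latenciesMs.put("metadata", 2);

        // Filter the entries and collect them straight into a concrete TreeMap,
        // without supplying separate key and value mapper functions.
        TreeMap<String, Integer> slow = latenciesMs.entrySet().stream()
                .filter(e -> e.getValue() > 3)
                .collect(Utils.entriesToMap(TreeMap::new));

        System.out.println(slow); // prints {fetch=9, produce=5}
    }
}
```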
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -83,7 +83,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
lastFetchLeaderLogEndOffset = leaderEndOffset
lastFetchTimeMs = followerFetchTimeMs
updateLastSentHighWatermark(lastSentHighwatermark)
trace(s"Updated state of replica to $this")
}

/**
@@ -96,7 +95,6 @@
*/
private def updateLastSentHighWatermark(highWatermark: Long): Unit = {
_lastSentHighWatermark = highWatermark
trace(s"Updated HW of replica to $highWatermark")
}

def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -149,6 +149,7 @@ class ReplicaFetcherThread(name: String,
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: FetchData): Option[LogAppendInfo] = {
val logTrace = isTraceEnabled
val partition = replicaMgr.nonOfflinePartition(topicPartition).get
val log = partition.localLogOrException
val records = toMemoryRecords(partitionData.records)
@@ -159,14 +160,14 @@
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, log.logEndOffset))

if (isTraceEnabled)
if (logTrace)
trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

// Append the leader's messages to the log
val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

if (isTraceEnabled)
if (logTrace)
trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(log.logEndOffset, records.sizeInBytes, topicPartition))
val leaderLogStartOffset = partitionData.logStartOffset
@@ -175,7 +176,7 @@
// These values will be computed upon becoming leader or handling a preferred read replica fetch.
val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
log.maybeIncrementLogStartOffset(leaderLogStartOffset)
if (isTraceEnabled)
if (logTrace)
trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")

// Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
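
The hunk above caches the trace flag in a local `logTrace` once per `processPartitionData` call, so the hot path no longer re-checks the log level before every trace statement. A minimal sketch of the same guard pattern, written here in Java with SLF4J rather than the Scala logging helpers the real code uses; the class, method, and data below are illustrative:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HotPathTraceExample {
    private static final Logger log = LoggerFactory.getLogger(HotPathTraceExample.class);

    public void processBatch(long[] offsets) {
        // Evaluate the log level once per invocation and reuse the result,
        // instead of calling isTraceEnabled() for every record in the loop.
        final boolean logTrace = log.isTraceEnabled();

        for (long offset : offsets) {
            if (logTrace)
                log.trace("Processing record at offset {}", offset);
            // hot-path work on the record would go here
        }
    }
}
```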
