From 8e161580b859b2fcd54c59625e232b99f3bb48d0 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 5 Jun 2019 14:10:00 -0700 Subject: [PATCH] KAFKA-8305; Support default partitions & replication factor in AdminClient#createTopic (KIP-464) (#6728) This commit makes three changes: - Adds a constructor for NewTopic(String, Optional, Optional) which allows users to specify Optional.empty() for numPartitions or replicationFactor in order to use the broker default. - Changes AdminManager to accept -1 as valid options for replication factor and numPartitions (resolving to broker defaults). - Makes --partitions and --replication-factor optional arguments when creating topics using kafka-topics.sh. - Adds a dependency on scalaJava8Compat library to make it simpler to convert Scala Option to Java Optional Reviewers: Ismael Juma , Ryanne Dolan , Jason Gustafson --- build.gradle | 2 + .../apache/kafka/clients/admin/NewTopic.java | 34 ++++-- .../common/requests/CreateTopicsRequest.java | 22 +++- .../common/message/CreateTopicsRequest.json | 7 +- .../common/message/CreateTopicsResponse.json | 3 +- .../common/requests/RequestResponseTest.java | 24 +++++ .../main/scala/kafka/admin/TopicCommand.scala | 28 +++-- .../scala/kafka/server/AdminManager.scala | 16 ++- .../api/AdminClientIntegrationTest.scala | 36 ++++--- .../kafka/api/ConsumerTopicCreationTest.scala | 2 +- .../kafka/tools/LogCompactionTester.scala | 2 +- .../TopicCommandWithAdminClientTest.scala | 101 +++++++++++++----- .../AbstractCreateTopicsRequestTest.scala | 14 ++- .../server/CreateTopicsRequestTest.scala | 11 +- .../CreateTopicsRequestWithPolicyTest.scala | 7 +- .../scala/unit/kafka/utils/TestUtils.scala | 14 ++- docs/upgrade.html | 1 + gradle/dependencies.gradle | 2 + 18 files changed, 248 insertions(+), 78 deletions(-) diff --git a/build.gradle b/build.gradle index 42cacb0be02c..d1a2a9da52ba 100644 --- a/build.gradle +++ b/build.gradle @@ -939,6 +939,8 @@ project(':clients') { compile libs.lz4 compile libs.snappy compile libs.slf4jApi + compile libs.scalaJava8Compat + compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing compileOnly libs.jacksonJDK8Datatypes diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java index 5b1bd32f89ac..66585eac3268 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin; +import java.util.Optional; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig; @@ -31,9 +32,13 @@ * A new topic to be created via {@link AdminClient#createTopics(Collection)}. */ public class NewTopic { + + private static final int NO_PARTITIONS = -1; + private static final short NO_REPLICATION_FACTOR = -1; + private final String name; - private final int numPartitions; - private final short replicationFactor; + private final Optional numPartitions; + private final Optional replicationFactor; private final Map> replicasAssignments; private Map configs = null; @@ -41,6 +46,15 @@ public class NewTopic { * A new topic with the specified replication factor and number of partitions. 
*/ public NewTopic(String name, int numPartitions, short replicationFactor) { + this(name, Optional.of(numPartitions), Optional.of(replicationFactor)); + } + + /** + * A new topic that optionally defaults {@code numPartitions} and {@code replicationFactor} to + * the broker configurations for {@code num.partitions} and {@code default.replication.factor} + * respectively. + */ + public NewTopic(String name, Optional numPartitions, Optional replicationFactor) { this.name = name; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; @@ -56,8 +70,8 @@ public NewTopic(String name, int numPartitions, short replicationFactor) { */ public NewTopic(String name, Map> replicasAssignments) { this.name = name; - this.numPartitions = -1; - this.replicationFactor = -1; + this.numPartitions = Optional.empty(); + this.replicationFactor = Optional.empty(); this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments); } @@ -72,14 +86,14 @@ public String name() { * The number of partitions for the new topic or -1 if a replica assignment has been specified. */ public int numPartitions() { - return numPartitions; + return numPartitions.orElse(NO_PARTITIONS); } /** * The replication factor for the new topic or -1 if a replica assignment has been specified. */ public short replicationFactor() { - return replicationFactor; + return replicationFactor.orElse(NO_REPLICATION_FACTOR); } /** @@ -111,8 +125,8 @@ public Map configs() { CreatableTopic convertToCreatableTopic() { CreatableTopic creatableTopic = new CreatableTopic(). setName(name). - setNumPartitions(numPartitions). - setReplicationFactor(replicationFactor); + setNumPartitions(numPartitions.orElse(NO_PARTITIONS)). + setReplicationFactor(replicationFactor.orElse(NO_REPLICATION_FACTOR)); if (replicasAssignments != null) { for (Entry> entry : replicasAssignments.entrySet()) { creatableTopic.assignments().add( @@ -136,8 +150,8 @@ CreatableTopic convertToCreatableTopic() { public String toString() { StringBuilder bld = new StringBuilder(); bld.append("(name=").append(name). - append(", numPartitions=").append(numPartitions). - append(", replicationFactor=").append(replicationFactor). + append(", numPartitions=").append(numPartitions.map(String::valueOf).orElse("default")). + append(", replicationFactor=").append(replicationFactor.map(String::valueOf).orElse("default")). append(", replicasAssignments=").append(replicasAssignments). append(", configs=").append(configs). 
append(")"); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index a2cd17d9c7ad..dd26e5642632 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.common.requests; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; @@ -24,8 +27,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; -import java.nio.ByteBuffer; - public class CreateTopicsRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { private final CreateTopicsRequestData data; @@ -40,6 +41,23 @@ public CreateTopicsRequest build(short version) { if (data.validateOnly() && version == 0) throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " + "CreateTopicsRequest"); + + final List topicsWithDefaults = data.topics() + .stream() + .filter(topic -> topic.assignments().isEmpty()) + .filter(topic -> + topic.numPartitions() == CreateTopicsRequest.NO_NUM_PARTITIONS + || topic.replicationFactor() == CreateTopicsRequest.NO_REPLICATION_FACTOR) + .map(CreatableTopic::name) + .collect(Collectors.toList()); + + if (!topicsWithDefaults.isEmpty() && version < 4) { + throw new UnsupportedVersionException("Creating topics with default " + + "partitions/replication factor are only supported in CreateTopicRequest " + + "version 4+. The following topics need values for partitions and replicas: " + + topicsWithDefaults); + } + return new CreateTopicsRequest(data, version); } diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json index 842fb204ceff..d2285668f4cb 100644 --- a/clients/src/main/resources/common/message/CreateTopicsRequest.json +++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json @@ -18,16 +18,17 @@ "type": "request", "name": "CreateTopicsRequest", // Version 1 adds validateOnly. - "validVersions": "0-3", + // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464) + "validVersions": "0-4", "fields": [ { "name": "Topics", "type": "[]CreatableTopic", "versions": "0+", "about": "The topics to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name." }, { "name": "NumPartitions", "type": "int32", "versions": "0+", - "about": "The number of partitions to create in the topic, or -1 if we are specifying a manual partition assignment." }, + "about": "The number of partitions to create in the topic, or -1 if we are either specifying a manual partition assignment or using the default partitions." }, { "name": "ReplicationFactor", "type": "int16", "versions": "0+", - "about": "The number of replicas to create for each partition in the topic, or -1 if we are specifying a manual partition assignment." 
}, + "about": "The number of replicas to create for each partition in the topic, or -1 if we are either specifying a manual partition assignment or using the default replication factor." }, { "name": "Assignments", "type": "[]CreatableReplicaAssignment", "versions": "0+", "about": "The manual partition assignment, or the empty array if we are using automatic assignment.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true, diff --git a/clients/src/main/resources/common/message/CreateTopicsResponse.json b/clients/src/main/resources/common/message/CreateTopicsResponse.json index 864e5fa36510..d56e642061e4 100644 --- a/clients/src/main/resources/common/message/CreateTopicsResponse.json +++ b/clients/src/main/resources/common/message/CreateTopicsResponse.json @@ -20,7 +20,8 @@ // Version 1 adds a per-topic error message string. // Version 2 adds the throttle time. // Starting in version 3, on quota violation, brokers send out responses before throttling. - "validVersions": "0-3", + // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464) + "validVersions": "0-4", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e06202bb2f68..11d85d6d7cf8 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -82,6 +82,7 @@ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails; +import org.apache.kafka.common.requests.CreateTopicsRequest.Builder; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; @@ -120,6 +121,7 @@ import static org.apache.kafka.test.TestUtils.toBuffer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -644,6 +646,28 @@ public void testCreateTopicRequestV0FailsIfValidateOnly() { createCreateTopicRequest(0, true); } + @Test + public void testCreateTopicRequestV3FailsIfNoPartitionsOrReplicas() { + final UnsupportedVersionException exception = assertThrows( + UnsupportedVersionException.class, () -> { + CreateTopicsRequestData data = new CreateTopicsRequestData() + .setTimeoutMs(123) + .setValidateOnly(false); + data.topics().add(new CreatableTopic(). + setName("foo"). + setNumPartitions(CreateTopicsRequest.NO_NUM_PARTITIONS). + setReplicationFactor((short) 1)); + data.topics().add(new CreatableTopic(). + setName("bar"). + setNumPartitions(1). 
+ setReplicationFactor(CreateTopicsRequest.NO_REPLICATION_FACTOR)); + + new Builder(data).build((short) 3); + }); + assertTrue(exception.getMessage().contains("supported in CreateTopicRequest version 4+")); + assertTrue(exception.getMessage().contains("[foo, bar]")); + } + @Test public void testFetchRequestMaxBytesOldVersions() throws Exception { final short version = 1; diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 996f73bbfb05..5042fa27fca2 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -30,8 +30,8 @@ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{ListTopicsOptions, NewPartitions, NewTopic, AdminClient => JAdminClient} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.config.ConfigResource.Type +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.security.JaasUtils @@ -40,6 +40,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException import scala.collection.JavaConverters._ import scala.collection._ +import scala.compat.java8.OptionConverters._ import scala.io.StdIn object TopicCommand extends Logging { @@ -82,7 +83,7 @@ object TopicCommand extends Logging { class CommandTopicPartition(opts: TopicCommandOptions) { val name: String = opts.topic.get val partitions: Option[Integer] = opts.partitions - val replicationFactor: Integer = opts.replicationFactor.getOrElse(-1) + val replicationFactor: Option[Integer] = opts.replicationFactor val replicaAssignment: Option[Map[Int, List[Int]]] = opts.replicaAssignment val configsToAdd: Properties = parseTopicConfigsToBeAdded(opts) val configsToDelete: Seq[String] = parseTopicConfigsToBeDeleted(opts) @@ -172,14 +173,21 @@ object TopicCommand extends Logging { case class AdminClientTopicService private (adminClient: JAdminClient) extends TopicService { override def createTopic(topic: CommandTopicPartition): Unit = { - if (topic.replicationFactor > Short.MaxValue) - throw new IllegalArgumentException(s"The replication factor's maximum value must be smaller or equal to ${Short.MaxValue}") + if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1)) + throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive") + if (topic.partitions.exists(partitions => partitions < 1)) + throw new IllegalArgumentException(s"The partitions must be greater than 0") if (!adminClient.listTopics().names().get().contains(topic.name)) { val newTopic = if (topic.hasReplicaAssignment) new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get)) - else - new NewTopic(topic.name, topic.partitions.get, topic.replicationFactor.shortValue()) + else { + new NewTopic( + topic.name, + topic.partitions.asJava, + topic.replicationFactor.map(_.toShort).map(Short.box).asJava) + } + val configsMap = topic.configsToAdd.stringPropertyNames() .asScala .map(name => name -> topic.configsToAdd.getProperty(name)) @@ -289,7 +297,7 @@ object TopicCommand extends Logging { if (topic.hasReplicaAssignment) adminZkClient.createTopicWithAssignment(topic.name, topic.configsToAdd, 
topic.replicaAssignment.get) else - adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor, topic.configsToAdd, topic.rackAwareMode) + adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor.get, topic.configsToAdd, topic.rackAwareMode) println(s"Created topic ${topic.name}.") } catch { case e: TopicExistsException => if (!topic.ifTopicDoesntExist()) throw e @@ -538,11 +546,11 @@ object TopicCommand extends Logging { .describedAs("name") .ofType(classOf[String]) private val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " + - "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected") + "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.") .withRequiredArg .describedAs("# of partitions") .ofType(classOf[java.lang.Integer]) - private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.") + private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.") .withRequiredArg .describedAs("replication factor") .ofType(classOf[java.lang.Integer]) @@ -633,7 +641,7 @@ object TopicCommand extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) if (!has(listOpt) && !has(describeOpt)) CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) - if (has(createOpt) && !has(replicaAssignmentOpt)) + if (has(createOpt) && !has(replicaAssignmentOpt) && has(zkConnectOpt)) CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt, replicationFactorOpt) if (has(bootstrapServerOpt) && has(alterOpt)) { CommandLineUtils.checkInvalidArgsSet(parser, options, Set(bootstrapServerOpt, configOpt), Set(alterOpt)) diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 85d272c4e593..cfa599da3454 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -62,6 +62,9 @@ class AdminManager(val config: KafkaConfig, def hasDelayedTopicOperations = topicPurgatory.numDelayed != 0 + private val defaultNumPartitions = config.numPartitions.intValue() + private val defaultReplicationFactor = config.defaultReplicationFactor.shortValue() + /** * Try to complete delayed topic operations with the request key */ @@ -95,8 +98,15 @@ class AdminManager(val config: KafkaConfig, throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. 
" + "Both cannot be used at the same time.") } + + val resolvedNumPartitions = if (topic.numPartitions == NO_NUM_PARTITIONS) + defaultNumPartitions else topic.numPartitions + val resolvedReplicationFactor = if (topic.replicationFactor == NO_REPLICATION_FACTOR) + defaultReplicationFactor else topic.replicationFactor + val assignments = if (topic.assignments().isEmpty) { - AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor) + AdminUtils.assignReplicasToBrokers( + brokers, resolvedNumPartitions, resolvedReplicationFactor) } else { val assignments = new mutable.HashMap[Int, Seq[Int]] // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case, @@ -115,9 +125,9 @@ class AdminManager(val config: KafkaConfig, // Use `null` for unset fields in the public API val numPartitions: java.lang.Integer = - if (topic.numPartitions == NO_NUM_PARTITIONS) null else topic.numPartitions + if (topic.assignments().isEmpty) resolvedNumPartitions else null val replicationFactor: java.lang.Short = - if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor + if (topic.assignments().isEmpty) resolvedReplicationFactor else null val javaAssignments = if (topic.assignments().isEmpty) { null } else { diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 88f10ff51720..d622a7963de1 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -50,10 +50,10 @@ import org.junit.rules.Timeout import org.junit.{After, Before, Rule, Test} import org.scalatest.Assertions.intercept import scala.collection.JavaConverters._ +import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.util.Random - /** * An integration test of the KafkaAdminClient. 
* @@ -150,10 +150,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @Test def testCreateDeleteTopics(): Unit = { client = AdminClient.create(createConfig()) - val topics = Seq("mytopic", "mytopic2") + val topics = Seq("mytopic", "mytopic2", "mytopic3") val newTopics = Seq( new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), - new NewTopic("mytopic2", 3, 3) + new NewTopic("mytopic2", 3, 3.toShort), + new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) ) client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get() waitForTopics(client, List(), topics) @@ -166,6 +167,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException]) assertTrue(results.containsKey("mytopic2")) assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException]) + assertTrue(results.containsKey("mytopic3")) + assertFutureExceptionTypeEquals(results.get("mytopic3"), classOf[TopicExistsException]) val topicToDescription = client.describeTopics(topics.asJava).all.get() assertEquals(topics.toSet, topicToDescription.keySet.asScala) @@ -204,6 +207,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertTrue(partition.replicas.contains(partition.leader)) } + val topic3 = topicToDescription.get("mytopic3") + assertEquals("mytopic3", topic3.name) + assertEquals(configs.head.numPartitions, topic3.partitions.size) + assertEquals(configs.head.defaultReplicationFactor, topic3.partitions.get(0).replicas().size()) + client.deleteTopics(topics.asJava).all.get() waitForTopics(client, List(), topics) } @@ -212,7 +220,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { def testMetadataRefresh(): Unit = { client = AdminClient.create(createConfig()) val topics = Seq("mytopic") - val newTopics = Seq(new NewTopic("mytopic", 3, 3)) + val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort)) client.createTopics(newTopics.asJava).all.get() waitForTopics(client, expectedPresent = topics, expectedMissing = List()) @@ -237,7 +245,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(expectedOperations, result.authorizedOperations().get()) val topic = "mytopic" - val newTopics = Seq(new NewTopic(topic, 3, 3)) + val newTopics = Seq(new NewTopic(topic, 3, 3.toShort)) client.createTopics(newTopics.asJava).all.get() waitForTopics(client, expectedPresent = Seq(topic), expectedMissing = List()) @@ -265,7 +273,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { client = AdminClient.create(createConfig()) val existingTopic = "existing-topic" - client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get() + client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1.toShort)).asJava).all.get() waitForTopics(client, Seq(existingTopic), List()) val nonExistingTopic = "non-existing" @@ -1063,7 +1071,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { def testDelayedClose(): Unit = { client = AdminClient.create(createConfig()) val topics = Seq("mytopic", "mytopic2") - val newTopics = topics.map(new NewTopic(_, 1, 1)) + val newTopics = topics.map(new NewTopic(_, 1, 1.toShort)) val future = client.createTopics(newTopics.asJava, new 
CreateTopicsOptions().validateOnly(true)).all() client.close(time.Duration.ofHours(2)) val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() @@ -1083,7 +1091,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { client = AdminClient.create(config) // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be // cancelled by the close operation. - val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, + val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, new CreateTopicsOptions().timeoutMs(900000)).all() client.close(time.Duration.ZERO) assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) @@ -1100,7 +1108,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") client = AdminClient.create(config) val startTimeMs = Time.SYSTEM.milliseconds() - val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, + val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, new CreateTopicsOptions().timeoutMs(2)).all() assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) val endTimeMs = Time.SYSTEM.milliseconds() @@ -1116,10 +1124,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000") val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory() client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory) - val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, + val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, new CreateTopicsOptions().validateOnly(true)).all() assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) - val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1)).asJava, + val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1.toShort)).asJava, new CreateTopicsOptions().validateOnly(true)).all() future2.get assertEquals(1, factory.failuresInjected) @@ -1141,7 +1149,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val testTopicName = "test_topic" val testNumPartitions = 2 client.createTopics(Collections.singleton( - new NewTopic(testTopicName, testNumPartitions, 1))).all().get() + new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get() waitForTopics(client, List(testTopicName), List()) val producer = createProducer() @@ -1799,8 +1807,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val client = AdminClient.create(createConfig) val longTopicName = String.join("", Collections.nCopies(249, "x")); val invalidTopicName = String.join("", Collections.nCopies(250, "x")); - val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3), - new NewTopic(longTopicName, 3, 3)) + val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3.toShort), + new NewTopic(longTopicName, 3, 3.toShort)) val results = client.createTopics(newTopics2.asJava).values() assertTrue(results.containsKey(longTopicName)) results.get(longTopicName).get() diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala index c145b24416fd..f447a56f50db 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala @@ -67,7 +67,7 @@ class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consume val record = new ProducerRecord(topic_1, 0, "key".getBytes, "value".getBytes) // create `topic_1` and produce a record to it - adminClient.createTopics(Collections.singleton(new NewTopic(topic_1, 1, 1))).all.get + adminClient.createTopics(Collections.singleton(new NewTopic(topic_1, 1, 1.toShort))).all.get producer.send(record).get consumer.subscribe(util.Arrays.asList(topic_1, topic_2)) diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala index 3690856cd90a..4360b2b26681 100755 --- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala +++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala @@ -142,7 +142,7 @@ object LogCompactionTester { try { val topicConfigs = Map(TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT) - val newTopics = topics.map(name => new NewTopic(name, 1, 1).configs(topicConfigs.asJava)).asJava + val newTopics = topics.map(name => new NewTopic(name, 1, 1.toShort).configs(topicConfigs.asJava)).asJava adminClient.createTopics(newTopics).all.get var pendingTopics: Seq[String] = Seq() diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index caa7a3be98e2..5db4309d64af 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -32,8 +32,8 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.Assert.{assertEquals, assertFalse, assertTrue} -import org.junit.{After, Before, Rule, Test} import org.junit.rules.TestName +import org.junit.{After, Before, Rule, Test} import org.scalatest.Assertions.{fail, intercept} import scala.collection.JavaConverters._ @@ -49,8 +49,13 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs( numConfigs = 6, zkConnect = zkConnect, - rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3" - )).map(KafkaConfig.fromProps) + rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"), + numPartitions = numPartitions, + defaultReplicationFactor = defaultReplicationFactor + ).map(KafkaConfig.fromProps) + + private var numPartitions = 1 + private var defaultReplicationFactor = 1.toShort private var topicService: AdminClientTopicService = _ private var adminClient: JAdminClient = _ @@ -155,6 +160,52 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin adminClient.listTopics().names().get().contains(testTopicName) } + @Test + def testCreateWithDefaults(): Unit = { + createAndWaitTopic(new TopicCommandOptions( + Array("--topic", testTopicName))) + + val partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .all() + .get() + .get(testTopicName) + .partitions() + 
assertEquals(partitions.size(), numPartitions) + assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor) + } + + @Test + def testCreateWithDefaultReplication(): Unit = { + createAndWaitTopic(new TopicCommandOptions( + Array("--topic", testTopicName, "--partitions", "2"))) + + val partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .all() + .get() + .get(testTopicName) + .partitions() + assertEquals(partitions.size(), 2) + assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor) + } + + @Test + def testCreateWithDefaultPartitions(): Unit = { + createAndWaitTopic(new TopicCommandOptions( + Array("--topic", testTopicName, "--replication-factor", "2"))) + + val partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .all() + .get() + .get(testTopicName) + .partitions() + + assertEquals(partitions.size(), numPartitions) + assertEquals(partitions.get(0).replicas().size(), 2) + } + @Test def testCreateWithConfigs(): Unit = { val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName) @@ -211,7 +262,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testCreateWithNegativeReplicationFactor(): Unit = { - intercept[ExecutionException] { + intercept[IllegalArgumentException] { topicService.createTopic(new TopicCommandOptions( Array("--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName))) } @@ -241,17 +292,17 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testCreateWithNegativePartitionCount(): Unit = { - intercept[ExecutionException] { + intercept[IllegalArgumentException] { topicService.createTopic(new TopicCommandOptions( Array("--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName))) } } @Test - def testCreateWithUnspecifiedPartitionCount(): Unit = { - assertExitCode(1, - () => topicService.createTopic(new TopicCommandOptions( - Array("--replication-factor", "1", "--topic", testTopicName)))) + def testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient(): Unit = { + assertExitCode(1, () => + new TopicCommandOptions(Array("--create", "--zookeeper", "zk", "--topic", testTopicName)).checkArgs() + ) } @Test @@ -281,9 +332,9 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin val topic2 = "kafka.testTopic2" val topic3 = "oooof.testTopic1" adminClient.createTopics( - List(new NewTopic(topic1, 2, 2), - new NewTopic(topic2, 2, 2), - new NewTopic(topic3, 2, 2)).asJavaCollection) + List(new NewTopic(topic1, 2, 2.toShort), + new NewTopic(topic2, 2, 2.toShort), + new NewTopic(topic3, 2, 2.toShort)).asJavaCollection) .all().get() waitForTopicCreated(topic1) waitForTopicCreated(topic2) @@ -301,8 +352,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin def testListTopicsWithExcludeInternal(): Unit = { val topic1 = "kafka.testTopic1" adminClient.createTopics( - List(new NewTopic(topic1, 2, 2), - new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 2, 2)).asJavaCollection) + List(new NewTopic(topic1, 2, 2.toShort), + new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 2, 2.toShort)).asJavaCollection) .all().get() waitForTopicCreated(topic1) @@ -316,7 +367,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testAlterPartitionCount(): Unit = { adminClient.createTopics( - List(new NewTopic(testTopicName, 2, 2)).asJavaCollection).all().get() + List(new 
NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get() waitForTopicCreated(testTopicName) topicService.alterTopic(new TopicCommandOptions( @@ -329,7 +380,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testAlterAssignment(): Unit = { adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 2, 2))).all().get() + Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get() waitForTopicCreated(testTopicName) topicService.alterTopic(new TopicCommandOptions( @@ -343,7 +394,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testAlterAssignmentWithMoreAssignmentThanPartitions(): Unit = { adminClient.createTopics( - List(new NewTopic(testTopicName, 2, 2)).asJavaCollection).all().get() + List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get() waitForTopicCreated(testTopicName) intercept[ExecutionException] { @@ -355,7 +406,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testAlterAssignmentWithMorePartitionsThanAssignment(): Unit = { adminClient.createTopics( - List(new NewTopic(testTopicName, 2, 2)).asJavaCollection).all().get() + List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get() waitForTopicCreated(testTopicName) intercept[ExecutionException] { @@ -502,7 +553,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testDescribe(): Unit = { adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 2, 2))).all().get() + Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get() waitForTopicCreated(testTopicName) val output = TestUtils.grabConsoleOutput( @@ -515,7 +566,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testDescribeUnavailablePartitions(): Unit = { adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 6, 1))).all().get() + Collections.singletonList(new NewTopic(testTopicName, 6, 1.toShort))).all().get() waitForTopicCreated(testTopicName) try { @@ -559,7 +610,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @Test def testDescribeUnderReplicatedPartitions(): Unit = { adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 1, 6))).all().get() + Collections.singletonList(new NewTopic(testTopicName, 1, 6.toShort))).all().get() waitForTopicCreated(testTopicName) try { @@ -581,7 +632,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6") adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 1, 6).configs(configMap))).all().get() + Collections.singletonList(new NewTopic(testTopicName, 1, 6.toShort).configs(configMap))).all().get() waitForTopicCreated(testTopicName) try { @@ -603,7 +654,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4") adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 1, 6).configs(configMap))).all().get() + Collections.singletonList(new NewTopic(testTopicName, 1, 6.toShort).configs(configMap))).all().get() waitForTopicCreated(testTopicName) try { @@ -640,8 +691,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin 
adminClient.createTopics( java.util.Arrays.asList( - new NewTopic(underMinIsrTopic, 1, 6).configs(configMap), - new NewTopic(notUnderMinIsrTopic, 1, 6), + new NewTopic(underMinIsrTopic, 1, 6.toShort).configs(configMap), + new NewTopic(notUnderMinIsrTopic, 1, 6.toShort), new NewTopic(offlineTopic, Collections.singletonMap(0, Collections.singletonList(0))), new NewTopic(fullyReplicatedTopic, Collections.singletonMap(0, java.util.Arrays.asList(1, 2, 3))))).all().get() diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index 514e7aedc470..d0dbd8e5ab9c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -124,8 +124,18 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { else { assertNotNull("The topic should be created", metadataForTopic) assertEquals(Errors.NONE, metadataForTopic.error) - assertEquals("The topic should have the correct number of partitions", partitions, metadataForTopic.partitionMetadata.size) - assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicas.size) + if (partitions == -1) { + assertEquals("The topic should have the default number of partitions", configs.head.numPartitions, metadataForTopic.partitionMetadata.size) + } else { + assertEquals("The topic should have the correct number of partitions", partitions, metadataForTopic.partitionMetadata.size) + } + + if (replication == -1) { + assertEquals("The topic should have the default replication factor", + configs.head.defaultReplicationFactor, metadataForTopic.partitionMetadata.asScala.head.replicas.size) + } else { + assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicas.size) + } } } diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala index 709b3c977c03..6d1b771a78a8 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala @@ -43,6 +43,13 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { topicReq("topic10", numPartitions = 5, replicationFactor = 2), topicReq("topic11", assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))), validateOnly = true)) + // Defaults + validateValidCreateTopicsRequests(topicsReq(Seq( + topicReq("topic12", replicationFactor = -1, numPartitions = -1)))) + validateValidCreateTopicsRequests(topicsReq(Seq( + topicReq("topic13", replicationFactor = 2, numPartitions = -1)))) + validateValidCreateTopicsRequests(topicsReq(Seq( + topicReq("topic14", replicationFactor = -1, numPartitions = 2)))) } @Test @@ -52,7 +59,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { // Basic validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq(existingTopic))), Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic 'existing-topic' already exists.")))) - validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))), + validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -2))), Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = 
false) validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication", replicationFactor = brokerCount + 1))), @@ -70,7 +77,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { // Partial validateErrorCreateTopicsRequests(topicsReq(Seq( topicReq(existingTopic), - topicReq("partial-partitions", numPartitions = -1), + topicReq("partial-partitions", numPartitions = -2), topicReq("partial-replication", replicationFactor=brokerCount + 1), topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))), topicReq("partial-none"))), diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala index 0395484cf3d2..74d9892e1ba5 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala @@ -99,9 +99,14 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest Some("Replication factor: 4 larger than available brokers: 3.")))) validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2", - numPartitions = 10, replicationFactor = -1)), validateOnly = true), + numPartitions = 10, replicationFactor = -2)), validateOnly = true), Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("Replication factor must be larger than 0.")))) + + validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", + numPartitions = -2, replicationFactor = 1)), validateOnly = true), + Map("error-partitions" -> error(Errors.INVALID_PARTITIONS, + Some("Number of partitions must be larger than 0.")))) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ea8d2b3e549f..3f9c8c3cfb25 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -173,11 +173,14 @@ object TestUtils extends Logging { enableSaslSsl: Boolean = false, rackInfo: Map[Int, String] = Map(), logDirCount: Int = 1, - enableToken: Boolean = false): Seq[Properties] = { + enableToken: Boolean = false, + numPartitions: Int = 1, + defaultReplicationFactor: Short = 1): Seq[Properties] = { (0 until numConfigs).map { node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort, interBrokerSecurityProtocol, trustStoreFile, saslProperties, enablePlaintext = enablePlaintext, enableSsl = enableSsl, - enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount, enableToken = enableToken) + enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount, enableToken = enableToken, + numPartitions = numPartitions, defaultReplicationFactor = defaultReplicationFactor) } } @@ -229,7 +232,9 @@ object TestUtils extends Logging { saslSslPort: Int = RandomPort, rack: Option[String] = None, logDirCount: Int = 1, - enableToken: Boolean = false): Properties = { + enableToken: Boolean = false, + numPartitions: Int = 1, + defaultReplicationFactor: Short = 1): Properties = { def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol) val protocolAndPorts = ArrayBuffer[(SecurityProtocol, Int)]() @@ -289,6 +294,9 @@ object TestUtils extends Logging { if (enableToken) 
props.put(KafkaConfig.DelegationTokenMasterKeyProp, "masterkey") + props.put(KafkaConfig.NumPartitionsProp, numPartitions.toString) + props.put(KafkaConfig.DefaultReplicationFactorProp, defaultReplicationFactor.toString) + props } diff --git a/docs/upgrade.html b/docs/upgrade.html index 3c737ec55b67..4f483e6b4f08 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -75,6 +75,7 @@
 <h5>Notable changes in 2.4.0</h5>
 <ul>
     <li>The <code>bin/kafka-preferred-replica-election.sh</code> command line tool has been deprecated. It has been replaced by <code>bin/kafka-leader-election.sh</code>.</li>
     <li>The methods <code>electPreferredLeaders</code> in the Java <code>AdminClient</code> class have been deprecated in favor of the methods <code>electLeaders</code>.</li>
+    <li>Scala code leveraging the <code>NewTopic(String, int, short)</code> constructor with literal values will need to explicitly call <code>toShort</code> on the second literal.</li>
 </ul>
 <h5>Notable changes in 2.3.0</h5>
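The compatibility note above is easiest to see in code. A minimal Scala sketch (not taken from this patch; the topic name and sizes are arbitrary) of how an existing call site is updated, mirroring the changes made throughout the Scala tests in this commit:

    import org.apache.kafka.clients.admin.NewTopic

    // Compiled before this change:
    //   val topic = new NewTopic("my-topic", 3, 1)
    // With the new NewTopic(String, Optional[Integer], Optional[java.lang.Short]) overload
    // present, the bare Int literal for the replication factor is no longer accepted, so
    // the call site gains an explicit conversion:
    val topic = new NewTopic("my-topic", 3, 1.toShort)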
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index c912deff660d..ba26d6cf3422 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -90,6 +90,7 @@ versions += [ rocksDB: "5.18.3", scalafmt: "1.5.1", scalatest: "3.0.7", + scalaJava8Compat : "0.9.0", scoverage: "1.3.1", scoveragePlugin: "2.5.0", shadowPlugin: "4.0.4", @@ -157,6 +158,7 @@ libs += [ scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging", scalaReflect: "org.scala-lang:scala-reflect:$versions.scala", scalatest: "org.scalatest:scalatest_$versions.baseScala:$versions.scalatest", + scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat", scoveragePlugin: "org.scoverage:scalac-scoverage-plugin_$versions.baseScala:$versions.scoverage", scoverageRuntime: "org.scoverage:scalac-scoverage-runtime_$versions.baseScala:$versions.scoverage", slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j",
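For reference, a minimal Scala sketch of the new API surface introduced by this patch (assumptions: brokers that support CreateTopics v4+, a bootstrap address of localhost:9092, and an arbitrary topic name):

    import java.util.{Arrays, Optional, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

    object CreateTopicWithDefaults extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092") // assumed address

      val admin = AdminClient.create(props)
      try {
        // Empty Optionals let the broker fill in num.partitions and
        // default.replication.factor (requires CreateTopics v4+).
        val withDefaults = new NewTopic("topic-with-defaults",
          Optional.empty[Integer](), Optional.empty[java.lang.Short]())
        admin.createTopics(Arrays.asList(withDefaults)).all().get()
      } finally {
        admin.close()
      }
    }

The same broker-side resolution is what allows kafka-topics.sh to omit the sizing flags when --bootstrap-server is used, for example:

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-with-defaults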