KAFKA-13399 towards scala3 #11432

Closed · wants to merge 5 commits · Changes from all commits
@@ -238,14 +238,14 @@ object ReassignPartitionsCommand extends Logging {
      executeAssignment(adminClient,
        opts.options.has(opts.additionalOpt),
        Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
-       opts.options.valueOf(opts.interBrokerThrottleOpt),
-       opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
-       opts.options.valueOf(opts.timeoutOpt))
+       opts.options.valueOf(opts.interBrokerThrottleOpt).longValue(),
+       opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt).longValue(),
+       opts.options.valueOf(opts.timeoutOpt).longValue())
    } else if (opts.options.has(opts.cancelOpt)) {
      cancelAssignment(adminClient,
        Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
        opts.options.has(opts.preserveThrottlesOpt),
-       opts.options.valueOf(opts.timeoutOpt))
+       opts.options.valueOf(opts.timeoutOpt).longValue())
Contributor Author (on lines +241 to +248): Those values were Ints that Scala 2 converted to Long automatically; in Scala 3 the conversion must be written out as .longValue().
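A minimal sketch of the difference, assuming (as the comment says) the option values come back boxed as java.lang.Integer:

    val boxed: java.lang.Integer = 42
    // Scala 2 chained unboxing with numeric widening, so this compiled:
    //   val timeoutMs: Long = boxed
    // Scala 3 does not apply the chain; the conversion is spelled out:
    val timeoutMs: Long = boxed.longValue()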

} else if (opts.options.has(opts.listOpt)) {
listReassignments(adminClient)
} else {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -22,7 +22,7 @@ import java.util.{Collections, Properties}
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.log.LogConfig
-import kafka.utils._
+import kafka.utils.{immutable => _, _}
Contributor Author: Changes like this one resolve a collision between the immutable package in the Scala collections and kafka.utils.immutable.
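A minimal sketch of the exclusion-import, assuming kafka.utils.immutable is the member that clashes with scala.collection.immutable:

    // `immutable => _` drops that one name from the wildcard import, so the
    // collections package of the same name stays unambiguous:
    import kafka.utils.{immutable => _, _}
    import scala.collection.immutable.Map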

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.CreatePartitionsOptions
import org.apache.kafka.clients.admin.CreateTopicsOptions
@@ -485,7 +485,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
} else {
val (logConfigs, failed) = zkClient.getLogConfigs(
partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
-        config.originals()
+        config.originals
Contributor Author: config.originals returns a Map. Scala 2 ignored the extra parentheses, but in Scala 3 they matter: they are applied to the returned Map as a zero-argument apply call, which fails to compile.
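In KafkaConfig, originals appears to be a parenless Scala override of a Java method, which is why Scala 2 tolerated the trailing parentheses. A minimal sketch of the Scala 3 reading:

    object config {
      def originals: Map[String, AnyRef] = Map.empty // parenless member
    }
    val a = config.originals      // compiles
    // val b = config.originals() // Scala 3 reads this as originals.apply(),
    //                            // and Map#apply expects a key: type error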

)

partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
@@ -34,7 +34,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.{FetchLogEnd, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
-import kafka.utils._
+import kafka.utils.{immutable => _, _}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogConfig.scala
@@ -175,7 +175,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
}

def overriddenConfigsAsLoggableString: String = {
-    val overriddenTopicProps = props.asScala.collect {
+    val overriddenTopicProps: Map[String, Object] = props.asScala.collect {
Contributor Author: For some reason Scala 3 was inferring Map[AnyRef, AnyRef] here instead.
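A minimal analog of pinning the result type, with stand-in values (the real code collects overridden topic configs):

    import scala.jdk.CollectionConverters._
    val props: java.util.Map[_, _] = new java.util.HashMap[String, String]()
    // The ascription drives inference toward (String, Object) pairs instead
    // of the wider key/value types of the unrefined map:
    val typed: Map[String, Object] = props.asScala.collect {
      case (k: String, v) => (k, v.asInstanceOf[AnyRef])
    }.toMap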

case (k: String, v) if overriddenConfigs.contains(k) => (k, v.asInstanceOf[AnyRef])
}
ConfigUtils.configMapToRedactedString(overriddenTopicProps.asJava, configDef)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogManager.scala
@@ -355,7 +355,7 @@ class LogManager(logDirs: Seq[File],
s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
}

-    val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
+    val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty[File]).filter(logDir =>
Contributor Author: Scala 3 wasn't able to infer that Array.empty's type parameter should be File.
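A minimal analog of the inference gap (directory path hypothetical):

    import java.io.File
    val dir = new File("/tmp")
    // Scala 2 inferred File for Array.empty from the expected type of
    // getOrElse; Scala 3 needs the type parameter written out:
    val entries: Array[File] = Option(dir.listFiles).getOrElse(Array.empty[File])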

logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
val numLogsLoaded = new AtomicInteger(0)
numTotalLogs += logsToLoad.length
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -42,11 +42,11 @@ final class KafkaMetadataLog private (
// Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
// polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
-  topicPartition: TopicPartition,
+  topicPartitionArg: TopicPartition,
Contributor Author: Avoids a collision between the argument name and the method of the same name.
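A minimal analog with hypothetical names; the PR reports Scala 3 rejects the name reuse that Scala 2 tolerated:

    trait ReplicatedLog { def topicPartition(): String }
    // Renaming the constructor parameter lets the overriding method keep
    // the name required by the trait:
    final class MetadataLog(topicPartitionArg: String) extends ReplicatedLog {
      override def topicPartition(): String = topicPartitionArg
    }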

config: MetadataLogConfig
) extends ReplicatedLog with Logging {

this.logIdent = s"[MetadataLog partition=$topicPartition, nodeId=${config.nodeId}] "
this.logIdent = s"[MetadataLog partition=$topicPartitionArg, nodeId=${config.nodeId}] "

override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
val isolation = readIsolation match {
@@ -223,7 +223,7 @@ final class KafkaMetadataLog private (
* Return the topic partition associated with the log.
*/
override def topicPartition(): TopicPartition = {
-    topicPartition
+    topicPartitionArg
}

/**
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -150,7 +150,7 @@ class KafkaNetworkChannel(
request.createdTimeMs,
destination = node,
request = buildRequest(request.data),
-      handler = onComplete
+      handler = onComplete(_)
Contributor Author: A more idiomatic way to pass the function reference; the previous form wasn't compiling in Scala 3.
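A minimal sketch of the placeholder form, with hypothetical types:

    def onComplete(response: String): Unit = println(response)
    // onComplete(_) expands to x => onComplete(x), making the method-to-
    // function conversion explicit where the bare reference was rejected:
    val handler: String => Unit = onComplete(_)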

))

case None =>
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
@@ -427,7 +427,7 @@ class BrokerServer(
metadataListener.startPublishing(metadataPublisher).get()

// Log static broker configurations.
-    new KafkaConfig(config.originals(), true)
+    new KafkaConfig(config.originals, true)

// Enable inbound TCP connections.
socketServer.startProcessingRequests(authorizerFutures)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1980,7 +1980,7 @@ class ReplicaManager(val config: KafkaConfig,
protected def createReplicaSelector(): Option[ReplicaSelector] = {
config.replicaSelectorClassName.map { className =>
val tmpReplicaSelector: ReplicaSelector = CoreUtils.createObject[ReplicaSelector](className)
-      tmpReplicaSelector.configure(config.originals())
+      tmpReplicaSelector.configure(config.originals)
tmpReplicaSelector
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps}
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.metadata.ZkConfigRepository
-import kafka.utils._
+import kafka.utils.{immutable => _, _}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
@@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer

-import scala.collection.{Map, mutable, _}
+import scala.collection.{Map, _}
import scala.jdk.CollectionConverters._

class ZkAdminManager(val config: KafkaConfig,
@@ -759,7 +759,7 @@ class ZkAdminManager(val config: KafkaConfig,
props.setProperty(op.key, value.toString)
case ConfigDef.Type.LONG | ConfigDef.Type.INT =>
val epsilon = 1e-6
-          val intValue = if (key.`type` == ConfigDef.Type.LONG)
+          val intValue: Long = if (key.`type` == ConfigDef.Type.LONG)
Contributor Author: This widening was automatic in Scala 2; now it has to be done manually.
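A minimal analog of the lost widening (values hypothetical):

    val value: Double = 10.4
    val epsilon = 1e-6
    val isLong = true
    // Scala 2 widened the Int branch to Long on its own; Scala 3 needs the
    // expected type on the val so both branches unify as Long:
    val intValue: Long = if (isLong) (value + epsilon).toLong else (value + epsilon).toInt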

(value + epsilon).toLong
else
(value + epsilon).toInt
32 changes: 16 additions & 16 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -27,7 +27,7 @@ import com.typesafe.scalalogging.LazyLogging
import joptsimple._
import kafka.utils.Implicits._
import kafka.utils.{Exit, _}
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig => ClientConsumerConfig, ConsumerRecord, KafkaConsumer}
Contributor Author: This avoids a name clash between the ConsumerConfig class under org.apache.kafka.clients.consumer and the ConsumerConfig defined in this file.

import org.apache.kafka.common.{MessageFormatter, TopicPartition}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
import org.apache.kafka.common.record.TimestampType
@@ -148,11 +148,11 @@ object ConsoleConsumer extends Logging {
props ++= config.consumerProps
props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props)
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
-      props.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer")
+    props.put(ClientConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
+    if (props.getProperty(ClientConsumerConfig.CLIENT_ID_CONFIG) == null)
+      props.put(ClientConsumerConfig.CLIENT_ID_CONFIG, "console-consumer")
     CommandLineUtils.maybeMergeOptions(
-      props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt)
+      props, ClientConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt)
props
}

@@ -170,9 +170,9 @@ object ConsoleConsumer extends Logging {
def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties): Unit = {
val (earliestConfigValue, latestConfigValue) = ("earliest", "latest")

-    if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
+    if (props.containsKey(ClientConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
       // auto.offset.reset parameter was specified on the command line
-      val autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      val autoResetOption = props.getProperty(ClientConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
if (config.options.has(config.resetBeginningOpt) && earliestConfigValue != autoResetOption) {
// conflicting options - latest und earliest, throw an error
System.err.println(s"Can't simultaneously specify --from-beginning and 'auto.offset.reset=$autoResetOption', " +
Expand All @@ -185,7 +185,7 @@ object ConsoleConsumer extends Logging {
// no explicit value for auto.offset.reset was specified
// if --from-beginning was specified use earliest, otherwise default to latest
val autoResetOption = if (config.options.has(config.resetBeginningOpt)) earliestConfigValue else latestConfigValue
-      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption)
+      props.put(ClientConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption)
}
}

@@ -316,10 +316,10 @@ object ConsoleConsumer extends Logging {
val formatter: MessageFormatter = messageFormatterClass.getDeclaredConstructor().newInstance().asInstanceOf[MessageFormatter]

if (keyDeserializer != null && keyDeserializer.nonEmpty) {
-      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
+      formatterArgs.setProperty(ClientConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
}
if (valueDeserializer != null && valueDeserializer.nonEmpty) {
-      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
+      formatterArgs.setProperty(ClientConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
}

formatter.configure(formatterArgs.asScala.asJava)
@@ -372,8 +372,8 @@ object ConsoleConsumer extends Logging {
// if the group id is provided in more than place (through different means) all values must be the same
val groupIdsProvided = Set(
Option(options.valueOf(groupIdOpt)), // via --group
-      Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property
-      Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config
+      Option(consumerProps.get(ClientConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property
+      Option(extraConsumerProps.get(ClientConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config
).flatten

if (groupIdsProvided.size > 1) {
@@ -384,13 +384,13 @@

groupIdsProvided.headOption match {
case Some(group) =>
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+        consumerProps.put(ClientConsumerConfig.GROUP_ID_CONFIG, group)
case None =>
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
consumerProps.put(ClientConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
// By default, avoid unnecessary expansion of the coordinator cache since
// the auto-generated group and its offsets is not intended to be used again
-        if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
-          consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+        if (!consumerProps.containsKey(ClientConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+          consumerProps.put(ClientConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
groupIdPassed = false
}

38 changes: 19 additions & 19 deletions core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -27,7 +27,7 @@ import kafka.message._
import kafka.utils.Implicits._
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, ToolsUtils}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig => ClientProducerConfig, ProducerRecord}
Contributor Author: This avoids a name clash between the ProducerConfig class under org.apache.kafka.clients.producer and the ProducerConfig defined in this file.

import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.utils.Utils
import scala.jdk.CollectionConverters._
@@ -86,39 +86,39 @@ object ConsoleProducer {
props ++= config.extraProducerProps

if (config.bootstrapServer != null)
-      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
+      props.put(ClientProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     else
-      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+      props.put(ClientProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)

-    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-    if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null)
-      props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    props.put(ClientProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
+    if (props.getProperty(ClientProducerConfig.CLIENT_ID_CONFIG) == null)
+      props.put(ClientProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+    props.put(ClientProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    props.put(ClientProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")

     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.LINGER_MS_CONFIG, config.options, config.sendTimeoutOpt)
+      props, ClientProducerConfig.LINGER_MS_CONFIG, config.options, config.sendTimeoutOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.ACKS_CONFIG, config.options, config.requestRequiredAcksOpt)
+      props, ClientProducerConfig.ACKS_CONFIG, config.options, config.requestRequiredAcksOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.options, config.requestTimeoutMsOpt)
+      props, ClientProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.options, config.requestTimeoutMsOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.RETRIES_CONFIG, config.options, config.messageSendMaxRetriesOpt)
+      props, ClientProducerConfig.RETRIES_CONFIG, config.options, config.messageSendMaxRetriesOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.options, config.retryBackoffMsOpt)
+      props, ClientProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.options, config.retryBackoffMsOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.SEND_BUFFER_CONFIG, config.options, config.socketBufferSizeOpt)
+      props, ClientProducerConfig.SEND_BUFFER_CONFIG, config.options, config.socketBufferSizeOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.options, config.maxMemoryBytesOpt)
+      props, ClientProducerConfig.BUFFER_MEMORY_CONFIG, config.options, config.maxMemoryBytesOpt)
     // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.BATCH_SIZE_CONFIG, config.options, config.batchSizeOpt)
+      props, ClientProducerConfig.BATCH_SIZE_CONFIG, config.options, config.batchSizeOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.BATCH_SIZE_CONFIG, config.options, config.maxPartitionMemoryBytesOpt)
+      props, ClientProducerConfig.BATCH_SIZE_CONFIG, config.options, config.maxPartitionMemoryBytesOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.METADATA_MAX_AGE_CONFIG, config.options, config.metadataExpiryMsOpt)
+      props, ClientProducerConfig.METADATA_MAX_AGE_CONFIG, config.options, config.metadataExpiryMsOpt)
     CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.MAX_BLOCK_MS_CONFIG, config.options, config.maxBlockMsOpt)
+      props, ClientProducerConfig.MAX_BLOCK_MS_CONFIG, config.options, config.maxBlockMsOpt)

props
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/utils/json/DecodeJson.scala
@@ -85,13 +85,13 @@ object DecodeJson {
else decodeJson.decodeEither(node).map(Some(_))
}

-  implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => {
+  implicit def decodeSeq[E, S[E] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => {
Contributor Author: This is the only way I could make it work. It seems this definition should always have been like this, but maybe I'm missing a use case; the previous definition was never matched in Scala 3.
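A minimal analog, under the assumption that the old bound S[+T] <: Seq[E] fixed the element type instead of threading it through, so Scala 3's implicit search never unified it with concrete collections:

    import scala.collection.Factory

    trait Decode[A]
    object Decode {
      implicit val int: Decode[Int] = new Decode[Int] {}
      // The container bound mentions its own parameter, as in the fix:
      implicit def seq[E, S[X] <: Seq[X]](implicit e: Decode[E],
                                          f: Factory[E, S[E]]): Decode[S[E]] =
        new Decode[S[E]] {}
    }

    val ok = implicitly[Decode[List[Int]]] // resolves with the corrected bound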

if (node.isArray)
decodeIterator(node.elements.asScala)(decodeJson.decodeEither)
else Left(s"Expected JSON array, received $node")
}

-  implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = (node: JsonNode) => {
+  implicit def decodeMap[V, M[K, V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = (node: JsonNode) => {
Contributor Author: Fixed this one too, just in case, although no test code exercises it.

if (node.isObject)
decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).map(v => (e.getKey, v)))
else Left(s"Expected JSON object, received $node")
@@ -2425,7 +2425,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(quorum: String): Unit = {
removeAllClientAcls()

-    for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
+    for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion.toInt to ApiKeys.DESCRIBE_CLUSTER.latestVersion.toInt) {
Contributor Author: In Scala 2 these Shorts were converted to RichInt to get the `to` method; now the conversion has to be done manually.
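A minimal analog of the lost Short-to-RichInt chain (values hypothetical):

    val oldest: Short = 0
    val latest: Short = 3
    // Scala 2 widened Short to Int and wrapped it in RichInt to supply `to`;
    // Scala 3 needs the widening written out:
    for (version <- oldest.toInt to latest.toInt)
      println(version.toShort)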

testDescribeClusterClusterAuthorizedOperations(version.toShort, 0)
}
}
@@ -2445,7 +2445,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val expectedClusterAuthorizedOperations = Utils.to32BitField(
acls.map(_.operation.code.asInstanceOf[JByte]).asJava)

-    for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
+    for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion.toInt to ApiKeys.DESCRIBE_CLUSTER.latestVersion.toInt) {
testDescribeClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations)
}
}
@@ -33,7 +33,7 @@ import org.apache.kafka.common.{Cluster, Reconfigurable}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
-import org.apache.kafka.server.quota._
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType, ClientQuotaEntity}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}

@@ -91,7 +91,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
val topicResource = new ResourcePattern(ResourceType.TOPIC, AclEntry.WildcardResource, PatternType.LITERAL)

try {
-      authorizer.configure(this.configs.head.originals())
+      authorizer.configure(this.configs.head.originals)
val result = authorizer.createAcls(null, List(
new AclBinding(clusterResource, accessControlEntry(
JaasTestUtils.KafkaServerPrincipalUnqualifiedName, CLUSTER_ACTION)),