From 43576562d3def25c913ffdc2371a0c6c937a6eb4 Mon Sep 17 00:00:00 2001 From: belugabehr <12578579+belugabehr@users.noreply.github.com> Date: Thu, 30 Jan 2020 07:58:53 -0500 Subject: [PATCH] KAFKA-9408: Use StandardCharsets.UTF-8 instead of "UTF-8" (#7940) Reviewers: Ron Dagostino , Ismael Juma --- core/src/main/scala/kafka/api/ApiUtils.scala | 9 ++++----- .../scala/integration/kafka/api/TransactionsTest.scala | 9 +++++---- core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 3 ++- .../test/scala/unit/kafka/network/SocketServerTest.scala | 3 ++- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/api/ApiUtils.scala b/core/src/main/scala/kafka/api/ApiUtils.scala index 1644358f7bb3..9be1e4bb8746 100644 --- a/core/src/main/scala/kafka/api/ApiUtils.scala +++ b/core/src/main/scala/kafka/api/ApiUtils.scala @@ -17,6 +17,7 @@ package kafka.api import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import org.apache.kafka.common.KafkaException @@ -24,8 +25,6 @@ import org.apache.kafka.common.KafkaException * Helper functions specific to parsing or serializing requests and responses */ object ApiUtils { - - val ProtocolEncoding = "UTF-8" /** * Read size prefixed string where the size is stored as a 2 byte short. @@ -37,7 +36,7 @@ object ApiUtils { return null val bytes = new Array[Byte](size) buffer.get(bytes) - new String(bytes, ProtocolEncoding) + new String(bytes, StandardCharsets.UTF_8) } /** @@ -49,7 +48,7 @@ object ApiUtils { if(string == null) { buffer.putShort(-1) } else { - val encodedString = string.getBytes(ProtocolEncoding) + val encodedString = string.getBytes(StandardCharsets.UTF_8) if(encodedString.length > Short.MaxValue) { throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") } else { @@ -67,7 +66,7 @@ object ApiUtils { if(string == null) { 2 } else { - val encodedString = string.getBytes(ProtocolEncoding) + val encodedString = string.getBytes(StandardCharsets.UTF_8) if(encodedString.length > Short.MaxValue) { throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") } else { diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 3565a5a123f0..2ea9f7cb7fbc 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -18,6 +18,7 @@ package kafka.api import java.lang.{Long => JLong} +import java.nio.charset.StandardCharsets import java.time.Duration import java.util.{Optional, Properties} import java.util.concurrent.TimeUnit @@ -271,8 +272,8 @@ class TransactionsTest extends KafkaServerTestHarness { shouldCommit = !shouldCommit records.foreach { record => - val key = new String(record.key(), "UTF-8") - val value = new String(record.value(), "UTF-8") + val key = new String(record.key(), StandardCharsets.UTF_8) + val value = new String(record.value(), StandardCharsets.UTF_8) producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key, value, willBeCommitted = shouldCommit)) } @@ -280,11 +281,11 @@ class TransactionsTest extends KafkaServerTestHarness { if (shouldCommit) { producer.commitTransaction() recordsProcessed += records.size - debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " + + debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), StandardCharsets.UTF_8)}. Num " + s"records written to $topic2: $recordsProcessed") } else { producer.abortTransaction() - debug(s"aborted transaction Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " + + debug(s"aborted transaction Last committed record: ${new String(records.last.value(), StandardCharsets.UTF_8)}. Num " + s"records written to $topic2: $recordsProcessed") TestUtils.resetToCommittedPositions(consumer) } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 644822cc4f45..9c0108eb75bc 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -19,6 +19,7 @@ package kafka.log import java.io.{File, RandomAccessFile} import java.nio._ +import java.nio.charset.StandardCharsets import java.nio.file.Paths import java.util.Properties import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -1730,7 +1731,7 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap { var lastOffset = -1L private def keyFor(key: ByteBuffer) = - new String(Utils.readBytes(key.duplicate), "UTF-8") + new String(Utils.readBytes(key.duplicate), StandardCharsets.UTF_8) override def put(key: ByteBuffer, offset: Long): Unit = { lastOffset = offset diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 7c77a979dbf8..91fce5b23c3d 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -21,6 +21,7 @@ import java.io._ import java.net._ import java.nio.ByteBuffer import java.nio.channels.{SelectionKey, SocketChannel} +import java.nio.charset.StandardCharsets import java.util import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, Executors, TimeUnit} import java.util.{HashMap, Properties, Random} @@ -920,7 +921,7 @@ class SocketServerTest { receiveResponse(socket) // now send credentials - val authBytes = "admin\u0000admin\u0000admin-secret".getBytes("UTF-8") + val authBytes = "admin\u0000admin\u0000admin-secret".getBytes(StandardCharsets.UTF_8) if (leverageKip152SaslAuthenticateRequest) { // send credentials within a SaslAuthenticateRequest val saslAuthenticateRequest = new SaslAuthenticateRequest.Builder(new SaslAuthenticateRequestData()