From 10937f305c44b92229d177f7042918ebc12e34c1 Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Wed, 13 Dec 2017 16:21:23 +0530 Subject: [PATCH 1/7] Initial Commit --- core/src/main/scala/kafka/consumer/TopicCount.scala | 2 +- core/src/main/scala/kafka/utils/Json.scala | 13 +++++++++++-- core/src/test/scala/unit/kafka/utils/JsonTest.scala | 6 +++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index 2cabcaea0b74..2ddc8986cea1 100755 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -64,7 +64,7 @@ private[kafka] object TopicCount extends Logging { var subscriptionPattern: String = null var topMap: Map[String, Int] = null try { - Json.parseFull(topicCountString) match { + Json.parseFullIncludingACLs(topicCountString) match { case Some(js) => val consumerRegistrationMap = js.asJsonObject consumerRegistrationMap.get("pattern") match { diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index c61e1490448b..46cf1938bb18 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -31,9 +31,18 @@ object Json { private val mapper = new ObjectMapper() /** - * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. - */ + * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. + */ def parseFull(input: String): Option[JsonValue] = + try Option(mapper.readTree(input)).map(JsonValue(_)) + catch { case _: JsonProcessingException => None } + + + /** + * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. This method is currently used + * to read the already stored invalid ACLs JSON which was persisted using older versions of Kafka (prior to Kafka 1.1.0). KAFKA-6319 + */ + def parseFullIncludingACLs(input: String): Option[JsonValue] = try Option(mapper.readTree(input)).map(JsonValue(_)) catch { case _: JsonProcessingException => diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala index 209fdee2edc3..a366c910ec4e 100644 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -57,12 +57,12 @@ class JsonTest { // Test with encoder that properly escapes backslash and quotes val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") val encoded = Json.legacyEncodeAsString(map) - val decoded = Json.parseFull(encoded) - assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) + val decoded = Json.parseFullIncludingACLs(encoded) + assertEquals(Json.parseFullIncludingACLs("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) // Test strings with non-escaped backslash and quotes. This is to verify that ACLs // containing non-escaped chars persisted using 1.0 can be parsed. - assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\,bar2", "foo2":"\bar"}""")) + assertEquals(decoded, Json.parseFullIncludingACLs("""{"foo1":"bar1\,bar2", "foo2":"\bar"}""")) } @Test From 340bb7b46f10ab92f5d40eda0117d3597d9f370f Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Tue, 9 Jan 2018 14:51:02 +0530 Subject: [PATCH 2/7] Fixed the applicability of new method based on trunk --- core/src/main/scala/kafka/consumer/TopicCount.scala | 2 +- core/src/main/scala/kafka/security/auth/Acl.scala | 2 +- core/src/main/scala/kafka/utils/Json.scala | 6 ++++-- core/src/test/scala/unit/kafka/utils/JsonTest.scala | 6 +++--- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index 2ddc8986cea1..2cabcaea0b74 100755 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -64,7 +64,7 @@ private[kafka] object TopicCount extends Logging { var subscriptionPattern: String = null var topMap: Map[String, Int] = null try { - Json.parseFullIncludingACLs(topicCountString) match { + Json.parseFull(topicCountString) match { case Some(js) => val consumerRegistrationMap = js.asJsonObject consumerRegistrationMap.get("pattern") match { diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala index 67f3d9592f2f..040c7c7cc178 100644 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -58,7 +58,7 @@ object Acl { if (bytes == null || bytes.isEmpty) return collection.immutable.Set.empty[Acl] - Json.parseBytes(bytes).map(_.asJsonObject).map { js => + Json.parseBytesIncludingACLs(bytes).map(_.asJsonObject).map { js => //the acl json version. require(js(VersionKey).to[Int] == CurrentVersion) js(AclsKey).asJsonArray.iterator.map(_.asJsonObject).map { itemJs => diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index 46cf1938bb18..e12d69a7c9a2 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -16,6 +16,8 @@ */ package kafka.utils +import java.nio.charset.StandardCharsets + import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.databind.ObjectMapper import kafka.utils.json.JsonValue @@ -42,7 +44,7 @@ object Json { * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. This method is currently used * to read the already stored invalid ACLs JSON which was persisted using older versions of Kafka (prior to Kafka 1.1.0). KAFKA-6319 */ - def parseFullIncludingACLs(input: String): Option[JsonValue] = + def parseBytesIncludingACLs(input: Array[Byte]): Option[JsonValue] = try Option(mapper.readTree(input)).map(JsonValue(_)) catch { case _: JsonProcessingException => @@ -50,7 +52,7 @@ object Json { // stored in ACLs may contain backslash as an escape char, making the JSON generated in earlier versions invalid. // Escape backslash and retry to handle these strings which may have been persisted in ZK. // Note that this does not handle all special characters (e.g. non-escaped double quotes are not supported) - val escapedInput = input.replaceAll("\\\\", "\\\\\\\\") + val escapedInput = new String(input, StandardCharsets.UTF_8).replaceAll("\\\\", "\\\\\\\\") try Option(mapper.readTree(escapedInput)).map(JsonValue(_)) catch { case _: JsonProcessingException => None } } diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala index a366c910ec4e..209fdee2edc3 100644 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -57,12 +57,12 @@ class JsonTest { // Test with encoder that properly escapes backslash and quotes val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") val encoded = Json.legacyEncodeAsString(map) - val decoded = Json.parseFullIncludingACLs(encoded) - assertEquals(Json.parseFullIncludingACLs("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) + val decoded = Json.parseFull(encoded) + assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) // Test strings with non-escaped backslash and quotes. This is to verify that ACLs // containing non-escaped chars persisted using 1.0 can be parsed. - assertEquals(decoded, Json.parseFullIncludingACLs("""{"foo1":"bar1\,bar2", "foo2":"\bar"}""")) + assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\,bar2", "foo2":"\bar"}""")) } @Test From 60dd808af99ef2beaf698b5b252182d6d3584f69 Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Tue, 9 Jan 2018 20:00:46 +0530 Subject: [PATCH 3/7] Merge branch 'trunk' --- core/src/test/scala/unit/kafka/utils/JsonTest.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala index 209fdee2edc3..a1f4b0c96079 100644 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -54,6 +54,10 @@ class JsonTest { Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add) assertEquals(Json.parseFull("[1, 2, 3]"), Some(JsonValue(arrayNode))) + } + + @Test + def testJsonParseWithBackslashEscaping() = { // Test with encoder that properly escapes backslash and quotes val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") val encoded = Json.legacyEncodeAsString(map) @@ -62,7 +66,7 @@ class JsonTest { // Test strings with non-escaped backslash and quotes. This is to verify that ACLs // containing non-escaped chars persisted using 1.0 can be parsed. - assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\,bar2", "foo2":"\bar"}""")) + assertEquals(decoded, Json.parseBytesIncludingACLs("""{"foo1":"bar1\,bar2", "foo2":"\bar"}""".getBytes())) } @Test From 73d11986fd962e4d5fd76542098609f6cbfea9d7 Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Mon, 15 Jan 2018 17:13:34 +0530 Subject: [PATCH 4/7] Rebased and applied review comments --- .../main/scala/kafka/security/auth/Acl.scala | 27 +++++++++++++++++-- core/src/main/scala/kafka/utils/Json.scala | 18 ------------- .../unit/kafka/security/auth/AclTest.scala | 13 +++++++++ .../scala/unit/kafka/utils/JsonTest.scala | 13 --------- 4 files changed, 38 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala index 040c7c7cc178..2bc5846711f9 100644 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -17,9 +17,15 @@ package kafka.security.auth +import java.nio.charset.StandardCharsets + +import com.fasterxml.jackson.core.JsonProcessingException +import com.fasterxml.jackson.databind.ObjectMapper import kafka.utils.Json +import kafka.utils.json.JsonValue import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.SecurityUtils + import scala.collection.JavaConverters._ object Acl { @@ -33,6 +39,7 @@ object Acl { val VersionKey = "version" val CurrentVersion = 1 val AclsKey = "acls" + private val mapper = new ObjectMapper() /** * @@ -58,7 +65,7 @@ object Acl { if (bytes == null || bytes.isEmpty) return collection.immutable.Set.empty[Acl] - Json.parseBytesIncludingACLs(bytes).map(_.asJsonObject).map { js => + tryParseBytesIncludingACLs(bytes).right.map(_.asJsonObject).right.map { js => //the acl json version. require(js(VersionKey).to[Int] == CurrentVersion) js(AclsKey).asJsonArray.iterator.map(_.asJsonObject).map { itemJs => @@ -68,12 +75,28 @@ object Acl { val operation = Operation.fromString(itemJs(OperationKey).to[String]) new Acl(principal, permissionType, host, operation) }.toSet - }.getOrElse(Set.empty) + }.right.getOrElse(Set.empty) } def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = { Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap.asJava).toList.asJava) } + + /** + * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. This method is currently used + * to read the already stored invalid ACLs JSON which was persisted using older versions of Kafka (prior to Kafka 1.1.0). KAFKA-6319 + */ + private def tryParseBytesIncludingACLs(input: Array[Byte]): Either[JsonProcessingException, JsonValue] = + try Right(mapper.readTree(input)).right.map(JsonValue(_)) + catch { + case _: JsonProcessingException => + // Before 1.0.1, Json#encode did not escape backslash or any other special characters. SSL principals + // stored in ACLs may contain backslash as an escape char, making the JSON generated in earlier versions invalid. + // Escape backslash and retry to handle these strings which may have been persisted in ZK. + // Note that this does not handle all special characters (e.g. non-escaped double quotes are not supported) + val escapedInput = new String(input, StandardCharsets.UTF_8).replaceAll("\\\\", "\\\\\\\\") + Json.tryParseBytes(escapedInput.getBytes(StandardCharsets.UTF_8)) + } } /** diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index e12d69a7c9a2..f3d28af41ddf 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -39,24 +39,6 @@ object Json { try Option(mapper.readTree(input)).map(JsonValue(_)) catch { case _: JsonProcessingException => None } - - /** - * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. This method is currently used - * to read the already stored invalid ACLs JSON which was persisted using older versions of Kafka (prior to Kafka 1.1.0). KAFKA-6319 - */ - def parseBytesIncludingACLs(input: Array[Byte]): Option[JsonValue] = - try Option(mapper.readTree(input)).map(JsonValue(_)) - catch { - case _: JsonProcessingException => - // Before 1.0.1, Json#encode did not escape backslash or any other special characters. SSL principals - // stored in ACLs may contain backslash as an escape char, making the JSON generated in earlier versions invalid. - // Escape backslash and retry to handle these strings which may have been persisted in ZK. - // Note that this does not handle all special characters (e.g. non-escaped double quotes are not supported) - val escapedInput = new String(input, StandardCharsets.UTF_8).replaceAll("\\\\", "\\\\\\\\") - try Option(mapper.readTree(escapedInput)).map(JsonValue(_)) - catch { case _: JsonProcessingException => None } - } - /** * Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of * exception. diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala index beeac3766538..6f780276e07a 100644 --- a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala @@ -18,11 +18,15 @@ package kafka.security.auth import java.nio.charset.StandardCharsets.UTF_8 +import com.fasterxml.jackson.core.JsonProcessingException import kafka.utils.Json import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.junit.Assert.assertEquals import org.junit.{Assert, Test} import org.scalatest.junit.JUnitSuite + import scala.collection.JavaConverters._ +import scala.collection.Map class AclTest extends JUnitSuite { @@ -43,4 +47,13 @@ class AclTest extends JUnitSuite { Assert.assertEquals(acls, Acl.fromBytes(AclJson.getBytes(UTF_8))) } + @Test + def testJsonParseWithBackslashEscaping() = { + // Test with encoder that properly escapes backslash and quotes + val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") + val encoded = Json.legacyEncodeAsString(map) + val decoded = Json.parseFull(encoded) + assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) + } + } diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala index a1f4b0c96079..c8936f4dc926 100644 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -56,19 +56,6 @@ class JsonTest { } - @Test - def testJsonParseWithBackslashEscaping() = { - // Test with encoder that properly escapes backslash and quotes - val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") - val encoded = Json.legacyEncodeAsString(map) - val decoded = Json.parseFull(encoded) - assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) - - // Test strings with non-escaped backslash and quotes. This is to verify that ACLs - // containing non-escaped chars persisted using 1.0 can be parsed. - assertEquals(decoded, Json.parseBytesIncludingACLs("""{"foo1":"bar1\,bar2", "foo2":"\bar"}""".getBytes())) - } - @Test def testLegacyEncodeAsString() { assertEquals("null", Json.legacyEncodeAsString(null)) From d97b30880d58d1524d4b483455ca53187d3c8d97 Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Mon, 22 Jan 2018 09:17:23 +0530 Subject: [PATCH 5/7] Rebased and applied review comments --- .../main/scala/kafka/security/auth/Acl.scala | 26 +++++++++---------- .../unit/kafka/security/auth/AclTest.scala | 21 +++++---------- .../scala/unit/kafka/utils/JsonTest.scala | 5 ++++ 3 files changed, 23 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala index 2bc5846711f9..97c8c27665b2 100644 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -19,8 +19,6 @@ package kafka.security.auth import java.nio.charset.StandardCharsets -import com.fasterxml.jackson.core.JsonProcessingException -import com.fasterxml.jackson.databind.ObjectMapper import kafka.utils.Json import kafka.utils.json.JsonValue import org.apache.kafka.common.security.auth.KafkaPrincipal @@ -39,7 +37,6 @@ object Acl { val VersionKey = "version" val CurrentVersion = 1 val AclsKey = "acls" - private val mapper = new ObjectMapper() /** * @@ -65,7 +62,7 @@ object Acl { if (bytes == null || bytes.isEmpty) return collection.immutable.Set.empty[Acl] - tryParseBytesIncludingACLs(bytes).right.map(_.asJsonObject).right.map { js => + parseBytesWithAclFallback(bytes).map(_.asJsonObject).map { js => //the acl json version. require(js(VersionKey).to[Int] == CurrentVersion) js(AclsKey).asJsonArray.iterator.map(_.asJsonObject).map { itemJs => @@ -75,7 +72,7 @@ object Acl { val operation = Operation.fromString(itemJs(OperationKey).to[String]) new Acl(principal, permissionType, host, operation) }.toSet - }.right.getOrElse(Set.empty) + }.getOrElse(Set.empty) } def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = { @@ -86,17 +83,18 @@ object Acl { * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. This method is currently used * to read the already stored invalid ACLs JSON which was persisted using older versions of Kafka (prior to Kafka 1.1.0). KAFKA-6319 */ - private def tryParseBytesIncludingACLs(input: Array[Byte]): Either[JsonProcessingException, JsonValue] = - try Right(mapper.readTree(input)).right.map(JsonValue(_)) - catch { - case _: JsonProcessingException => - // Before 1.0.1, Json#encode did not escape backslash or any other special characters. SSL principals - // stored in ACLs may contain backslash as an escape char, making the JSON generated in earlier versions invalid. - // Escape backslash and retry to handle these strings which may have been persisted in ZK. - // Note that this does not handle all special characters (e.g. non-escaped double quotes are not supported) + private def parseBytesWithAclFallback(input: Array[Byte]): Option[JsonValue] = { + // Before 1.0.1, Json#encode did not escape backslash or any other special characters. SSL principals + // stored in ACLs may contain backslash as an escape char, making the JSON generated in earlier versions invalid. + // Escape backslash and retry to handle these strings which may have been persisted in ZK. + // Note that this does not handle all special characters (e.g. non-escaped double quotes are not supported) + Json.tryParseBytes(input) match { + case Left(e) => val escapedInput = new String(input, StandardCharsets.UTF_8).replaceAll("\\\\", "\\\\\\\\") - Json.tryParseBytes(escapedInput.getBytes(StandardCharsets.UTF_8)) + Json.parseFull(escapedInput) + case Right(v) => Some(v) } + } } /** diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala index 6f780276e07a..f5985a8203b3 100644 --- a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala @@ -18,42 +18,33 @@ package kafka.security.auth import java.nio.charset.StandardCharsets.UTF_8 -import com.fasterxml.jackson.core.JsonProcessingException import kafka.utils.Json import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.junit.Assert.assertEquals import org.junit.{Assert, Test} import org.scalatest.junit.JUnitSuite import scala.collection.JavaConverters._ -import scala.collection.Map class AclTest extends JUnitSuite { val AclJson = "{\"version\": 1, \"acls\": [{\"host\": \"host1\",\"permissionType\": \"Deny\",\"operation\": \"READ\", \"principal\": \"User:alice\" }, " + "{ \"host\": \"*\" , \"permissionType\": \"Allow\", \"operation\": \"Read\", \"principal\": \"User:bob\" }, " + - "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob\"} ]}" + "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob\"}, " + + "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:\\\\bob\"}, " + + "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob1\\\\bob2\"}]}" @Test def testAclJsonConversion(): Unit = { val acl1 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"), Deny, "host1" , Read) val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Allow, "*", Read) val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read) + val acl4 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "\\bob"), Deny, "host1", Read) + val acl5 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob1\\bob2"), Deny, "host1", Read) - val acls = Set[Acl](acl1, acl2, acl3) + val acls = Set[Acl](acl1, acl2, acl3, acl4, acl5) val jsonAcls = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava) Assert.assertEquals(acls, Acl.fromBytes(jsonAcls)) Assert.assertEquals(acls, Acl.fromBytes(AclJson.getBytes(UTF_8))) } - - @Test - def testJsonParseWithBackslashEscaping() = { - // Test with encoder that properly escapes backslash and quotes - val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") - val encoded = Json.legacyEncodeAsString(map) - val decoded = Json.parseFull(encoded) - assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) - } - } diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala index c8936f4dc926..c896d611bf2a 100644 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -54,6 +54,11 @@ class JsonTest { Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add) assertEquals(Json.parseFull("[1, 2, 3]"), Some(JsonValue(arrayNode))) + // Test with encoder that properly escapes backslash and quotes + val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") + val encoded = Json.legacyEncodeAsString(map) + val decoded = Json.parseFull(encoded) + assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) } @Test From 281f50807ab8e31f20e9c6c85b8775064c9b11e9 Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Wed, 24 Jan 2018 10:18:17 +0530 Subject: [PATCH 6/7] Changed the AclJson to have the incorrect format for principals --- core/src/test/scala/unit/kafka/security/auth/AclTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala index f5985a8203b3..2b5169cbd52f 100644 --- a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala @@ -30,8 +30,8 @@ class AclTest extends JUnitSuite { val AclJson = "{\"version\": 1, \"acls\": [{\"host\": \"host1\",\"permissionType\": \"Deny\",\"operation\": \"READ\", \"principal\": \"User:alice\" }, " + "{ \"host\": \"*\" , \"permissionType\": \"Allow\", \"operation\": \"Read\", \"principal\": \"User:bob\" }, " + "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob\"}, " + - "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:\\\\bob\"}, " + - "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob1\\\\bob2\"}]}" + "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:\\bob\"}, " + + "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob1\\bob2\"}]}" @Test def testAclJsonConversion(): Unit = { From f166c0b008fab1e5ec6854290bc043b19c6ca05b Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Tue, 5 Jun 2018 14:25:43 +0530 Subject: [PATCH 7/7] Applied the review comments --- core/src/main/scala/kafka/utils/Json.scala | 6 ++---- .../test/scala/unit/kafka/security/auth/AclTest.scala | 10 +++++----- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index f3d28af41ddf..9faf40eb4e96 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -16,8 +16,6 @@ */ package kafka.utils -import java.nio.charset.StandardCharsets - import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.databind.ObjectMapper import kafka.utils.json.JsonValue @@ -33,8 +31,8 @@ object Json { private val mapper = new ObjectMapper() /** - * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. - */ + * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. + */ def parseFull(input: String): Option[JsonValue] = try Option(mapper.readTree(input)).map(JsonValue(_)) catch { case _: JsonProcessingException => None } diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala index 2b5169cbd52f..28d0b81f280f 100644 --- a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala @@ -27,11 +27,11 @@ import scala.collection.JavaConverters._ class AclTest extends JUnitSuite { - val AclJson = "{\"version\": 1, \"acls\": [{\"host\": \"host1\",\"permissionType\": \"Deny\",\"operation\": \"READ\", \"principal\": \"User:alice\" }, " + - "{ \"host\": \"*\" , \"permissionType\": \"Allow\", \"operation\": \"Read\", \"principal\": \"User:bob\" }, " + - "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob\"}, " + - "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:\\bob\"}, " + - "{ \"host\": \"host1\", \"permissionType\": \"Deny\", \"operation\": \"Read\" , \"principal\": \"User:bob1\\bob2\"}]}" + val AclJson = """{"version": 1, "acls": [{"host": "host1","permissionType": "Deny","operation": "READ", "principal": "User:alice" }, + { "host": "*" , "permissionType": "Allow", "operation": "Read", "principal": "User:bob" }, + { "host": "host1", "permissionType": "Deny", "operation": "Read" , "principal": "User:bob"}, + { "host": "host1", "permissionType": "Deny", "operation": "Read" , "principal": "User:\\bob"}, + { "host": "host1", "permissionType": "Deny", "operation": "Read" , "principal": "User:bob1\\bob2"}]}""" @Test def testAclJsonConversion(): Unit = {