Skip to content

Commit

Permalink
KAFKA-13399 Workaround for SAM conversion with overloading.
Browse files Browse the repository at this point in the history
This is a reported bug that unfortunately can't be fixed easily without breakage on Scala's side. For further information check scala/scala3#13549
  • Loading branch information
jlprat committed Apr 6, 2022
1 parent a001b8a commit ddd25c6
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try a newCount which would be a decrease
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(1)).asJava, option)

var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
var e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidPartitionsException when newCount is a decrease")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
assertEquals("Topic currently has 3 partitions, which is higher than the requested 1.", e.getCause.getMessage, desc)
Expand All @@ -496,7 +498,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try a newCount which would be a noop (without assignment)
alterResult = client.createPartitions(Map(topic2 ->
NewPartitions.increaseTo(3)).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic2).get
()
},
() => s"$desc: Expect InvalidPartitionsException when requesting a noop")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc)
Expand All @@ -522,15 +527,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val unknownTopic = "an-unknown-topic"
alterResult = client.createPartitions(Map(unknownTopic ->
NewPartitions.increaseTo(2)).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(unknownTopic).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(unknownTopic).get
()
},
() => s"$desc: Expect InvalidTopicException when using an unknown topic")
assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], desc)
assertEquals("The topic 'an-unknown-topic' does not exist.", e.getCause.getMessage, desc)

// try an invalid newCount
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(-22)).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidPartitionsException when newCount is invalid")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
assertEquals("Topic currently has 3 partitions, which is higher than the requested -22.", e.getCause.getMessage,
Expand All @@ -540,7 +551,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try assignments where the number of brokers != replication factor
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidPartitionsException when #brokers != replication factor")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
Expand All @@ -551,7 +565,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try #assignments < with the increase
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(6, asList(asList(1)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals("Increasing the number of partitions by 3 but 1 assignments provided.", e.getCause.getMessage, desc)
Expand All @@ -560,7 +577,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try #assignments > with the increase
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals("Increasing the number of partitions by 1 but 2 assignments provided.", e.getCause.getMessage, desc)
Expand All @@ -569,7 +589,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try with duplicate brokers in assignments
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.",
Expand All @@ -579,7 +602,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try assignments with differently sized inner lists
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
Expand All @@ -589,7 +615,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try assignments with unknown brokers
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4, asList(asList(12)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals("Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage, desc)
Expand All @@ -598,7 +627,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// try with empty assignments
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4, Collections.emptyList())).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals("Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage, desc)
Expand All @@ -622,7 +654,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
deleteResult.topicNameValues.get(topic1).get
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4)).asJava, validateOnly)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
e = assertThrows(classOf[ExecutionException], () => {
alterResult.values.get(topic1).get
()
},
() => "Expect InvalidTopicException when the topic is queued for deletion")
assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue)
assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue)
// send invalid record
assertThrows(classOf[Throwable], () => testProducer.send(null), () => "Should not allow sending a null record")
assertThrows(classOf[Throwable], () => {
testProducer.send(null)
()
}, () => "Should not allow sending a null record")
assertEquals(1, MockProducerInterceptor.ON_ERROR_COUNT.intValue, "Interceptor should be notified about exception")
assertEquals(0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), "Interceptor should not receive metadata with an exception when record is null")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ class ReassignPartitionsCommandArgsTest {
}

def shouldFailWith(msg: String, args: Array[String]): Unit = {
val e = assertThrows(classOf[Exception], () => ReassignPartitionsCommand.validateAndParseArgs(args),
val e = assertThrows(classOf[Exception], () => {
ReassignPartitionsCommand.validateAndParseArgs(args)
()
},
() => s"Should have failed with [$msg] but no failure occurred.")
assertTrue(e.getMessage.startsWith(msg), s"Expected exception with message:\n[$msg]\nbut was\n[${e.getMessage}]")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,28 @@ class ReassignPartitionsUnitTest {
@Test
def testParseGenerateAssignmentArgs(): Unit = {
assertStartsWith("Broker list contains duplicate entries",
assertThrows(classOf[AdminCommandFailedException], () => parseGenerateAssignmentArgs(
"""{"topics": [{"topic": "foo"}], "version":1}""", "1,1,2"),
assertThrows(classOf[AdminCommandFailedException], () => {
parseGenerateAssignmentArgs(
"""{"topics": [{"topic": "foo"}], "version":1}""", "1,1,2")
()
},
() => "Expected to detect duplicate broker list entries").getMessage)
assertStartsWith("Broker list contains duplicate entries",
assertThrows(classOf[AdminCommandFailedException], () => parseGenerateAssignmentArgs(
"""{"topics": [{"topic": "foo"}], "version":1}""", "5,2,3,4,5"),
assertThrows(classOf[AdminCommandFailedException], () => {
parseGenerateAssignmentArgs(
"""{"topics": [{"topic": "foo"}], "version":1}""", "5,2,3,4,5")
()
},
() => "Expected to detect duplicate broker list entries").getMessage)
assertEquals((Seq(5,2,3,4),Seq("foo")),
parseGenerateAssignmentArgs("""{"topics": [{"topic": "foo"}], "version":1}""",
"5,2,3,4"))
assertStartsWith("List of topics to reassign contains duplicate entries",
assertThrows(classOf[AdminCommandFailedException], () => parseGenerateAssignmentArgs(
"""{"topics": [{"topic": "foo"},{"topic": "foo"}], "version":1}""", "5,2,3,4"),
assertThrows(classOf[AdminCommandFailedException], () => {
parseGenerateAssignmentArgs(
"""{"topics": [{"topic": "foo"},{"topic": "foo"}], "version":1}""", "5,2,3,4")
()
},
() => "Expected to detect duplicate topic entries").getMessage)
assertEquals((Seq(5,3,4),Seq("foo","bar")),
parseGenerateAssignmentArgs(
Expand All @@ -279,7 +288,10 @@ class ReassignPartitionsUnitTest {
addTopics(adminClient)
assertStartsWith("Replication factor: 3 larger than available brokers: 2",
assertThrows(classOf[InvalidReplicationFactorException],
() => generateAssignment(adminClient, """{"topics":[{"topic":"foo"},{"topic":"bar"}]}""", "0,1", false),
() => {
generateAssignment(adminClient, """{"topics":[{"topic":"foo"},{"topic":"bar"}]}""", "0,1", false)
()
},
() => "Expected generateAssignment to fail").getMessage)
} finally {
adminClient.close()
Expand All @@ -293,7 +305,10 @@ class ReassignPartitionsUnitTest {
addTopics(adminClient)
assertStartsWith("Topic quux not found",
assertThrows(classOf[ExecutionException],
() => generateAssignment(adminClient, """{"topics":[{"topic":"foo"},{"topic":"quux"}]}""", "0,1", false),
() => {
generateAssignment(adminClient, """{"topics":[{"topic":"foo"},{"topic":"quux"}]}""", "0,1", false)
()
},
() => "Expected generateAssignment to fail").getCause.getMessage)
} finally {
adminClient.close()
Expand All @@ -315,7 +330,10 @@ class ReassignPartitionsUnitTest {
addTopics(adminClient)
assertStartsWith("Not all brokers have rack information.",
assertThrows(classOf[AdminOperationException],
() => generateAssignment(adminClient, """{"topics":[{"topic":"foo"}]}""", "0,1,2,3", true),
() => {
generateAssignment(adminClient, """{"topics":[{"topic":"foo"}]}""", "0,1,2,3", true)
()
},
() => "Expected generateAssignment to fail").getMessage)
// It should succeed when --disable-rack-aware is used.
val (_, current) = generateAssignment(adminClient,
Expand Down Expand Up @@ -450,26 +468,38 @@ class ReassignPartitionsUnitTest {
def testParseExecuteAssignmentArgs(): Unit = {
assertStartsWith("Partition reassignment list cannot be empty",
assertThrows(classOf[AdminCommandFailedException],
() => parseExecuteAssignmentArgs("""{"version":1,"partitions":[]}"""),
() => {
parseExecuteAssignmentArgs("""{"version":1,"partitions":[]}""")
()
},
() => "Expected to detect empty partition reassignment list").getMessage)
assertStartsWith("Partition reassignment contains duplicate topic partitions",
assertThrows(classOf[AdminCommandFailedException], () => parseExecuteAssignmentArgs(
assertThrows(classOf[AdminCommandFailedException], () => {
parseExecuteAssignmentArgs(
"""{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1],"log_dirs":["any","any"]},""" +
"""{"topic":"foo","partition":0,"replicas":[2,3,4],"log_dirs":["any","any","any"]}""" +
"""]}"""), () => "Expected to detect a partition list with duplicate entries").getMessage)
"""]}""")
()
}, () => "Expected to detect a partition list with duplicate entries").getMessage)
assertStartsWith("Partition reassignment contains duplicate topic partitions",
assertThrows(classOf[AdminCommandFailedException], () => parseExecuteAssignmentArgs(
assertThrows(classOf[AdminCommandFailedException], () => {
parseExecuteAssignmentArgs(
"""{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1],"log_dirs":["/abc","/def"]},""" +
"""{"topic":"foo","partition":0,"replicas":[2,3],"log_dirs":["/abc","/def"]}""" +
"""]}"""), () => "Expected to detect a partition replica list with duplicate entries").getMessage)
"""]}""")
()
}, () => "Expected to detect a partition replica list with duplicate entries").getMessage)
assertStartsWith("Partition replica lists may not contain duplicate entries",
assertThrows(classOf[AdminCommandFailedException], () => parseExecuteAssignmentArgs(
assertThrows(classOf[AdminCommandFailedException], () => {
parseExecuteAssignmentArgs(
"""{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,0],"log_dirs":["/abc","/def"]},""" +
"""{"topic":"foo","partition":1,"replicas":[2,3],"log_dirs":["/abc","/def"]}""" +
"""]}"""), () => "Expected to detect a partition replica list with duplicate entries").getMessage)
"""]}""")
()
}, () => "Expected to detect a partition replica list with duplicate entries").getMessage)
assertEquals((Map(
new TopicPartition("foo", 0) -> Seq(1, 2, 3),
new TopicPartition("foo", 1) -> Seq(3, 4, 5),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3637,7 +3637,10 @@ class GroupCoordinatorTest {
}

private def verifyDelayedTaskNotCompleted(firstJoinFuture: Future[JoinGroupResult]) = {
assertThrows(classOf[TimeoutException], () => await(firstJoinFuture, 1),
assertThrows(classOf[TimeoutException], () => {
await(firstJoinFuture, 1)
()
},
() => "should have timed out as rebalance delay not expired")
}

Expand Down
Loading

0 comments on commit ddd25c6

Please sign in to comment.