From adb3a950ee2d5843673cdf14d5471ded5c0419b2 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 10 Oct 2018 13:31:06 -0700 Subject: [PATCH] MINOR: Fix remaining core, connect and clients tests to pass with Java 11 (#5771) - SslFactoryTest should use SslFactory to create SSLEngine - Use Mockito instead of EasyMock in `ConsoleConsumerTest` as one of the tests mocks a standard library class and the latest released EasyMock version can't do that when Java 11 is used. - Avoid mocking `ConcurrentMap` in `SourceTaskOffsetCommitterTest` for similar reasons. As it happens, mocking is not actually needed here. Reviewers: Rajini Sivaram --- build.gradle | 1 + .../common/security/ssl/SslFactoryTest.java | 2 +- .../SourceTaskOffsetCommitterTest.java | 32 +++++----- .../kafka/tools/ConsoleConsumerTest.scala | 60 +++++++------------ 4 files changed, 41 insertions(+), 54 deletions(-) diff --git a/build.gradle b/build.gradle index ce1314ccce3f..95f3eb3eb007 100644 --- a/build.gradle +++ b/build.gradle @@ -581,6 +581,7 @@ project(':core') { testCompile project(':clients').sourceSets.test.output testCompile libs.bcpkix + testCompile libs.mockitoCore testCompile libs.easymock testCompile(libs.apacheda) { exclude group: 'xml-apis', module: 'xml-apis' diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index 97021e3908ac..bfe34c98382b 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -94,7 +94,7 @@ public void testReconfiguration() throws Exception { SSLContext sslContext = sslFactory.sslContext(); assertNotNull("SSL context not created", sslContext); assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext()); - assertFalse(sslContext.createSSLEngine("localhost", 0).getUseClientMode()); + assertFalse(sslFactory.createSslEngine("localhost", 0).getUseClientMode()); // Verify that context is not recreated on reconfigure() if config and file are not changed sslFactory.reconfigure(sslConfig); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java index 2d5da983d294..c7cb08bfb522 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java @@ -26,11 +26,9 @@ import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -40,20 +38,20 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import static java.util.Collections.singletonMap; import static org.easymock.EasyMock.eq; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) -@PrepareForTest({SourceTaskOffsetCommitter.class, LoggerFactory.class}) public class SourceTaskOffsetCommitterTest extends ThreadedTest { - @Mock - private ScheduledExecutorService executor; - @Mock - private ConcurrentHashMap committers; - @Mock - private Logger mockLog; + + private final ConcurrentHashMap committers = new ConcurrentHashMap<>(); + + @Mock private ScheduledExecutorService executor; + @Mock private Logger mockLog; @Mock private ScheduledFuture commitFuture; @Mock private ScheduledFuture taskFuture; @Mock private ConnectorTaskId taskId; @@ -82,7 +80,7 @@ public void setup() { } @Test - public void testSchedule() throws Exception { + public void testSchedule() { Capture taskWrapper = EasyMock.newCapture(); EasyMock.expect(executor.scheduleWithFixedDelay( @@ -90,13 +88,12 @@ public void testSchedule() throws Exception { eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS)) ).andReturn(commitFuture); - EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null); - PowerMock.replayAll(); committer.schedule(taskId, task); assertTrue(taskWrapper.hasCaptured()); assertNotNull(taskWrapper.getValue()); + assertEquals(singletonMap(taskId, commitFuture), committers); PowerMock.verifyAll(); } @@ -136,28 +133,29 @@ public void testClose() throws Exception { @Test public void testRemove() throws Exception { // Try to remove a non-existing task - EasyMock.expect(committers.remove(taskId)).andReturn(null); PowerMock.replayAll(); + assertTrue(committers.isEmpty()); committer.remove(taskId); + assertTrue(committers.isEmpty()); PowerMock.verifyAll(); PowerMock.resetAll(); // Try to remove an existing task - EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture); EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false); EasyMock.expect(taskFuture.isDone()).andReturn(false); EasyMock.expect(taskFuture.get()).andReturn(null); PowerMock.replayAll(); + committers.put(taskId, taskFuture); committer.remove(taskId); + assertTrue(committers.isEmpty()); PowerMock.verifyAll(); PowerMock.resetAll(); // Try to remove a cancelled task - EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture); EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false); EasyMock.expect(taskFuture.isDone()).andReturn(false); EasyMock.expect(taskFuture.get()).andThrow(new CancellationException()); @@ -165,19 +163,21 @@ public void testRemove() throws Exception { PowerMock.expectLastCall(); PowerMock.replayAll(); + committers.put(taskId, taskFuture); committer.remove(taskId); + assertTrue(committers.isEmpty()); PowerMock.verifyAll(); PowerMock.resetAll(); // Try to remove an interrupted task - EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture); EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false); EasyMock.expect(taskFuture.isDone()).andReturn(false); EasyMock.expect(taskFuture.get()).andThrow(new InterruptedException()); PowerMock.replayAll(); try { + committers.put(taskId, taskFuture); committer.remove(taskId); fail("Expected ConnectException to be raised"); } catch (ConnectException e) { diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 4f09dd2a0327..47b7fae3d9b8 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -27,7 +27,9 @@ import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetRe import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.test.MockDeserializer -import org.easymock.EasyMock +import org.mockito.Mockito._ +import org.mockito.ArgumentMatchers +import ArgumentMatchers._ import org.junit.Assert._ import org.junit.{Before, Test} @@ -61,69 +63,53 @@ class ConsoleConsumerTest { mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes)) } - // Mocks - val formatter = EasyMock.createNiceMock(classOf[MessageFormatter]) + val formatter = mock(classOf[MessageFormatter]) - // Expectations - EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(maxMessages) - EasyMock.replay(formatter) - - // Test ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false) assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2)) consumer.resetUnconsumedOffsets() assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2)) - EasyMock.verify(formatter) + verify(formatter, times(maxMessages)).writeTo(any(), any()) } @Test def shouldLimitReadsToMaxMessageLimit() { - //Mocks - val consumer = EasyMock.createNiceMock(classOf[ConsumerWrapper]) - val formatter = EasyMock.createNiceMock(classOf[MessageFormatter]) - - //Stubs + val consumer = mock(classOf[ConsumerWrapper]) + val formatter = mock(classOf[MessageFormatter]) val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]()) - //Expectations val messageLimit: Int = 10 - EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit) - EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit) + when(consumer.receive()).thenReturn(record) - EasyMock.replay(consumer) - EasyMock.replay(formatter) - - //Test ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true) + + verify(consumer, times(messageLimit)).receive() + verify(formatter, times(messageLimit)).writeTo(any(), any()) + + consumer.cleanup() } @Test def shouldStopWhenOutputCheckErrorFails() { - //Mocks - val consumer = EasyMock.createNiceMock(classOf[ConsumerWrapper]) - val formatter = EasyMock.createNiceMock(classOf[MessageFormatter]) - val printStream = EasyMock.createNiceMock(classOf[PrintStream]) + val consumer = mock(classOf[ConsumerWrapper]) + val formatter = mock(classOf[MessageFormatter]) + val printStream = mock(classOf[PrintStream]) - //Stubs val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]()) - //Expectations - EasyMock.expect(consumer.receive()).andReturn(record) - EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.eq(printStream))) + when(consumer.receive()).thenReturn(record) //Simulate an error on System.out after the first record has been printed - EasyMock.expect(printStream.checkError()).andReturn(true) - - EasyMock.replay(consumer) - EasyMock.replay(formatter) - EasyMock.replay(printStream) + when(printStream.checkError()).thenReturn(true) - //Test ConsoleConsumer.process(-1, formatter, consumer, printStream, true) - //Verify - EasyMock.verify(consumer, formatter, printStream) + verify(formatter).writeTo(any(), ArgumentMatchers.eq(printStream)) + verify(consumer).receive() + verify(printStream).checkError() + + consumer.cleanup() } @Test