Skip to content

Commit

Permalink
MINOR: Fix remaining core, connect and clients tests to pass with Jav…
Browse files Browse the repository at this point in the history
…a 11 (#5771)

- SslFactoryTest should use SslFactory to create SSLEngine
- Use Mockito instead of EasyMock in `ConsoleConsumerTest` as one of
the tests mocks a standard library class and the latest released EasyMock
version can't do that when Java 11 is used.
- Avoid mocking `ConcurrentMap` in `SourceTaskOffsetCommitterTest`
for similar reasons. As it happens, mocking is not actually needed here.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
  • Loading branch information
ijuma committed Oct 10, 2018
1 parent 34f029e commit adb3a95
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 54 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ project(':core') {

testCompile project(':clients').sourceSets.test.output
testCompile libs.bcpkix
testCompile libs.mockitoCore
testCompile libs.easymock
testCompile(libs.apacheda) {
exclude group: 'xml-apis', module: 'xml-apis'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testReconfiguration() throws Exception {
SSLContext sslContext = sslFactory.sslContext();
assertNotNull("SSL context not created", sslContext);
assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
assertFalse(sslContext.createSSLEngine("localhost", 0).getUseClientMode());
assertFalse(sslFactory.createSslEngine("localhost", 0).getUseClientMode());

// Verify that context is not recreated on reconfigure() if config and file are not changed
sslFactory.reconfigure(sslConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -40,20 +38,20 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.singletonMap;
import static org.easymock.EasyMock.eq;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(PowerMockRunner.class)
@PrepareForTest({SourceTaskOffsetCommitter.class, LoggerFactory.class})
public class SourceTaskOffsetCommitterTest extends ThreadedTest {
@Mock
private ScheduledExecutorService executor;
@Mock
private ConcurrentHashMap committers;
@Mock
private Logger mockLog;

private final ConcurrentHashMap committers = new ConcurrentHashMap<>();

@Mock private ScheduledExecutorService executor;
@Mock private Logger mockLog;
@Mock private ScheduledFuture commitFuture;
@Mock private ScheduledFuture taskFuture;
@Mock private ConnectorTaskId taskId;
Expand Down Expand Up @@ -82,21 +80,20 @@ public void setup() {
}

@Test
public void testSchedule() throws Exception {
public void testSchedule() {
Capture<Runnable> taskWrapper = EasyMock.newCapture();

EasyMock.expect(executor.scheduleWithFixedDelay(
EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
).andReturn(commitFuture);

EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null);

PowerMock.replayAll();

committer.schedule(taskId, task);
assertTrue(taskWrapper.hasCaptured());
assertNotNull(taskWrapper.getValue());
assertEquals(singletonMap(taskId, commitFuture), committers);

PowerMock.verifyAll();
}
Expand Down Expand Up @@ -136,48 +133,51 @@ public void testClose() throws Exception {
@Test
public void testRemove() throws Exception {
// Try to remove a non-existing task
EasyMock.expect(committers.remove(taskId)).andReturn(null);
PowerMock.replayAll();

assertTrue(committers.isEmpty());
committer.remove(taskId);
assertTrue(committers.isEmpty());

PowerMock.verifyAll();
PowerMock.resetAll();

// Try to remove an existing task
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andReturn(null);
PowerMock.replayAll();

committers.put(taskId, taskFuture);
committer.remove(taskId);
assertTrue(committers.isEmpty());

PowerMock.verifyAll();
PowerMock.resetAll();

// Try to remove a cancelled task
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andThrow(new CancellationException());
mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());
PowerMock.expectLastCall();
PowerMock.replayAll();

committers.put(taskId, taskFuture);
committer.remove(taskId);
assertTrue(committers.isEmpty());

PowerMock.verifyAll();
PowerMock.resetAll();

// Try to remove an interrupted task
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andThrow(new InterruptedException());
PowerMock.replayAll();

try {
committers.put(taskId, taskFuture);
committer.remove(taskId);
fail("Expected ConnectException to be raised");
} catch (ConnectException e) {
Expand Down
60 changes: 23 additions & 37 deletions core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetRe
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.test.MockDeserializer
import org.easymock.EasyMock
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers
import ArgumentMatchers._
import org.junit.Assert._
import org.junit.{Before, Test}

Expand Down Expand Up @@ -61,69 +63,53 @@ class ConsoleConsumerTest {
mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes))
}

// Mocks
val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
val formatter = mock(classOf[MessageFormatter])

// Expectations
EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(maxMessages)
EasyMock.replay(formatter)

// Test
ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false)
assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))

consumer.resetUnconsumedOffsets()
assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))

EasyMock.verify(formatter)
verify(formatter, times(maxMessages)).writeTo(any(), any())
}

@Test
def shouldLimitReadsToMaxMessageLimit() {
//Mocks
val consumer = EasyMock.createNiceMock(classOf[ConsumerWrapper])
val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])

//Stubs
val consumer = mock(classOf[ConsumerWrapper])
val formatter = mock(classOf[MessageFormatter])
val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())

//Expectations
val messageLimit: Int = 10
EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)
when(consumer.receive()).thenReturn(record)

EasyMock.replay(consumer)
EasyMock.replay(formatter)

//Test
ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true)

verify(consumer, times(messageLimit)).receive()
verify(formatter, times(messageLimit)).writeTo(any(), any())

consumer.cleanup()
}

@Test
def shouldStopWhenOutputCheckErrorFails() {
//Mocks
val consumer = EasyMock.createNiceMock(classOf[ConsumerWrapper])
val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
val printStream = EasyMock.createNiceMock(classOf[PrintStream])
val consumer = mock(classOf[ConsumerWrapper])
val formatter = mock(classOf[MessageFormatter])
val printStream = mock(classOf[PrintStream])

//Stubs
val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())

//Expectations
EasyMock.expect(consumer.receive()).andReturn(record)
EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.eq(printStream)))
when(consumer.receive()).thenReturn(record)
//Simulate an error on System.out after the first record has been printed
EasyMock.expect(printStream.checkError()).andReturn(true)

EasyMock.replay(consumer)
EasyMock.replay(formatter)
EasyMock.replay(printStream)
when(printStream.checkError()).thenReturn(true)

//Test
ConsoleConsumer.process(-1, formatter, consumer, printStream, true)

//Verify
EasyMock.verify(consumer, formatter, printStream)
verify(formatter).writeTo(any(), ArgumentMatchers.eq(printStream))
verify(consumer).receive()
verify(printStream).checkError()

consumer.cleanup()
}

@Test
Expand Down

0 comments on commit adb3a95

Please sign in to comment.