Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Fix remaining core, connect and clients tests to pass with Java 11 #5771

Merged
merged 4 commits into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ project(':core') {

testCompile project(':clients').sourceSets.test.output
testCompile libs.bcpkix
testCompile libs.mockitoCore
testCompile libs.easymock
testCompile(libs.apacheda) {
exclude group: 'xml-apis', module: 'xml-apis'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testReconfiguration() throws Exception {
SSLContext sslContext = sslFactory.sslContext();
assertNotNull("SSL context not created", sslContext);
assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
assertFalse(sslContext.createSSLEngine("localhost", 0).getUseClientMode());
assertFalse(sslFactory.createSslEngine("localhost", 0).getUseClientMode());

// Verify that context is not recreated on reconfigure() if config and file are not changed
sslFactory.reconfigure(sslConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
Expand All @@ -40,20 +41,21 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.easymock.EasyMock.eq;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(PowerMockRunner.class)
@PrepareForTest({SourceTaskOffsetCommitter.class, LoggerFactory.class})
@PrepareForTest({SourceTaskOffsetCommitter.class})
public class SourceTaskOffsetCommitterTest extends ThreadedTest {
@Mock
private ScheduledExecutorService executor;
@Mock
private ConcurrentHashMap committers;
@Mock
private Logger mockLog;

private final ConcurrentHashMap committers = new ConcurrentHashMap<>();

@Mock private ScheduledExecutorService executor;
@Mock private Logger mockLog;
@Mock private ScheduledFuture commitFuture;
@Mock private ScheduledFuture taskFuture;
@Mock private ConnectorTaskId taskId;
Expand Down Expand Up @@ -82,21 +84,20 @@ public void setup() {
}

@Test
public void testSchedule() throws Exception {
public void testSchedule() {
Capture<Runnable> taskWrapper = EasyMock.newCapture();

EasyMock.expect(executor.scheduleWithFixedDelay(
EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
).andReturn(commitFuture);

EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null);

PowerMock.replayAll();

committer.schedule(taskId, task);
assertTrue(taskWrapper.hasCaptured());
assertNotNull(taskWrapper.getValue());
assertEquals(Utils.mkMap(mkEntry(taskId, commitFuture)), committers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mkMap and mkEntry are both statics in Utils, why do we have static import for one and not the other? Anyway, perhaps this could just be Collections.singletonMap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, changed to use singletonMap. Also removed some imports and an unnecessary @PrepareForTest.


PowerMock.verifyAll();
}
Expand Down Expand Up @@ -136,48 +137,51 @@ public void testClose() throws Exception {
@Test
public void testRemove() throws Exception {
// Try to remove a non-existing task
EasyMock.expect(committers.remove(taskId)).andReturn(null);
PowerMock.replayAll();

assertTrue(committers.isEmpty());
committer.remove(taskId);
assertTrue(committers.isEmpty());

PowerMock.verifyAll();
PowerMock.resetAll();

// Try to remove an existing task
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andReturn(null);
PowerMock.replayAll();

committers.put(taskId, taskFuture);
committer.remove(taskId);
assertTrue(committers.isEmpty());

PowerMock.verifyAll();
PowerMock.resetAll();

// Try to remove a cancelled task
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andThrow(new CancellationException());
mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());
PowerMock.expectLastCall();
PowerMock.replayAll();

committers.put(taskId, taskFuture);
committer.remove(taskId);
assertTrue(committers.isEmpty());

PowerMock.verifyAll();
PowerMock.resetAll();

// Try to remove an interrupted task
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andThrow(new InterruptedException());
PowerMock.replayAll();

try {
committers.put(taskId, taskFuture);
committer.remove(taskId);
fail("Expected ConnectException to be raised");
} catch (ConnectException e) {
Expand Down
54 changes: 19 additions & 35 deletions core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetRe
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.test.MockDeserializer
import org.easymock.EasyMock
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers
import ArgumentMatchers._
import org.junit.Assert._
import org.junit.{Before, Test}

Expand Down Expand Up @@ -61,69 +63,51 @@ class ConsoleConsumerTest {
mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes))
}

// Mocks
val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
val formatter = mock(classOf[MessageFormatter])

// Expectations
EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(maxMessages)
EasyMock.replay(formatter)

// Test
ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false)
assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))

consumer.resetUnconsumedOffsets()
assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))

EasyMock.verify(formatter)
verify(formatter, times(maxMessages)).writeTo(any(), any())
}

@Test
def shouldLimitReadsToMaxMessageLimit() {
//Mocks
val consumer = EasyMock.createNiceMock(classOf[ConsumerWrapper])
val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])

//Stubs
val consumer = mock(classOf[ConsumerWrapper])
val formatter = mock(classOf[MessageFormatter])
val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())

//Expectations
val messageLimit: Int = 10
EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)

EasyMock.replay(consumer)
EasyMock.replay(formatter)
when(consumer.receive()).thenReturn(record)

//Test
ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true)

verify(consumer, times(messageLimit)).receive()
verify(formatter, times(messageLimit)).writeTo(any(), any())
}

@Test
def shouldStopWhenOutputCheckErrorFails() {
//Mocks
val consumer = EasyMock.createNiceMock(classOf[ConsumerWrapper])
val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
val printStream = EasyMock.createNiceMock(classOf[PrintStream])
val consumer = mock(classOf[ConsumerWrapper])
val formatter = mock(classOf[MessageFormatter])
val printStream = mock(classOf[PrintStream])

//Stubs
val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())

//Expectations
EasyMock.expect(consumer.receive()).andReturn(record)
EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.eq(printStream)))
when(consumer.receive()).thenReturn(record)
//Simulate an error on System.out after the first record has been printed
EasyMock.expect(printStream.checkError()).andReturn(true)

EasyMock.replay(consumer)
EasyMock.replay(formatter)
EasyMock.replay(printStream)
when(printStream.checkError()).thenReturn(true)

//Test
ConsoleConsumer.process(-1, formatter, consumer, printStream, true)

//Verify
EasyMock.verify(consumer, formatter, printStream)
verify(formatter).writeTo(any(), ArgumentMatchers.eq(printStream))
verify(consumer).receive()
verify(printStream).checkError()
}

@Test
Expand Down