New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest #14153
KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest #14153
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @bachmanity1
We are tracking migration of this file at https://issues.apache.org/jira/browse/KAFKA-14132 and currently @mdedetrich has volunteered to work on KafkaBasedLogTest. Can you please add a comment on the JIRA and ask @mdedetrich if they have an open PR yet? If they don't, feel assign this test to yourself so that we don't end up duplicating effort across the community.
Thanks!
Hi @divijvaidya Thanks for letting me know about ticket KAFKA-14132. It turns out that @mdedetrich has already begun working on this issue and has submitted a PR, but he said he doesn't have enough time to complete it and agreed to close it. Thus, I think we can handle this issue in this PR. When you have time could you please review this PR? |
I don't know why checks result in |
I am sorry, I won't have time to review this any time soon. Will circle back in a couple of weeks perhaps. I am tagging a few folks who might be interested in reviewing this: @clolov @yashmayya @shekhar-rajak |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @bachmanity1! I've left a few comments.
@Before | ||
public void setUp() { | ||
store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, | ||
TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); | ||
store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the only reason we need a partial mock here is to stub in the mock producer / consumer. Since the KafkaBasedLog::createProducer
and KafkaBasedLog::createConsumer
methods are now protected
(looks like they were private
when this test was originally written), could we just override those methods to return the mock clients and use a regular instance instead of using a spy / partial mock? I'd prefer it if we could avoid the use of spies / partial mocks as far as possible, since they aren't very OOP-y.
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
@RunWith(MockitoJUnitRunner.StrictStubs.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also remove this test from the list here so that it isn't skipped on Java 16 and newer -
Lines 413 to 423 in 8dec3e6
// Exclude PowerMock tests when running with Java 16 or newer until a version of PowerMock that supports the relevant versions is released | |
// The relevant issues are https://github.com/powermock/powermock/issues/1094 and https://github.com/powermock/powermock/issues/1099 | |
if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16)) { | |
testsToExclude.addAll([ | |
// connect tests | |
"**/DistributedHerderTest.*", | |
"**/KafkaConfigBackingStoreTest.*", | |
"**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*", | |
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*" | |
]) | |
} |
private static ByteBuffer buffer(String v) { | ||
return ByteBuffer.wrap(v.getBytes()); | ||
private void verifyStartAndStop() { | ||
verify(initializer).accept(any()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we're moving to the newer non-deprecated KafkaBasedLog
constructor in this test class, could we enhance this verification to also verify that the initializer is called with the admin client from the supplier -
kafka/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
Lines 232 to 240 in 8dec3e6
admin = topicAdminSupplier.get(); // may be null | |
if (admin == null && requireAdminForOffsets) { | |
throw new ConnectException( | |
"Must provide a TopicAdmin to KafkaBasedLog when consumer is configured with " | |
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to " | |
+ IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT) | |
); | |
} | |
initializer.accept(admin); |
@@ -160,18 +156,12 @@ public void testStartStop() throws Exception { | |||
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); | |||
|
|||
store.stop(); | |||
|
|||
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aren't we losing this test coverage? Can we bump up the visibility of the thread
instance variable to package-private
(with a comment stating that it is being made visible for testing) and retain this coverage?
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); | ||
EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future); | ||
|
||
// Producer flushes when read to log end is called |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we retain this clarifying comment (we can add it above the producer flush verification)?
EasyMock.expectLastCall().times(1); | ||
|
||
expectProducerAndConsumerCreate(); | ||
store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this separate setupWithAdmin
method now that we're setting up the KafkaBasedLog
store with a topic admin supplier even in the regular setUp
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I think we can merge setupWithAdmin
into setUp
.
when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); | ||
when(admin.endOffsets(eq(tps))).thenReturn(endOffsets); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're losing out on the existing coverage here since we're no longer ensuring that these methods are called one time only. Can we add an explicit verification for this at the end of the test?
I checked the build and the failures are from unrelated tests that are known to be flaky so you can just ignore them. |
Hi @yashmayya, thank you for your review! I've addressed the questions raised by you. Can you have one more look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bachmanity1, this is looking really good. I just had a few more comments.
@@ -103,7 +103,7 @@ public class KafkaBasedLog<K, V> { | |||
private Optional<Producer<K, V>> producer; | |||
private TopicAdmin admin; | |||
|
|||
private Thread thread; | |||
Thread thread; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thread thread; | |
// Visible for testing | |
Thread thread; |
nit
private class MockedKafkaBasedLog extends KafkaBasedLog<String, String> { | ||
public MockedKafkaBasedLog(String topic, | ||
Map<String, Object> producerConfigs, | ||
Map<String, Object> consumerConfigs, | ||
Supplier<TopicAdmin> topicAdminSupplier, | ||
Callback<ConsumerRecord<String, String>> consumedCallback, | ||
Time time, | ||
Consumer<TopicAdmin> initializer) { | ||
super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer); | ||
} | ||
|
||
@Override | ||
protected KafkaProducer<String, String> createProducer() { | ||
return producer; | ||
} | ||
|
||
@Override | ||
protected MockConsumer<String, String> createConsumer() { | ||
return consumer; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use an anonymous inner class in the setUp
method instead? I think that'll look a lot cleaner.
@Mock | ||
private KafkaProducer<String, String> producer; | ||
private MockConsumer<String, String> consumer; | ||
@Mock |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious, why was this change required - 2c7d2e3?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
admin
field needs to be mocked only in methods where previously setupWithAdmin
was used, if we mock it everywhere then some tests don't pass because they expect that admin
is null
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, that makes sense, the other methods are expecting the consumer to be used to retrieve the end offsets instead of the admin.
} | ||
|
||
@Test | ||
public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { | ||
expectStart(); | ||
|
||
// Producer flushes when read to log end is called |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above (https://github.com/apache/kafka/pull/14153/files#r1286903819)
@yashmayya I've applied your suggestions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bachmanity1, LGTM!
Thank you for reviewing @yashmayya. The CI is still flaky with large number of failures and I am restarting it. |
It looks like test failures are not related to this PR.
|
…ogTest (apache#14153) Reviewers: Yash Mayya <yash.mayya@gmail.com>, Divij Vaidya <diviv@amazon.com>
…ogTest (apache#14153) Reviewers: Yash Mayya <yash.mayya@gmail.com>, Divij Vaidya <diviv@amazon.com>
…ogTest (apache#14153) Reviewers: Yash Mayya <yash.mayya@gmail.com>, Divij Vaidya <diviv@amazon.com>
Replaced Easymock & Powermock with Mockito in KafkaBasedLogTest.
Committer Checklist (excluded from commit message)