Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito #13712

Merged
merged 2 commits into from
Jun 13, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
Expand Down Expand Up @@ -108,7 +109,6 @@
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.resetToStrict;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -195,7 +195,7 @@ public class TaskManagerTest {
private ActiveTaskCreator activeTaskCreator;
@Mock(type = MockType.NICE)
private StandbyTaskCreator standbyTaskCreator;
@Mock(type = MockType.NICE)
@org.mockito.Mock
private Admin adminClient;
final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class);

Expand Down Expand Up @@ -3694,12 +3694,12 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {

@Test
public void shouldSendPurgeData() {
resetToStrict(adminClient);
expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.andReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L))))
.andReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
replay(adminClient);
when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
Comment on lines +3697 to +3700
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to verify the order of the calls since the admin mock is reset to strict in EasyMock. The order is indeed important here because you do not want to purge records in the wrong order from a repartition topic.


InOrder inOrder = Mockito.inOrder(adminClient);
cadonna marked this conversation as resolved.
Show resolved Hide resolved

final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
Expand All @@ -3726,16 +3726,16 @@ public Map<TopicPartition, Long> purgeableOffsets() {
purgableOffsets.put(t1p1, 17L);
taskManager.maybePurgeCommittedRecords();

verify(adminClient);
inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L)));
inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L)));
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldNotSendPurgeDataIfPreviousNotDone() {
resetToStrict(adminClient);
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.andReturn(new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords)));
replay(adminClient);
when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords)));

final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
Expand Down Expand Up @@ -3763,8 +3763,6 @@ public Map<TopicPartition, Long> purgeableOffsets() {
// so it would fail verification if we invoke the admin client again.
purgableOffsets.put(t1p1, 17L);
taskManager.maybePurgeCommittedRecords();

verify(adminClient);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

@Test
Expand All @@ -3776,9 +3774,9 @@ public void shouldIgnorePurgeDataErrors() {
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords));
futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
expect(adminClient.deleteRecords(anyObject())).andReturn(deleteRecordsResult).times(2);
when(adminClient.deleteRecords(any())).thenReturn(deleteRecordsResult);

replay(adminClient, consumer);
replay(consumer);

taskManager.addTask(task00);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
Expand All @@ -3789,8 +3787,6 @@ public void shouldIgnorePurgeDataErrors() {

taskManager.maybePurgeCommittedRecords();
taskManager.maybePurgeCommittedRecords();

verify(adminClient);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

@Test
Expand Down