Skip to content
Permalink
Browse files
[FLINK-27185][connector] Convert connector-base module to assertj
Co-authored-by: slinkydeveloper <francescoguard@gmail.com>
  • Loading branch information
2 people authored and MartijnVisser committed May 18, 2022
1 parent 5b8dc10 commit d38b7d1a5baca26546335c5483275ad0ca1ccb87
Showing 17 changed files with 318 additions and 366 deletions.
@@ -24,7 +24,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Utils class for {@link AsyncSinkWriter} related test. */
public class AsyncSinkWriterTestUtils {
@@ -46,21 +46,21 @@ public static <T extends Serializable> BufferedRequestState<T> getTestState(
public static <T extends Serializable> void assertThatBufferStatesAreEqual(
BufferedRequestState<T> actual, BufferedRequestState<T> expected) {
// Equal states must have equal sizes
assertEquals(actual.getStateSize(), expected.getStateSize());
assertThat(actual.getStateSize()).isEqualTo(expected.getStateSize());

// Equal states must have the same number of requests.
int actualLength = actual.getBufferedRequestEntries().size();
assertEquals(actualLength, expected.getBufferedRequestEntries().size());
assertThat(actualLength).isEqualTo(expected.getBufferedRequestEntries().size());

List<RequestEntryWrapper<T>> actualRequests = actual.getBufferedRequestEntries();
List<RequestEntryWrapper<T>> expectedRequests = expected.getBufferedRequestEntries();

// Equal states must have same requests in the same order.
for (int i = 0; i < actualLength; i++) {
assertEquals(
actualRequests.get(i).getRequestEntry(),
expectedRequests.get(i).getRequestEntry());
assertEquals(actualRequests.get(i).getSize(), expectedRequests.get(i).getSize());
assertThat(actualRequests.get(i).getRequestEntry())
.isEqualTo(expectedRequests.get(i).getRequestEntry());
assertThat(actualRequests.get(i).getSize())
.isEqualTo(expectedRequests.get(i).getSize());
}
}
}
@@ -47,8 +47,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** MiniCluster-based integration test for the {@link HybridSource}. */
public class HybridSourceITCase extends TestLogger {
@@ -205,7 +204,7 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min

private static void verifyResult(List<Integer> result) {
Collections.sort(result);
assertThat(result, equalTo(EXPECTED_RESULT));
assertThat(result).isEqualTo(EXPECTED_RESULT);
}

// ------------------------------------------------------------------------
@@ -30,14 +30,14 @@
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.mock.Whitebox;

import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link HybridSourceReader}. */
public class HybridSourceReaderTest {

@@ -55,11 +55,11 @@ public void testReader() throws Exception {

HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

Assert.assertThat(readerContext.getSentEvents(), Matchers.emptyIterable());
assertThat(readerContext.getSentEvents()).isEmpty();
reader.start();
assertAndClearSourceReaderFinishedEvent(readerContext, -1);
Assert.assertNull(currentReader(reader));
Assert.assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(readerOutput));
assertThat(currentReader(reader)).isNull();
assertThat(reader.pollNext(readerOutput)).isEqualTo(InputStatus.NOTHING_AVAILABLE);

Source source1 =
new MockSource(null, 0) {
@@ -85,19 +85,19 @@ public SourceReader<Integer, MockSourceSplit> createReader(
status = reader.pollNext(readerOutput);
Thread.sleep(10);
}
Assert.assertThat(readerOutput.getEmittedRecords(), Matchers.contains(0));
assertThat(readerOutput.getEmittedRecords()).contains(0);
reader.pollNext(readerOutput);
Assert.assertEquals(
"before notifyNoMoreSplits",
InputStatus.NOTHING_AVAILABLE,
reader.pollNext(readerOutput));
assertThat(reader.pollNext(readerOutput))
.as("before notifyNoMoreSplits")
.isEqualTo(InputStatus.NOTHING_AVAILABLE);

reader.notifyNoMoreSplits();
reader.pollNext(readerOutput);
assertAndClearSourceReaderFinishedEvent(readerContext, 0);

Assert.assertEquals(
"reader before switch source event", mockSplitReader1, currentReader(reader));
assertThat(currentReader(reader))
.as("reader before switch source event")
.isEqualTo(mockSplitReader1);

Source source2 =
new MockSource(null, 0) {
@@ -108,14 +108,14 @@ public SourceReader<Integer, MockSourceSplit> createReader(
}
};
reader.handleSourceEvents(new SwitchSourceEvent(1, source2, true));
Assert.assertEquals(
"reader after switch source event", mockSplitReader2, currentReader(reader));
assertThat(currentReader(reader))
.as("reader after switch source event")
.isEqualTo(mockSplitReader2);

reader.notifyNoMoreSplits();
Assert.assertEquals(
"reader 1 after notifyNoMoreSplits",
InputStatus.END_OF_INPUT,
reader.pollNext(readerOutput));
assertThat(reader.pollNext(readerOutput))
.as("reader 1 after notifyNoMoreSplits")
.isEqualTo(InputStatus.END_OF_INPUT);

reader.close();
}
@@ -140,22 +140,22 @@ public void testReaderRecovery() throws Exception {
reader.addSplits(Collections.singletonList(hybridSplit));

List<HybridSourceSplit> snapshot = reader.snapshotState(0);
Assert.assertThat(snapshot, Matchers.contains(hybridSplit));
assertThat(snapshot).contains(hybridSplit);

// reader recovery
readerContext.clearSentEvents();
reader = new HybridSourceReader<>(readerContext);

reader.addSplits(snapshot);
Assert.assertNull(currentReader(reader));
assertThat(currentReader(reader)).isNull();

reader.start();
Assert.assertNull(currentReader(reader));
assertThat(currentReader(reader)).isNull();

assertAndClearSourceReaderFinishedEvent(readerContext, -1);
reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
Assert.assertNotNull(currentReader(reader));
Assert.assertThat(reader.snapshotState(1), Matchers.contains(hybridSplit));
assertThat(currentReader(reader)).isNotNull();
assertThat(reader.snapshotState(1)).contains(hybridSplit);

reader.close();
}
@@ -196,10 +196,9 @@ private static SourceReader<Integer, MockSourceSplit> currentReader(

private static void assertAndClearSourceReaderFinishedEvent(
TestingReaderContext context, int sourceIndex) {
Assert.assertThat(context.getSentEvents(), Matchers.iterableWithSize(1));
Assert.assertEquals(
sourceIndex,
((SourceReaderFinishedEvent) context.getSentEvents().get(0)).sourceIndex());
assertThat(context.getSentEvents()).hasSize(1);
assertThat(((SourceReaderFinishedEvent) context.getSentEvents().get(0)).sourceIndex())
.isEqualTo(sourceIndex);
context.clearSentEvents();
}
}
@@ -29,8 +29,6 @@
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.mock.Whitebox;

import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

@@ -39,8 +37,8 @@
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link HybridSourceSplitEnumerator}. */
public class HybridSourceSplitEnumeratorTest {
@@ -63,32 +61,31 @@ private void setupEnumeratorAndTriggerSourceSwitch() {
enumerator.start();
// mock enumerator assigns splits once all readers are registered
registerReader(context, enumerator, SUBTASK0);
assertThat(context.getSplitsAssignmentSequence(), Matchers.emptyIterable());
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
registerReader(context, enumerator, SUBTASK1);
assertThat(context.getSplitsAssignmentSequence(), Matchers.emptyIterable());
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(1));
assertThat(context.getSplitsAssignmentSequence()).hasSize(1);
splitFromSource0 =
context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
assertEquals(0, splitFromSource0.sourceIndex());
assertEquals(0, getCurrentSourceIndex(enumerator));
assertThat(splitFromSource0.sourceIndex()).isEqualTo(0);
assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0);

// trigger source switch
enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
assertEquals("one reader finished", 0, getCurrentSourceIndex(enumerator));
assertThat(getCurrentSourceIndex(enumerator)).as("one reader finished").isEqualTo(0);
enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
assertEquals("both readers finished", 1, getCurrentSourceIndex(enumerator));
assertThat(
"switch triggers split assignment",
context.getSplitsAssignmentSequence(),
Matchers.iterableWithSize(2));
assertThat(getCurrentSourceIndex(enumerator)).as("both readers finished").isEqualTo(1);
assertThat(context.getSplitsAssignmentSequence())
.as("switch triggers split assignment")
.hasSize(2);
splitFromSource1 =
context.getSplitsAssignmentSequence().get(1).assignment().get(SUBTASK0).get(0);
assertEquals(1, splitFromSource1.sourceIndex());
assertThat(splitFromSource1.sourceIndex()).isEqualTo(1);
enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK1));
assertEquals("reader without assignment", 1, getCurrentSourceIndex(enumerator));
assertThat(getCurrentSourceIndex(enumerator)).as("reader without assignment").isEqualTo(1);
}

@Test
@@ -99,7 +96,7 @@ public void testRegisterReaderAfterSwitchAndReaderReset() {
context.getSplitsAssignmentSequence().clear();
enumerator.addReader(SUBTASK0);
enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
assertSplitAssignment(
"addSplitsBack triggers assignment when reader registered",
@@ -112,12 +109,11 @@ public void testRegisterReaderAfterSwitchAndReaderReset() {
context.getSplitsAssignmentSequence().clear();
context.unregisterReader(SUBTASK0);
enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
assertThat(
"addSplitsBack doesn't trigger assignment when reader not registered",
context.getSplitsAssignmentSequence(),
Matchers.emptyIterable());
assertThat(context.getSplitsAssignmentSequence())
.as("addSplitsBack doesn't trigger assignment when reader not registered")
.isEmpty();
registerReader(context, enumerator, SUBTASK0);
assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
assertSplitAssignment(
"registerReader triggers assignment", context, 1, splitFromSource0, SUBTASK0);
@@ -134,13 +130,13 @@ public void testHandleSplitRequestAfterSwitchAndReaderReset() {
List<MockSourceSplit> mockSourceSplits =
(List<MockSourceSplit>)
Whitebox.getInternalState(underlyingEnumeratorWrapper.enumerator, "splits");
assertThat(mockSourceSplits, Matchers.emptyIterable());
assertThat(mockSourceSplits).isEmpty();

// simulate reader reset to before switch by adding split of previous source back
context.getSplitsAssignmentSequence().clear();
assertEquals("current enumerator", 1, getCurrentSourceIndex(enumerator));
assertThat(getCurrentSourceIndex(enumerator)).as("current enumerator").isEqualTo(1);

assertThat(underlyingEnumeratorWrapper.handleSplitRequests, Matchers.emptyIterable());
assertThat(underlyingEnumeratorWrapper.handleSplitRequests).isEmpty();
enumerator.handleSplitRequest(SUBTASK0, "fakehostname");

SwitchedSources switchedSources = new SwitchedSources();
@@ -156,11 +152,8 @@ public void testHandleSplitRequestAfterSwitchAndReaderReset() {

// handleSplitRequest invalid during reset
enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
try {
enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
Assert.fail("expected exception");
} catch (IllegalStateException ex) {
}
assertThatThrownBy(() -> enumerator.handleSplitRequest(SUBTASK0, "fakehostname"))
.isInstanceOf(IllegalStateException.class);
}

@Test
@@ -170,16 +163,18 @@ public void testRestoreEnumerator() throws Exception {
enumerator.start();
HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0);
MockSplitEnumerator underlyingEnumerator = getCurrentEnumerator(enumerator);
Assert.assertThat(
(List<MockSourceSplit>) Whitebox.getInternalState(underlyingEnumerator, "splits"),
Matchers.iterableWithSize(1));
assertThat(
(List<MockSourceSplit>)
Whitebox.getInternalState(underlyingEnumerator, "splits"))
.hasSize(1);
enumerator =
(HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState);
enumerator.start();
underlyingEnumerator = getCurrentEnumerator(enumerator);
Assert.assertThat(
(List<MockSourceSplit>) Whitebox.getInternalState(underlyingEnumerator, "splits"),
Matchers.iterableWithSize(1));
assertThat(
(List<MockSourceSplit>)
Whitebox.getInternalState(underlyingEnumerator, "splits"))
.hasSize(1);
}

@Test
@@ -251,15 +246,15 @@ private static void assertSplitAssignment(
int size,
HybridSourceSplit split,
int subtask) {
assertThat(reason, context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(size));
assertEquals(
reason,
split,
context.getSplitsAssignmentSequence()
.get(size - 1)
.assignment()
.get(subtask)
.get(0));
assertThat(context.getSplitsAssignmentSequence()).as(reason).hasSize(size);
assertThat(
context.getSplitsAssignmentSequence()
.get(size - 1)
.assignment()
.get(subtask)
.get(0))
.as(reason)
.isEqualTo(split);
}

private static void registerReader(
@@ -21,13 +21,15 @@
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link HybridSourceSplitSerializer}. */
public class HybridSourceSplitSerializerTest {

@@ -40,13 +42,9 @@ public void testSerialization() throws Exception {
HybridSourceSplit split = new HybridSourceSplit(0, splitBytes, 0, "splitId");
byte[] serialized = serializer.serialize(split);
HybridSourceSplit clonedSplit = serializer.deserialize(0, serialized);
Assert.assertEquals(split, clonedSplit);

try {
serializer.deserialize(1, serialized);
Assert.fail();
} catch (IOException e) {
// expected invalid version
}
assertThat(clonedSplit).isEqualTo(split);

assertThatThrownBy(() -> serializer.deserialize(1, serialized))
.isInstanceOf(IOException.class);
}
}

0 comments on commit d38b7d1

Please sign in to comment.