Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-27185][connectors] Convert connector modules to assertj #19660

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -57,10 +57,7 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for checking whether {@link FlinkKinesisConsumer} can restore from snapshots that were done
Expand Down Expand Up @@ -180,21 +177,21 @@ public void testRestoreWithEmptyState() throws Exception {
consumerFunction.run(new TestSourceContext<>());

// assert that no state was restored
assertTrue(consumerFunction.getRestoredState().isEmpty());
assertThat(consumerFunction.getRestoredState()).isEmpty();

// although the restore state is empty, the fetcher should still have been registered the
// initial discovered shard;
// furthermore, the discovered shard should be considered a newly created shard while the
// job wasn't running,
// and therefore should be consumed from the earliest sequence number
KinesisStreamShardState restoredShardState = fetcher.getSubscribedShardsState().get(0);
assertEquals(TEST_STREAM_NAME, restoredShardState.getStreamShardHandle().getStreamName());
assertEquals(
TEST_SHARD_ID, restoredShardState.getStreamShardHandle().getShard().getShardId());
assertFalse(restoredShardState.getStreamShardHandle().isClosed());
assertEquals(
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(),
restoredShardState.getLastProcessedSequenceNum());
assertThat(restoredShardState.getStreamShardHandle().getStreamName())
.isEqualTo(TEST_STREAM_NAME);
assertThat(restoredShardState.getStreamShardHandle().getShard().getShardId())
.isEqualTo(TEST_SHARD_ID);
assertThat(restoredShardState.getStreamShardHandle().isClosed()).isFalse();
assertThat(restoredShardState.getLastProcessedSequenceNum())
.isEqualTo(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

consumerOperator.close();
consumerOperator.cancel();
Expand Down Expand Up @@ -246,20 +243,22 @@ public void testRestore() throws Exception {
consumerFunction.run(new TestSourceContext<>());

// assert that state is correctly restored
assertNotEquals(null, consumerFunction.getRestoredState());
assertEquals(1, consumerFunction.getRestoredState().size());
assertEquals(TEST_STATE, removeEquivalenceWrappers(consumerFunction.getRestoredState()));
assertEquals(1, fetcher.getSubscribedShardsState().size());
assertEquals(
TEST_SEQUENCE_NUMBER,
fetcher.getSubscribedShardsState().get(0).getLastProcessedSequenceNum());
assertThat(consumerFunction.getRestoredState()).isNotNull();
assertThat(consumerFunction.getRestoredState()).hasSize(1);
assertThat(removeEquivalenceWrappers(consumerFunction.getRestoredState()))
.isEqualTo(TEST_STATE);
assertThat(fetcher.getSubscribedShardsState()).hasSize(1);
assertThat(fetcher.getSubscribedShardsState().get(0).getLastProcessedSequenceNum())
.isEqualTo(TEST_SEQUENCE_NUMBER);

KinesisStreamShardState restoredShardState = fetcher.getSubscribedShardsState().get(0);
assertEquals(TEST_STREAM_NAME, restoredShardState.getStreamShardHandle().getStreamName());
assertEquals(
TEST_SHARD_ID, restoredShardState.getStreamShardHandle().getShard().getShardId());
assertFalse(restoredShardState.getStreamShardHandle().isClosed());
assertEquals(TEST_SEQUENCE_NUMBER, restoredShardState.getLastProcessedSequenceNum());
assertThat(restoredShardState.getStreamShardHandle().getStreamName())
.isEqualTo(TEST_STREAM_NAME);
assertThat(restoredShardState.getStreamShardHandle().getShard().getShardId())
.isEqualTo(TEST_SHARD_ID);
assertThat(restoredShardState.getStreamShardHandle().isClosed()).isFalse();
assertThat(restoredShardState.getLastProcessedSequenceNum())
.isEqualTo(TEST_SEQUENCE_NUMBER);

consumerOperator.close();
consumerOperator.cancel();
Expand Down Expand Up @@ -339,46 +338,43 @@ public void testRestoreWithReshardedStream() throws Exception {
consumerFunction.run(new TestSourceContext<>());

// assert that state is correctly restored
assertNotEquals(null, consumerFunction.getRestoredState());
assertEquals(1, consumerFunction.getRestoredState().size());
assertEquals(TEST_STATE, removeEquivalenceWrappers(consumerFunction.getRestoredState()));
assertThat(consumerFunction.getRestoredState()).isNotNull();
assertThat(consumerFunction.getRestoredState()).hasSize(1);
assertThat(removeEquivalenceWrappers(consumerFunction.getRestoredState()))
.isEqualTo(TEST_STATE);

// assert that the fetcher is registered with all shards, including new shards
assertEquals(3, fetcher.getSubscribedShardsState().size());
assertThat(fetcher.getSubscribedShardsState()).hasSize(3);

KinesisStreamShardState restoredClosedShardState =
fetcher.getSubscribedShardsState().get(0);
assertEquals(
TEST_STREAM_NAME, restoredClosedShardState.getStreamShardHandle().getStreamName());
assertEquals(
TEST_SHARD_ID,
restoredClosedShardState.getStreamShardHandle().getShard().getShardId());
assertTrue(restoredClosedShardState.getStreamShardHandle().isClosed());
assertEquals(TEST_SEQUENCE_NUMBER, restoredClosedShardState.getLastProcessedSequenceNum());
assertThat(restoredClosedShardState.getStreamShardHandle().getStreamName())
.isEqualTo(TEST_STREAM_NAME);
assertThat(restoredClosedShardState.getStreamShardHandle().getShard().getShardId())
.isEqualTo(TEST_SHARD_ID);
assertThat(restoredClosedShardState.getStreamShardHandle().isClosed()).isTrue();
assertThat(restoredClosedShardState.getLastProcessedSequenceNum())
.isEqualTo(TEST_SEQUENCE_NUMBER);

KinesisStreamShardState restoredNewSplitShard1 = fetcher.getSubscribedShardsState().get(1);
assertEquals(
TEST_STREAM_NAME, restoredNewSplitShard1.getStreamShardHandle().getStreamName());
assertEquals(
KinesisShardIdGenerator.generateFromShardOrder(1),
restoredNewSplitShard1.getStreamShardHandle().getShard().getShardId());
assertFalse(restoredNewSplitShard1.getStreamShardHandle().isClosed());
assertThat(restoredNewSplitShard1.getStreamShardHandle().getStreamName())
.isEqualTo(TEST_STREAM_NAME);
assertThat(restoredNewSplitShard1.getStreamShardHandle().getShard().getShardId())
.isEqualTo(KinesisShardIdGenerator.generateFromShardOrder(1));
assertThat(restoredNewSplitShard1.getStreamShardHandle().isClosed()).isFalse();
// new shards should be consumed from the beginning
assertEquals(
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(),
restoredNewSplitShard1.getLastProcessedSequenceNum());
assertThat(restoredNewSplitShard1.getLastProcessedSequenceNum())
.isEqualTo(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

KinesisStreamShardState restoredNewSplitShard2 = fetcher.getSubscribedShardsState().get(2);
assertEquals(
TEST_STREAM_NAME, restoredNewSplitShard2.getStreamShardHandle().getStreamName());
assertEquals(
KinesisShardIdGenerator.generateFromShardOrder(2),
restoredNewSplitShard2.getStreamShardHandle().getShard().getShardId());
assertFalse(restoredNewSplitShard2.getStreamShardHandle().isClosed());
assertThat(restoredNewSplitShard2.getStreamShardHandle().getStreamName())
.isEqualTo(TEST_STREAM_NAME);
assertThat(restoredNewSplitShard2.getStreamShardHandle().getShard().getShardId())
.isEqualTo(KinesisShardIdGenerator.generateFromShardOrder(2));
assertThat(restoredNewSplitShard2.getStreamShardHandle().isClosed()).isFalse();
// new shards should be consumed from the beginning
assertEquals(
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(),
restoredNewSplitShard2.getLastProcessedSequenceNum());
assertThat(restoredNewSplitShard2.getLastProcessedSequenceNum())
.isEqualTo(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

consumerOperator.close();
consumerOperator.cancel();
Expand Down
Expand Up @@ -64,7 +64,6 @@
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
Expand All @@ -91,11 +90,10 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.HamcrestCondition.matching;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -185,12 +183,12 @@ public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exce
// arbitrary checkpoint id and timestamp
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

assertTrue(listState.isClearCalled());
assertThat(listState.isClearCalled()).isTrue();

// the checkpointed list state should contain only the shards that it should subscribe to
assertEquals(globalUnionState.size() / 2, listState.getList().size());
assertTrue(listState.getList().contains(globalUnionState.get(0)));
assertTrue(listState.getList().contains(globalUnionState.get(2)));
assertThat(listState.getList()).hasSize(globalUnionState.size() / 2);
assertThat(listState.getList()).contains(globalUnionState.get(0));
assertThat(listState.getList()).contains(globalUnionState.get(2));
}

@Test
Expand Down Expand Up @@ -295,12 +293,12 @@ public void testListStateChangedAfterSnapshotState() throws Exception {

mockedConsumer.snapshotState(mock(FunctionSnapshotContext.class));

assertEquals(true, listState.clearCalled);
assertEquals(3, listState.getList().size());
assertThat(listState.clearCalled).isTrue();
assertThat(listState.getList()).hasSize(3);

for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
assertNotEquals(state, currentState);
assertThat(currentState).isNotEqualTo(state);
}
}

Expand All @@ -309,7 +307,7 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
hasOneIsSame = hasOneIsSame || state.equals(currentState);
}
assertEquals(true, hasOneIsSame);
assertThat(hasOneIsSame).isTrue();
}
}

Expand Down Expand Up @@ -630,17 +628,16 @@ public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() {
.withEndingSequenceNumber(endingSequenceNumber));
KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard);

assertEquals(
streamShardMetadata,
KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard));
assertThat(KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard))
.isEqualTo(streamShardMetadata);
}

@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
TypeInformation<StreamShardMetadata> typeInformation =
TypeInformation.of(StreamShardMetadata.class);
assertTrue(
typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
assertThat(typeInformation.createSerializer(new ExecutionConfig()))
.isInstanceOf(PojoSerializer.class);
}

/**
Expand Down Expand Up @@ -984,9 +981,13 @@ public void markAsTemporarilyIdle() {}
sourceFunc.cancel();
testHarness.close();

assertEquals("record count", recordCount, testHarness.getOutput().size());
assertThat(watermarks, org.hamcrest.Matchers.contains(new Watermark(-3), new Watermark(5)));
assertEquals("watermark count", watermarkCount, watermarks.size());
assertThat(testHarness.getOutput()).as("record count").hasSize(recordCount);
assertThat(watermarks)
.satisfies(
matching(
alpreu marked this conversation as resolved.
Show resolved Hide resolved
org.hamcrest.Matchers.contains(
new Watermark(-3), new Watermark(5))));
assertThat(watermarks).as("watermark count").hasSize(watermarkCount);
}

@Test
Expand Down Expand Up @@ -1137,8 +1138,9 @@ public void emitWatermark(Watermark mark) {
expectedResults.add(new Watermark(-4));
// verify watermark
awaitRecordCount(results, expectedResults.size());
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
assertEquals(0, TestWatermarkTracker.WATERMARK.get());
assertThat(results)
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray())));
alpreu marked this conversation as resolved.
Show resolved Hide resolved
assertThat(TestWatermarkTracker.WATERMARK.get()).isEqualTo(0);

// trigger sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
Expand All @@ -1155,40 +1157,42 @@ public void emitWatermark(Watermark mark) {
while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) {
Thread.sleep(10);
}
assertEquals("first record received", 1, emitterQueue.getSize());
assertThat(emitterQueue.getSize()).as("first record received").isEqualTo(1);

// Advance the watermark. Since the new record is past global watermark + threshold,
// it won't be emitted and the watermark does not advance
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
assertEquals(
3000L,
(long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
assertThat(results)
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray())));
alpreu marked this conversation as resolved.
Show resolved Hide resolved
assertThat((long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"))
.isEqualTo(3000L);
TestWatermarkTracker.assertGlobalWatermark(-4);

// Trigger global watermark sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
expectedResults.add(Long.toString(record2));
awaitRecordCount(results, expectedResults.size());
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
assertThat(results)
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray())));
alpreu marked this conversation as resolved.
Show resolved Hide resolved
TestWatermarkTracker.assertGlobalWatermark(3000);

// Trigger watermark update and emit
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
expectedResults.add(new Watermark(3000));
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
assertThat(results)
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray())));
alpreu marked this conversation as resolved.
Show resolved Hide resolved

// verify exception propagation
Assert.assertNull(sourceThreadError.get());
assertThat(sourceThreadError.get()).isNull();
throwOnCollect.set(true);
shard1.put(Long.toString(record2 + 1));

deadline = Deadline.fromNow(Duration.ofSeconds(10));
while (deadline.hasTimeLeft() && sourceThreadError.get() == null) {
Thread.sleep(10);
}
Assert.assertNotNull(sourceThreadError.get());
Assert.assertNotNull("expected", sourceThreadError.get().getMessage());
assertThat(sourceThreadError.get()).isNotNull();
assertThat(sourceThreadError.get().getMessage()).as("expected").isNotNull();

sourceFunc.cancel();
testHarness.close();
Expand All @@ -1203,8 +1207,7 @@ private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int

int received = queue.size();
if (received < count) {
Assert.fail(
String.format("Timeout waiting for records, received %d/%d", received, count));
fail(String.format("Timeout waiting for records, received %d/%d", received, count));
}
}

Expand All @@ -1213,7 +1216,8 @@ private static class OpenCheckingStringSchema extends SimpleStringSchema {

@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
assertThat(context.getMetricGroup(), notNullValue(MetricGroup.class));
assertThat(context.getMetricGroup())
.satisfies(matching(notNullValue(MetricGroup.class)));
alpreu marked this conversation as resolved.
Show resolved Hide resolved
this.opened = true;
}

Expand Down Expand Up @@ -1257,7 +1261,7 @@ public long updateWatermark(long localWatermark) {
}

static void assertGlobalWatermark(long expected) {
Assert.assertEquals(expected, WATERMARK.get());
assertThat(WATERMARK.get()).isEqualTo(expected);
}
}
}
Expand Up @@ -48,10 +48,10 @@
import java.util.stream.IntStream;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.hamcrest.Matchers.equalTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.HamcrestCondition.matching;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;

/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
@Ignore("See FLINK-23528")
Expand Down Expand Up @@ -123,14 +123,13 @@ public void testStopWithSavepoint() throws Exception {
List<String> result = stream.executeAndCollect(10000);
// stop with savepoint will most likely only return a small subset of the elements
// validate that the prefix is as expected
assertThat(result, hasSize(lessThan(numElements)));
assertThat(
result,
equalTo(
assertThat(result).satisfies(matching(hasSize(lessThan(numElements))));
alpreu marked this conversation as resolved.
Show resolved Hide resolved
assertThat(result)
.isEqualTo(
IntStream.range(0, numElements)
.mapToObj(String::valueOf)
.collect(Collectors.toList())
.subList(0, result.size())));
.subList(0, result.size()));
} finally {
stopTask.cancel(true);
}
Expand Down