Skip to content

Commit

Permalink
Address connector-kinesis feeback
Browse files Browse the repository at this point in the history
  • Loading branch information
alpreu committed May 19, 2022
1 parent 0802f8e commit e88e572
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 48 deletions.
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateInitializationContext;
Expand Down Expand Up @@ -92,7 +91,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.HamcrestCondition.matching;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -981,11 +979,7 @@ public void markAsTemporarilyIdle() {}
testHarness.close();

assertThat(testHarness.getOutput()).as("record count").hasSize(recordCount);
assertThat(watermarks)
.satisfies(
matching(
org.hamcrest.Matchers.contains(
new Watermark(-3), new Watermark(5))));
assertThat(watermarks).contains(new Watermark(-3), new Watermark(5));
assertThat(watermarks).as("watermark count").hasSize(watermarkCount);
}

Expand Down Expand Up @@ -1137,8 +1131,7 @@ public void emitWatermark(Watermark mark) {
expectedResults.add(new Watermark(-4));
// verify watermark
awaitRecordCount(results, expectedResults.size());
assertThat(results)
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray())));
assertThat(results).contains(expectedResults);
assertThat(TestWatermarkTracker.WATERMARK.get()).isEqualTo(0);

// trigger sync
Expand Down Expand Up @@ -1171,15 +1164,13 @@ public void emitWatermark(Watermark mark) {
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
expectedResults.add(Long.toString(record2));
awaitRecordCount(results, expectedResults.size());
assertThat(results)
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray())));
assertThat(results).contains(expectedResults);
TestWatermarkTracker.assertGlobalWatermark(3000);

// Trigger watermark update and emit
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
expectedResults.add(new Watermark(3000));
assertThat(results)
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray())));
assertThat(results).contains(expectedResults);

// verify exception propagation
assertThat(sourceThreadError.get()).isNull();
Expand Down Expand Up @@ -1213,8 +1204,7 @@ private static class OpenCheckingStringSchema extends SimpleStringSchema {

@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
assertThat(context.getMetricGroup())
.satisfies(matching(notNullValue(MetricGroup.class)));
assertThat(context.getMetricGroup()).isNotNull();
this.opened = true;
}

Expand Down
Expand Up @@ -49,9 +49,6 @@

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.HamcrestCondition.matching;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;

/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
@Ignore("See FLINK-23528")
Expand Down Expand Up @@ -123,7 +120,7 @@ public void testStopWithSavepoint() throws Exception {
List<String> result = stream.executeAndCollect(10000);
// stop with savepoint will most likely only return a small subset of the elements
// validate that the prefix is as expected
assertThat(result).satisfies(matching(hasSize(lessThan(numElements))));
assertThat(result).size().isLessThan(numElements);
assertThat(result)
.isEqualTo(
IntStream.range(0, numElements)
Expand Down
Expand Up @@ -63,9 +63,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.HamcrestCondition.matching;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -197,7 +194,7 @@ public void testGetShardList() throws Exception {
List<StreamShardHandle> actualShardList =
shardListResult.getRetrievedShardListOfStream(fakeStreamName);
List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
assertThat(actualShardList).satisfies(matching(hasSize(4)));
assertThat(actualShardList).hasSize(4);
for (int i = 0; i < 4; i++) {
StreamShardHandle shardHandle =
new StreamShardHandle(
Expand All @@ -209,11 +206,8 @@ public void testGetShardList() throws Exception {
}

assertThat(actualShardList)
.satisfies(
matching(
containsInAnyOrder(
expectedStreamShard.toArray(
new StreamShardHandle[actualShardList.size()]))));
.containsExactlyInAnyOrder(
expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()]));
}

@Test
Expand Down Expand Up @@ -253,7 +247,7 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception {

List<StreamShardHandle> actualShardList =
shardListResult.getRetrievedShardListOfStream(fakeStreamName);
assertThat(actualShardList).satisfies(matching(hasSize(2)));
assertThat(actualShardList).hasSize(2);

List<StreamShardHandle> expectedStreamShard =
IntStream.range(0, actualShardList.size())
Expand All @@ -269,11 +263,8 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception {
.collect(Collectors.toList());

assertThat(actualShardList)
.satisfies(
matching(
containsInAnyOrder(
expectedStreamShard.toArray(
new StreamShardHandle[actualShardList.size()]))));
.containsExactlyInAnyOrder(
expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()]));

// given new shards
ListShardsResult responseSecond =
Expand All @@ -296,7 +287,7 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception {

List<StreamShardHandle> newActualShardList =
newShardListResult.getRetrievedShardListOfStream(fakeStreamName);
assertThat(newActualShardList).satisfies(matching(hasSize(1)));
assertThat(newActualShardList).hasSize(1);

List<StreamShardHandle> newExpectedStreamShard =
Collections.singletonList(
Expand All @@ -308,12 +299,9 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception {
2))));

assertThat(newActualShardList)
.satisfies(
matching(
containsInAnyOrder(
newExpectedStreamShard.toArray(
new StreamShardHandle
[newActualShardList.size()]))));
.containsExactlyInAnyOrder(
newExpectedStreamShard.toArray(
new StreamShardHandle[newActualShardList.size()]));
}

@Test
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.testutils.executor.TestExecutorResource;

import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;

Expand All @@ -33,7 +32,6 @@
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.HamcrestCondition.matching;

/** Test for {@link RecordEmitter}. */
public class RecordEmitterTest {
Expand Down Expand Up @@ -83,7 +81,7 @@ public void test() throws Exception {
}
emitter.stop();

assertThat(emitter.results).satisfies(matching(Matchers.contains(one, five, two, ten)));
assertThat(emitter.results).contains(one, five, two, ten);
}

@Test
Expand Down Expand Up @@ -122,17 +120,15 @@ public void testRetainMinAfterReachingLimit() throws Exception {
while (emitter.results.size() != 4 && dl.hasTimeLeft()) {
Thread.sleep(10);
}
assertThat(emitter.results)
.satisfies(matching(Matchers.contains(one, two, three, ten)));
assertThat(emitter.results).contains(one, two, three, ten);

// advance watermark, emits remaining record from queue0
emitter.setCurrentWatermark(10);
dl = Deadline.fromNow(Duration.ofSeconds(10));
while (emitter.results.size() != 5 && dl.hasTimeLeft()) {
Thread.sleep(10);
}
assertThat(emitter.results)
.satisfies(matching(Matchers.contains(one, two, three, ten, eleven)));
assertThat(emitter.results).contains(one, two, three, ten, eleven);
} finally {
emitter.stop();
}
Expand Down

0 comments on commit e88e572

Please sign in to comment.