Skip to content
Permalink
Browse files
[FLINK-27185][connector] Convert connector-elasticsearch modules to a…
…ssertj

Co-authored-by: slinkydeveloper <francescoguard@gmail.com>
  • Loading branch information
2 people authored and MartijnVisser committed May 18, 2022
1 parent 29c72ce commit e0a178f75418919b6aa45af9647e1f09e08f761b
Showing 16 changed files with 373 additions and 471 deletions.
@@ -53,9 +53,7 @@
import java.util.UUID;
import java.util.function.BiFunction;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ElasticsearchSink}. */
@ExtendWith(TestLoggerExtension.class)
@@ -114,9 +112,9 @@ void testWriteToElasticSearchWithDeliveryGuarantee(DeliveryGuarantee deliveryGua
runTest(index, false, TestEmitter::jsonEmitter, deliveryGuarantee, null);
} catch (IllegalStateException e) {
failure = true;
assertSame(deliveryGuarantee, DeliveryGuarantee.EXACTLY_ONCE);
assertThat(deliveryGuarantee).isSameAs(DeliveryGuarantee.EXACTLY_ONCE);
} finally {
assertEquals(failure, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
assertThat(failure).isEqualTo(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
}
}

@@ -134,7 +132,7 @@ void testWriteJsonToElasticsearch(
void testRecovery() throws Exception {
final String index = "test-recovery-elasticsearch-sink";
runTest(index, true, TestEmitter::jsonEmitter, new FailingMapper());
assertTrue(failed);
assertThat(failed).isTrue();
}

private void runTest(
@@ -30,8 +30,8 @@
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ElasticsearchSinkBuilderBase}. */
@ExtendWith(TestLoggerExtension.class)
@@ -54,7 +54,7 @@ Stream<DynamicTest> testValidBuilders() {
return DynamicTest.stream(
validBuilders,
ElasticsearchSinkBuilderBase::toString,
builder -> assertDoesNotThrow(builder::build));
builder -> assertThatCode(builder::build).doesNotThrowAnyException());
}

@Test
@@ -65,36 +65,38 @@ void testDefaultDeliveryGuarantee() {

@Test
void testThrowIfExactlyOnceConfigured() {
assertThrows(
IllegalStateException.class,
() -> createMinimalBuilder().setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE));
assertThatThrownBy(
() ->
createMinimalBuilder()
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE))
.isInstanceOf(IllegalStateException.class);
}

@Test
void testThrowIfHostsNotSet() {
assertThrows(
NullPointerException.class,
() -> createEmptyBuilder().setEmitter((element, indexer, context) -> {}).build());
assertThatThrownBy(
() ->
createEmptyBuilder()
.setEmitter((element, indexer, context) -> {})
.build())
.isInstanceOf(NullPointerException.class);
}

@Test
void testThrowIfEmitterNotSet() {
assertThrows(
NullPointerException.class,
() -> createEmptyBuilder().setHosts(new HttpHost("localhost:3000")).build());
assertThatThrownBy(
() -> createEmptyBuilder().setHosts(new HttpHost("localhost:3000")).build())
.isInstanceOf(NullPointerException.class);
}

@Test
void testThrowIfSetInvalidTimeouts() {
assertThrows(
IllegalStateException.class,
() -> createEmptyBuilder().setConnectionRequestTimeout(-1).build());
assertThrows(
IllegalStateException.class,
() -> createEmptyBuilder().setConnectionTimeout(-1).build());
assertThrows(
IllegalStateException.class,
() -> createEmptyBuilder().setSocketTimeout(-1).build());
assertThatThrownBy(() -> createEmptyBuilder().setConnectionRequestTimeout(-1).build())
.isInstanceOf(IllegalStateException.class);
assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1).build())
.isInstanceOf(IllegalStateException.class);
assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1).build())
.isInstanceOf(IllegalStateException.class);
}

abstract B createEmptyBuilder();
@@ -69,10 +69,7 @@

import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.DOCUMENT_TYPE;
import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.buildMessage;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ElasticsearchWriter}. */
@Testcontainers
@@ -195,20 +192,20 @@ void testIncrementByteOutMetric() throws Exception {
try (final ElasticsearchWriter<Tuple2<Integer, String>> writer =
createWriter(index, false, bulkProcessorConfig, metricGroup)) {
final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
assertEquals(numBytesOut.getCount(), 0);
assertThat(numBytesOut.getCount()).isEqualTo(0);
writer.write(Tuple2.of(1, buildMessage(1)), null);
writer.write(Tuple2.of(2, buildMessage(2)), null);

writer.blockingFlushAllActions();
long first = numBytesOut.getCount();

assertTrue(first > 0);
assertThat(first).isGreaterThan(0);

writer.write(Tuple2.of(1, buildMessage(1)), null);
writer.write(Tuple2.of(2, buildMessage(2)), null);

writer.blockingFlushAllActions();
assertTrue(numBytesOut.getCount() > first);
assertThat(numBytesOut.getCount()).isGreaterThan(first);
}
}

@@ -231,8 +228,8 @@ void testIncrementRecordsSendMetric() throws Exception {

writer.blockingFlushAllActions();

assertTrue(recordsSend.isPresent());
assertEquals(recordsSend.get().getCount(), 3L);
assertThat(recordsSend).isPresent();
assertThat(recordsSend.get().getCount()).isEqualTo(3L);
}
}

@@ -252,8 +249,8 @@ void testCurrentSendTime() throws Exception {

writer.blockingFlushAllActions();

assertTrue(currentSendTime.isPresent());
assertThat(currentSendTime.get().getValue(), greaterThan(0L));
assertThat(currentSendTime).isPresent();
assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
}
}

@@ -23,8 +23,7 @@

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.assertj.core.api.Assertions.assertThat;

abstract class TestClientBase {

@@ -42,10 +41,11 @@ void assertThatIdsAreNotWritten(String index, int... ids) throws IOException {
for (final int id : ids) {
try {
final GetResponse response = getResponse(index, id);
assertFalse(
response.isExists(), String.format("Id %s is unexpectedly present.", id));
assertThat(response.isExists())
.as(String.format("Id %s is unexpectedly present.", id))
.isFalse();
} catch (ElasticsearchStatusException e) {
assertEquals(404, e.status().getStatus());
assertThat(e.status().getStatus()).isEqualTo(404);
}
}
}
@@ -58,7 +58,7 @@ void assertThatIdsAreWritten(String index, int... ids)
response = getResponse(index, id);
Thread.sleep(10);
} while (response.isSourceEmpty());
assertEquals(buildMessage(id), response.getSource().get(DATA_FIELD_NAME));
assertThat(response.getSource().get(DATA_FIELD_NAME)).isEqualTo(buildMessage(id));
}
}

@@ -44,7 +44,6 @@
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@@ -62,6 +61,7 @@
import java.util.Map;

import static org.apache.flink.table.api.Expressions.row;
import static org.assertj.core.api.Assertions.assertThat;

/** IT tests for {@link ElasticsearchDynamicSink}. */
@ExtendWith(TestLoggerExtension.class)
@@ -142,7 +142,7 @@ public void testWritingDocuments() throws Exception {
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12 12:12:12");
Assertions.assertEquals(response, expectedMap);
assertThat(response).isEqualTo(expectedMap);
}

@Test
@@ -190,7 +190,7 @@ public void testWritingDocumentsFromTableApi() throws Exception {
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12 12:12:12");
Assertions.assertEquals(response, expectedMap);
assertThat(response).isEqualTo(expectedMap);
}

@Test
@@ -273,7 +273,7 @@ public void testWritingDocumentsNoPrimaryKey() throws Exception {
HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
expectedSet.add(expectedMap1);
expectedSet.add(expectedMap2);
Assertions.assertEquals(resultSet, expectedSet);
assertThat(resultSet).isEqualTo(expectedSet);
}

@Test
@@ -302,7 +302,7 @@ public void testWritingDocumentsWithDynamicIndex() throws Exception {
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "2012-12-12 12:12:12");
Assertions.assertEquals(response, expectedMap);
assertThat(response).isEqualTo(expectedMap);
}

@Test
@@ -353,6 +353,6 @@ public void testWritingDocumentsWithDynamicIndexFromSystemTime() throws Exceptio
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "2012-12-12 12:12:12");
Assertions.assertEquals(response, expectedMap);
assertThat(response).isEqualTo(expectedMap);
}
}

0 comments on commit e0a178f

Please sign in to comment.