Skip to content
Permalink
Browse files
[FLINK-27185][connector] Convert connector-cassandra module to assertj
Co-authored-by: slinkydeveloper <francescoguard@gmail.com>
  • Loading branch information
2 people authored and MartijnVisser committed May 19, 2022
1 parent e049cea commit cc63db735e32d6b15ad77e9245accfe21f45581b
Showing 3 changed files with 94 additions and 100 deletions.
@@ -60,10 +60,10 @@
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.annotations.Table;
import net.bytebuddy.ByteBuddy;
import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -93,9 +93,7 @@
import scala.collection.JavaConverters;
import scala.collection.Seq;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.samePropertyValuesAs;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for all cassandra sinks. */
@SuppressWarnings("serial")
@@ -412,7 +410,7 @@ public void testAnnotatePojoWithTable() {

final Class<? extends Pojo> annotatedPojoClass = annotatePojoWithTable(KEYSPACE, tableName);
final Table pojoTableAnnotation = annotatedPojoClass.getAnnotation(Table.class);
assertTrue(pojoTableAnnotation.name().contains(tableName));
assertThat(pojoTableAnnotation.name()).contains(tableName);
}

@Test
@@ -425,9 +423,9 @@ public void testRaiseCassandraRequestsTimeouts() throws IOException {
"/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
final String configuration =
new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
assertTrue(configuration.contains("read_request_timeout_in_ms: 15000"));
assertTrue(configuration.contains("write_request_timeout_in_ms: 6000"));
assertThat(configuration).contains("request_timeout_in_ms: 30000");
assertThat(configuration).contains("read_request_timeout_in_ms: 15000");
assertThat(configuration).contains("write_request_timeout_in_ms: 6000");
}

// ------------------------------------------------------------------------
@@ -468,9 +466,9 @@ protected void verifyResultsIdealCircumstances(
for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
}
Assert.assertTrue(
"The following ID's were not found in the ResultSet: " + list.toString(),
list.isEmpty());
assertThat(list)
.as("The following ID's were not found in the ResultSet: " + list.toString())
.isEmpty();
}

@Override
@@ -486,9 +484,9 @@ protected void verifyResultsDataPersistenceUponMissedNotify(
for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
}
Assert.assertTrue(
"The following ID's were not found in the ResultSet: " + list.toString(),
list.isEmpty());
assertThat(list)
.as("The following ID's were not found in the ResultSet: " + list.toString())
.isEmpty();
}

@Override
@@ -507,9 +505,9 @@ protected void verifyResultsDataDiscardingUponRestore(
for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
}
Assert.assertTrue(
"The following ID's were not found in the ResultSet: " + list.toString(),
list.isEmpty());
assertThat(list)
.as("The following ID's were not found in the ResultSet: " + list.toString())
.isEmpty();
}

@Override
@@ -536,7 +534,7 @@ protected void verifyResultsWhenReScaling(
}

Collections.sort(actual);
Assert.assertArrayEquals(expected.toArray(), actual.toArray());
assertThat(actual.toArray()).isEqualTo(expected.toArray());
}

@Test
@@ -560,18 +558,18 @@ public void testCassandraCommitter() throws Exception {
cc2.open();
cc3.open();

Assert.assertFalse(cc1.isCheckpointCommitted(0, 1));
Assert.assertFalse(cc2.isCheckpointCommitted(1, 1));
Assert.assertFalse(cc3.isCheckpointCommitted(0, 1));
assertThat(cc1.isCheckpointCommitted(0, 1)).isFalse();
assertThat(cc2.isCheckpointCommitted(1, 1)).isFalse();
assertThat(cc3.isCheckpointCommitted(0, 1)).isFalse();

cc1.commitCheckpoint(0, 1);
Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
assertThat(cc1.isCheckpointCommitted(0, 1)).isTrue();
// verify that other sub-tasks aren't affected
Assert.assertFalse(cc2.isCheckpointCommitted(1, 1));
assertThat(cc2.isCheckpointCommitted(1, 1)).isFalse();
// verify that other tasks aren't affected
Assert.assertFalse(cc3.isCheckpointCommitted(0, 1));
assertThat(cc3.isCheckpointCommitted(0, 1)).isFalse();

Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
assertThat(cc1.isCheckpointCommitted(0, 2)).isFalse();

cc1.close();
cc2.close();
@@ -585,8 +583,8 @@ public void testCassandraCommitter() throws Exception {

// verify that checkpoint data is not destroyed within open/close and not reliant on
// internally cached data
Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
assertThat(cc1.isCheckpointCommitted(0, 1)).isTrue();
assertThat(cc1.isCheckpointCommitted(0, 2)).isFalse();

cc1.close();
}
@@ -609,7 +607,7 @@ public void testCassandraTupleAtLeastOnceSink() throws Exception {
}

ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
Assert.assertEquals(20, rs.all().size());
assertThat(rs.all()).hasSize(20);
}

@Test
@@ -627,7 +625,7 @@ public void testCassandraRowAtLeastOnceSink() throws Exception {
}

ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
Assert.assertEquals(20, rs.all().size());
assertThat(rs.all()).hasSize(20);
}

@Test
@@ -637,7 +635,7 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
writePojos(annotatedPojoClass, null);

ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
Assert.assertEquals(20, rs.all().size());
assertThat(rs.all()).hasSize(20);
}

@Test
@@ -646,7 +644,7 @@ public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Excepti
annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
writePojos(annotatedPojoClass, KEYSPACE);
ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
Assert.assertEquals(20, rs.all().size());
assertThat(rs.all()).hasSize(20);
}

private <T> void writePojos(Class<T> annotatedPojoClass, @Nullable String keyspace)
@@ -696,12 +694,11 @@ builderForWriting, injectTableName(INSERT_DATA_QUERY))
cmp.setField(0, o.getString(0));
cmp.setField(1, o.getInt(2));
cmp.setField(2, o.getInt(1));
Assert.assertTrue(
"Row " + cmp + " was written to Cassandra but not in input.",
input.remove(cmp));
assertThat(input.remove(cmp))
.as("Row " + cmp + " was written to Cassandra but not in input.")
.isTrue();
}
Assert.assertTrue(
"The input data was not completely written to Cassandra", input.isEmpty());
assertThat(input).as("The input data was not completely written to Cassandra").isEmpty();
}

private static int retrialsCount = 0;
@@ -723,11 +720,16 @@ public void testCassandraBatchPojoFormat() throws Exception {

final List<? extends Pojo> pojos = writePojosWithOutputFormat(annotatedPojoClass);
ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
Assert.assertEquals(20, rs.all().size());
assertThat(rs.all()).hasSize(20);

final List<? extends Pojo> result = readPojosWithInputFormat(annotatedPojoClass);
Assert.assertEquals(20, result.size());
assertThat(result, samePropertyValuesAs(pojos));
assertThat(result).hasSize(20);
assertThat(result)
.usingRecursiveComparison(
RecursiveComparisonConfiguration.builder()
.withIgnoreCollectionOrder(true)
.build())
.isEqualTo(pojos);
}

@Test
@@ -770,7 +772,7 @@ public void testCassandraBatchTupleFormat() throws Exception {
source.close();
}

Assert.assertEquals(20, result.size());
assertThat(result).hasSize(20);
}

@Test
@@ -790,7 +792,7 @@ public void testCassandraBatchRowFormat() throws Exception {

ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
List<com.datastax.driver.core.Row> rows = rs.all();
Assert.assertEquals(rowCollection.size(), rows.size());
assertThat(rows).hasSize(rowCollection.size());
}

@Test
@@ -820,7 +822,7 @@ public TypeSerializer<scala.Tuple1<String>> createSerializer(

CassandraSink.CassandraSinkBuilder<scala.Tuple1<String>> sinkBuilder =
CassandraSink.addSink(input);
assertTrue(sinkBuilder instanceof CassandraSink.CassandraScalaProductSinkBuilder);
assertThat(sinkBuilder).isInstanceOf(CassandraSink.CassandraScalaProductSinkBuilder.class);
}

@Test
@@ -844,7 +846,7 @@ public void testCassandraScalaTupleAtLeastSink() throws Exception {

ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
List<com.datastax.driver.core.Row> rows = rs.all();
Assert.assertEquals(scalaTupleCollection.size(), rows.size());
assertThat(rows).hasSize(scalaTupleCollection.size());

for (com.datastax.driver.core.Row row : rows) {
scalaTupleCollection.remove(
@@ -853,7 +855,7 @@ public void testCassandraScalaTupleAtLeastSink() throws Exception {
row.getInt(TUPLE_COUNTER_FIELD),
row.getInt(TUPLE_BATCHID_FIELD)));
}
Assert.assertEquals(0, scalaTupleCollection.size());
assertThat(scalaTupleCollection).isEmpty();
}

@Test
@@ -884,15 +886,15 @@ public void testCassandraScalaTuplePartialColumnUpdate() throws Exception {

ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
List<com.datastax.driver.core.Row> rows = rs.all();
Assert.assertEquals(1, rows.size());
assertThat(rows).hasSize(1);
// Since nulls are ignored, we should be reading one complete record
for (com.datastax.driver.core.Row row : rows) {
Assert.assertEquals(
new scala.Tuple3<>(id, counter, batchId),
new scala.Tuple3<>(
row.getString(TUPLE_ID_FIELD),
row.getInt(TUPLE_COUNTER_FIELD),
row.getInt(TUPLE_BATCHID_FIELD)));
assertThat(
new scala.Tuple3<>(
row.getString(TUPLE_ID_FIELD),
row.getInt(TUPLE_COUNTER_FIELD),
row.getInt(TUPLE_BATCHID_FIELD)))
.isEqualTo(new scala.Tuple3<>(id, counter, batchId));
}
}
}

0 comments on commit cc63db7

Please sign in to comment.