Skip to content
Permalink
Browse files
[FLINK-27185][formats] Convert format modules to assertj
Co-authored-by: slinkydeveloper <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper authored and MartijnVisser committed May 13, 2022
1 parent ce6ee9c commit 1e398214df6a31295cbd75c693de094c3b4e273b
Showing 29 changed files with 634 additions and 703 deletions.
@@ -59,7 +59,7 @@ void testSeDeSchema() {
final Map<String, String> options = getAllOptions();

final DynamicTableSource actualSource = FactoryMocks.createTableSource(SCHEMA, options);
assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

@@ -73,7 +73,7 @@ void testSeDeSchema() {
new AvroRowDataSerializationSchema(ROW_TYPE);

final DynamicTableSink actualSink = FactoryMocks.createTableSink(SCHEMA, options);
assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;

@@ -44,9 +44,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link CompressWriterFactory}. */
public class CompressWriterFactoryTest extends TestLogger {
@@ -167,17 +165,17 @@ private File prepareCompressedFile(CompressWriterFactory<String> writer, List<St
private void validateResults(File folder, List<String> expected, CompressionCodec codec)
throws Exception {
File[] buckets = folder.listFiles();
assertNotNull(buckets);
assertEquals(1, buckets.length);
assertThat(buckets).isNotNull();
assertThat(buckets).hasSize(1);

final File[] partFiles = buckets[0].listFiles();
assertNotNull(partFiles);
assertEquals(1, partFiles.length);
assertThat(partFiles).isNotNull();
assertThat(partFiles).hasSize(1);

for (File partFile : partFiles) {
assertTrue(partFile.length() > 0);
assertThat(partFile.length()).isGreaterThan(0);
final List<String> fileContent = readFile(partFile, codec);
assertEquals(expected, fileContent);
assertThat(fileContent).isEqualTo(expected);
}
}

@@ -43,9 +43,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration test case for writing bulk encoded files with the {@link StreamingFileSink} and
@@ -102,18 +100,18 @@ private List<String> readFile(File file, CompressionCodec codec) throws Exceptio
private void validateResults(File folder, List<String> expected, CompressionCodec codec)
throws Exception {
File[] buckets = folder.listFiles();
assertNotNull(buckets);
assertEquals(1, buckets.length);
assertThat(buckets).isNotNull();
assertThat(buckets).hasSize(1);

final File[] partFiles = buckets[0].listFiles();
assertNotNull(partFiles);
assertEquals(2, partFiles.length);
assertThat(partFiles).isNotNull();
assertThat(partFiles).hasSize(2);

for (File partFile : partFiles) {
assertTrue(partFile.length() > 0);
assertThat(partFile.length()).isGreaterThan(0);

final List<String> fileContent = readFile(partFile, codec);
assertEquals(expected, fileContent);
assertThat(fileContent).isEqualTo(expected);
}
}
}
@@ -46,7 +46,7 @@
import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link CsvFormatFactory}. */
public class CsvFormatFactoryTest extends TestLogger {
@@ -67,7 +67,7 @@ public void testSeDeSchema() {
.build();
final Map<String, String> options = getAllOptions();
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
assertEquals(expectedDeser, actualDeser);
assertThat(actualDeser).isEqualTo(expectedDeser);

final CsvRowDataSerializationSchema expectedSer =
new CsvRowDataSerializationSchema.Builder(PHYSICAL_TYPE)
@@ -78,7 +78,7 @@ public void testSeDeSchema() {
.setNullLiteral("n/a")
.build();
SerializationSchema<RowData> actualSer = createSerializationSchema(options);
assertEquals(expectedSer, actualSer);
assertThat(actualSer).isEqualTo(expectedSer);
}

@Test
@@ -103,7 +103,7 @@ public void testDisableQuoteCharacter() {
.build();
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);

assertEquals(expectedDeser, actualDeser);
assertThat(actualDeser).isEqualTo(expectedDeser);

final CsvRowDataSerializationSchema expectedSer =
new CsvRowDataSerializationSchema.Builder(PHYSICAL_TYPE)
@@ -115,7 +115,7 @@ public void testDisableQuoteCharacter() {
.build();
SerializationSchema<RowData> actualSer = createSerializationSchema(options);

assertEquals(expectedSer, actualSer);
assertThat(actualSer).isEqualTo(expectedSer);
}

@Test
@@ -174,23 +174,23 @@ public void testEscapedFieldDelimiter() throws IOException {
SerializationSchema<RowData> serializationSchema1 = createSerializationSchema(options1);
DeserializationSchema<RowData> deserializationSchema1 =
createDeserializationSchema(options1);
assertEquals(expectedSer, serializationSchema1);
assertEquals(expectedDeser, deserializationSchema1);
assertThat(serializationSchema1).isEqualTo(expectedSer);
assertThat(deserializationSchema1).isEqualTo(expectedDeser);

final Map<String, String> options2 =
getModifiedOptions(opts -> opts.put("csv.field-delimiter", "\\t"));
SerializationSchema<RowData> serializationSchema2 = createSerializationSchema(options2);
DeserializationSchema<RowData> deserializationSchema2 =
createDeserializationSchema(options2);
assertEquals(expectedSer, serializationSchema2);
assertEquals(expectedDeser, deserializationSchema2);
assertThat(serializationSchema2).isEqualTo(expectedSer);
assertThat(deserializationSchema2).isEqualTo(expectedDeser);

// test (de)serialization
RowData rowData = GenericRowData.of(fromString("abc"), 123, false);
byte[] bytes = serializationSchema2.serialize(rowData);
assertEquals("abc\t123\tfalse", new String(bytes));
assertThat(new String(bytes)).isEqualTo("abc\t123\tfalse");
RowData actual = deserializationSchema2.deserialize("abc\t123\tfalse".getBytes());
assertEquals(rowData, actual);
assertThat(actual).isEqualTo(rowData);
}

@Test
@@ -200,7 +200,7 @@ public void testDeserializeWithEscapedFieldDelimiter() throws IOException {
getModifiedOptions(opts -> opts.put("csv.field-delimiter", "\t"));

final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

@@ -209,7 +209,7 @@ public void testDeserializeWithEscapedFieldDelimiter() throws IOException {
ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
RowData expected = GenericRowData.of(fromString("abc"), 123, false);
RowData actual = deserializationSchema.deserialize("abc\t123\tfalse".getBytes());
assertEquals(expected, actual);
assertThat(actual).isEqualTo(expected);
}

@Test
@@ -263,7 +263,7 @@ private static Map<String, String> getAllOptions() {
private static DeserializationSchema<RowData> createDeserializationSchema(
Map<String, String> options) {
final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

@@ -274,7 +274,7 @@ private static DeserializationSchema<RowData> createDeserializationSchema(
private static SerializationSchema<RowData> createSerializationSchema(
Map<String, String> options) {
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;

@@ -19,7 +19,6 @@
package org.apache.flink.formats.csv;

import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -40,6 +39,7 @@
import java.time.LocalTime;
import java.util.function.Consumer;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.api.DataTypes.ARRAY;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.BOOLEAN;
@@ -61,11 +61,8 @@
import static org.apache.flink.table.data.StringData.fromString;
import static org.apache.flink.table.data.TimestampData.fromInstant;
import static org.apache.flink.table.data.TimestampData.fromLocalDateTime;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link CsvRowDataDeserializationSchema} and {@link CsvRowDataSerializationSchema}. */
public class CsvRowDataSerDeSchemaTest {
@@ -193,63 +190,60 @@ public void testSerializeDeserializeCustomizedProperties() throws Exception {
testFieldDeserialization(
TIME(0), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";");
int precision = 5;
try {
testFieldDeserialization(
TIME(precision), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";");
fail();
} catch (Exception e) {
assertEquals(
"Csv does not support TIME type with precision: 5, it only supports precision 0 ~ 3.",
e.getMessage());
}
assertThatThrownBy(
() ->
testFieldDeserialization(
TIME(precision),
"12:12:12.45",
LocalTime.parse("12:12:12"),
deserConfig,
";"))
.hasMessage(
"Csv does not support TIME type with precision: 5, it only supports precision 0 ~ 3.");
}

@Test
public void testDeserializeParseError() throws Exception {
try {
testDeserialization(false, false, "Test,null,Test"); // null not supported
fail("Missing field should cause failure.");
} catch (IOException e) {
// valid exception
}
public void testDeserializeParseError() {
assertThatThrownBy(() -> testDeserialization(false, false, "Test,null,Test"))
.isInstanceOf(IOException.class);
}

@Test
public void testDeserializeUnsupportedNull() throws Exception {
// unsupported null for integer
assertEquals(
Row.of("Test", null, "Test"), testDeserialization(true, false, "Test,null,Test"));
assertThat(testDeserialization(true, false, "Test,null,Test"))
.isEqualTo(Row.of("Test", null, "Test"));
}

@Test
public void testDeserializeNullRow() throws Exception {
// return null for null input
assertNull(testDeserialization(false, false, null));
assertThat(testDeserialization(false, false, null)).isNull();
}

@Test
public void testDeserializeIncompleteRow() throws Exception {
// last two columns are missing
assertEquals(Row.of("Test", null, null), testDeserialization(true, false, "Test"));
assertThat(testDeserialization(true, false, "Test")).isEqualTo(Row.of("Test", null, null));
}

@Test
public void testDeserializeMoreColumnsThanExpected() throws Exception {
// one additional string column
assertNull(testDeserialization(true, false, "Test,12,Test,Test"));
assertThat(testDeserialization(true, false, "Test,12,Test,Test")).isNull();
}

@Test
public void testDeserializeIgnoreComment() throws Exception {
// # is part of the string
assertEquals(
Row.of("#Test", 12, "Test"), testDeserialization(false, false, "#Test,12,Test"));
assertThat(testDeserialization(false, false, "#Test,12,Test"))
.isEqualTo(Row.of("#Test", 12, "Test"));
}

@Test
public void testDeserializeAllowComment() throws Exception {
// entire row is ignored
assertNull(testDeserialization(true, true, "#Test,12,Test"));
assertThat(testDeserialization(true, true, "#Test,12,Test")).isNull();
}

@Test
@@ -259,21 +253,18 @@ public void testSerializationProperties() throws Exception {
CsvRowDataSerializationSchema.Builder serSchemaBuilder =
new CsvRowDataSerializationSchema.Builder(rowType);

assertArrayEquals(
"Test,12,Hello".getBytes(),
serialize(serSchemaBuilder, rowData("Test", 12, "Hello")));
assertThat(serialize(serSchemaBuilder, rowData("Test", 12, "Hello")))
.isEqualTo("Test,12,Hello".getBytes());

serSchemaBuilder.setQuoteCharacter('#');

assertArrayEquals(
"Test,12,#2019-12-26 12:12:12#".getBytes(),
serialize(serSchemaBuilder, rowData("Test", 12, "2019-12-26 12:12:12")));
assertThat(serialize(serSchemaBuilder, rowData("Test", 12, "2019-12-26 12:12:12")))
.isEqualTo("Test,12,#2019-12-26 12:12:12#".getBytes());

serSchemaBuilder.disableQuoteCharacter();

assertArrayEquals(
"Test,12,2019-12-26 12:12:12".getBytes(),
serialize(serSchemaBuilder, rowData("Test", 12, "2019-12-26 12:12:12")));
assertThat(serialize(serSchemaBuilder, rowData("Test", 12, "2019-12-26 12:12:12")))
.isEqualTo("Test,12,2019-12-26 12:12:12".getBytes());
}

@Test(expected = IllegalArgumentException.class)
@@ -358,12 +349,8 @@ public void testSerializationWithTypesMismatch() {
new CsvRowDataSerializationSchema.Builder(rowType);
RowData rowData = rowData("Test", 1, "Test");
String errorMessage = "Fail to serialize at field: f2.";
try {
serialize(serSchemaBuilder, rowData);
fail("expecting exception message:" + errorMessage);
} catch (Throwable t) {
assertThat(t, FlinkMatchers.containsMessage(errorMessage));
}
assertThatThrownBy(() -> serialize(serSchemaBuilder, rowData))
.satisfies(anyCauseMatches(errorMessage));
}

@Test
@@ -374,12 +361,8 @@ public void testDeserializationWithTypesMismatch() {
new CsvRowDataDeserializationSchema.Builder(rowType, InternalTypeInfo.of(rowType));
String data = "Test,1,Test";
String errorMessage = "Fail to deserialize at field: f2.";
try {
deserialize(deserSchemaBuilder, data);
fail("expecting exception message:" + errorMessage);
} catch (Throwable t) {
assertThat(t, FlinkMatchers.containsMessage(errorMessage));
}
assertThatThrownBy(() -> deserialize(deserSchemaBuilder, data))
.satisfies(anyCauseMatches(errorMessage));
}

private void testNullableField(DataType fieldType, String string, Object value)
@@ -418,7 +401,7 @@ private void testField(
new CsvRowDataSerializationSchema.Builder(rowType);
serializationConfig.accept(serSchemaBuilder);
byte[] serializedRow = serialize(serSchemaBuilder, deserializedRow);
assertEquals(expectedCsv, new String(serializedRow));
assertThat(new String(serializedRow)).isEqualTo(expectedCsv);
}

@SuppressWarnings("unchecked")
@@ -444,7 +427,7 @@ private void testFieldDeserialization(
(Row)
DataFormatConverters.getConverterForDataType(dataType)
.toExternal(deserializedRow);
assertEquals(expectedRow, actualRow);
assertThat(actualRow).isEqualTo(expectedRow);
}

@SuppressWarnings("unchecked")
@@ -469,7 +452,7 @@ private void testSerDeConsistency(
RowData deserializedRow =
deserialize(
deserSchemaBuilder, new String(serialize(serSchemaBuilder, originalRow)));
assertEquals(deserializedRow, originalRow);
assertThat(originalRow).isEqualTo(deserializedRow);
}

private static byte[] serialize(

0 comments on commit 1e39821

Please sign in to comment.