diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java deleted file mode 100644 index 74c5d343e996..000000000000 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import org.apache.flink.util.ArrayUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.relocated.com.google.common.base.Joiner; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public abstract class FlinkCatalogTestBase extends FlinkTestBase { - - protected static final String DATABASE = "db"; - private static TemporaryFolder hiveWarehouse = new TemporaryFolder(); - private static TemporaryFolder hadoopWarehouse = new TemporaryFolder(); - - @BeforeClass - public static void createWarehouse() throws IOException { - hiveWarehouse.create(); - hadoopWarehouse.create(); - } - - @AfterClass - public static void dropWarehouse() { - hiveWarehouse.delete(); - hadoopWarehouse.delete(); - } - - @Before - public void before() { - sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config)); - } - - @After - public void clean() { - dropCatalog(catalogName, true); - } - - @Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}") - public static Iterable parameters() { - return Lists.newArrayList( - new Object[] {"testhive", Namespace.empty()}, - new Object[] {"testhadoop", Namespace.empty()}, - new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")}); - } - - protected final String catalogName; - protected final Namespace baseNamespace; - protected final Catalog validationCatalog; - protected final SupportsNamespaces validationNamespaceCatalog; - protected final Map config = Maps.newHashMap(); - - protected final String flinkDatabase; - protected final Namespace icebergNamespace; - protected final boolean isHadoopCatalog; - - public FlinkCatalogTestBase(String catalogName, Namespace 
baseNamespace) { - this.catalogName = catalogName; - this.baseNamespace = baseNamespace; - this.isHadoopCatalog = catalogName.startsWith("testhadoop"); - this.validationCatalog = - isHadoopCatalog - ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getRoot()) - : catalog; - this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; - - config.put("type", "iceberg"); - if (!baseNamespace.isEmpty()) { - config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString()); - } - if (isHadoopCatalog) { - config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"); - } else { - config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive"); - config.put(CatalogProperties.URI, getURI(hiveConf)); - } - config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot())); - - this.flinkDatabase = catalogName + "." + DATABASE; - this.icebergNamespace = - Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE})); - } - - protected String warehouseRoot() { - if (isHadoopCatalog) { - return hadoopWarehouse.getRoot().getAbsolutePath(); - } else { - return hiveWarehouse.getRoot().getAbsolutePath(); - } - } - - protected String getFullQualifiedTableName(String tableName) { - final List levels = Lists.newArrayList(icebergNamespace.levels()); - levels.add(tableName); - return Joiner.on('.').join(levels); - } - - static String getURI(HiveConf conf) { - return conf.get(HiveConf.ConfVars.METASTOREURIS.varname); - } - - static String toWithClause(Map props) { - StringBuilder builder = new StringBuilder(); - builder.append("("); - int propCount = 0; - for (Map.Entry entry : props.entrySet()) { - if (propCount > 0) { - builder.append(","); - } - builder - .append("'") - .append(entry.getKey()) - .append("'") - .append("=") - .append("'") - .append(entry.getValue()) - .append("'"); - propCount++; - } - builder.append(")"); - return builder.toString(); - } -} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java index 4fc0207f269e..3986f1a796a5 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java @@ -45,7 +45,7 @@ public abstract class TestBase extends TestBaseUtils { public static MiniClusterExtension miniClusterResource = MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); - @TempDir Path temporaryDirectory; + @TempDir protected Path temporaryDirectory; private static TestHiveMetastore metastore = null; protected static HiveConf hiveConf = null; diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 8f5ddde91851..ef0802d8693d 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -19,6 +19,8 @@ package org.apache.iceberg.flink; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.util.Arrays; import java.util.Collections; @@ -46,30 +48,21 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.catalog.Namespace; import 
org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; -public class TestFlinkCatalogTable extends FlinkCatalogTestBase { - - public TestFlinkCatalogTable(String catalogName, Namespace baseNamespace) { - super(catalogName, baseNamespace); - } +public class TestFlinkCatalogTable extends CatalogTestBase { @Override - @Before + @BeforeEach public void before() { super.before(); sql("CREATE DATABASE %s", flinkDatabase); @@ -77,7 +70,7 @@ public void before() { sql("USE %s", DATABASE); } - @After + @AfterEach public void cleanNamespaces() { sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase); sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase); @@ -85,7 +78,7 @@ public void cleanNamespaces() { super.clean(); } - @Test + @TestTemplate public void testGetTable() { sql("CREATE TABLE tl(id BIGINT, strV STRING)"); @@ -94,84 +87,73 @@ public void testGetTable() { new Schema( Types.NestedField.optional(1, "id", Types.LongType.get()), Types.NestedField.optional(2, "strV", Types.StringType.get())); - Assert.assertEquals( - "Should load the expected iceberg schema", iSchema.toString(), table.schema().toString()); + assertThat(table.schema().toString()) + .as("Should load the expected iceberg schema") + .isEqualTo(iSchema.toString()); } - @Test + @TestTemplate public void testRenameTable() { - Assume.assumeFalse("HadoopCatalog does not support rename table", isHadoopCatalog); - + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support rename table").isFalse(); final Schema tableSchema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())); validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"), tableSchema); sql("ALTER TABLE tl RENAME TO tl2"); - Assertions.assertThatThrownBy(() -> getTableEnv().from("tl")) + assertThatThrownBy(() -> getTableEnv().from("tl")) .isInstanceOf(ValidationException.class) .hasMessage("Table `tl` was not found."); Schema actualSchema = FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema()); - Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct()); + assertThat(tableSchema.asStruct()).isEqualTo(actualSchema.asStruct()); } - @Test + @TestTemplate public void testCreateTable() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT)"); Table table = table("tl"); - Assert.assertEquals( - new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), - table.schema().asStruct()); - + assertThat(table.schema().asStruct()) + .isEqualTo( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct()); CatalogTable catalogTable = catalogTable("tl"); - Assert.assertEquals( - TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", 
DataTypes.BIGINT()).build()); } - @Test + @TestTemplate public void testCreateTableWithPrimaryKey() throws Exception { sql("CREATE TABLE tl(id BIGINT, data STRING, key STRING PRIMARY KEY NOT ENFORCED)"); Table table = table("tl"); - Assert.assertEquals( - "Should have the expected row key.", - Sets.newHashSet(table.schema().findField("key").fieldId()), - table.schema().identifierFieldIds()); - + assertThat(table.schema().identifierFieldIds()) + .as("Should have the expected row key.") + .isEqualTo(Sets.newHashSet(table.schema().findField("key").fieldId())); CatalogTable catalogTable = catalogTable("tl"); Optional uniqueConstraintOptional = catalogTable.getSchema().getPrimaryKey(); - Assert.assertTrue( - "Should have the expected unique constraint", uniqueConstraintOptional.isPresent()); - Assert.assertEquals( - "Should have the expected columns", - ImmutableList.of("key"), - uniqueConstraintOptional.get().getColumns()); + assertThat(uniqueConstraintOptional).isPresent(); + assertThat(uniqueConstraintOptional.get().getColumns()).containsExactly("key"); } - @Test + @TestTemplate public void testCreateTableWithMultiColumnsInPrimaryKey() throws Exception { sql( "CREATE TABLE tl(id BIGINT, data STRING, CONSTRAINT pk_constraint PRIMARY KEY(data, id) NOT ENFORCED)"); Table table = table("tl"); - Assert.assertEquals( - "Should have the expected RowKey", - Sets.newHashSet( - table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()), - table.schema().identifierFieldIds()); - + assertThat(table.schema().identifierFieldIds()) + .as("Should have the expected RowKey") + .isEqualTo( + Sets.newHashSet( + table.schema().findField("id").fieldId(), + table.schema().findField("data").fieldId())); CatalogTable catalogTable = catalogTable("tl"); Optional uniqueConstraintOptional = catalogTable.getSchema().getPrimaryKey(); - Assert.assertTrue( - "Should have the expected unique constraint", uniqueConstraintOptional.isPresent()); - Assert.assertEquals( - "Should have the expected columns", - ImmutableSet.of("data", "id"), - ImmutableSet.copyOf(uniqueConstraintOptional.get().getColumns())); + assertThat(uniqueConstraintOptional).isPresent(); + assertThat(uniqueConstraintOptional.get().getColumns()).containsExactly("id", "data"); } - @Test + @TestTemplate public void testCreateTableIfNotExists() { sql("CREATE TABLE tl(id BIGINT)"); @@ -193,97 +175,96 @@ public void testCreateTableIfNotExists() { assertThat(table("tl").properties()).containsEntry("key", "value"); } - @Test + @TestTemplate public void testCreateTableLike() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT)"); sql("CREATE TABLE tl2 LIKE tl"); Table table = table("tl2"); - Assert.assertEquals( - new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), - table.schema().asStruct()); - + assertThat(table.schema().asStruct()) + .isEqualTo( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct()); CatalogTable catalogTable = catalogTable("tl2"); - Assert.assertEquals( - TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); } - @Test + @TestTemplate public void testCreateTableLocation() { - Assume.assumeFalse( - "HadoopCatalog does not support creating table with location", isHadoopCatalog); - + assumeThat(isHadoopCatalog) + .as("HadoopCatalog does not support creating table with location") + .isFalse(); sql("CREATE 
TABLE tl(id BIGINT) WITH ('location'='file:///tmp/location')"); Table table = table("tl"); - Assert.assertEquals( - new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), - table.schema().asStruct()); - Assert.assertEquals("file:///tmp/location", table.location()); + assertThat(table.schema().asStruct()) + .isEqualTo( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct()); + assertThat(table.location()).isEqualTo("file:///tmp/location"); } - @Test + @TestTemplate public void testCreatePartitionTable() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED BY(dt)"); Table table = table("tl"); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - table.schema().asStruct()); - Assert.assertEquals( - PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec()); - + assertThat(table.schema().asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); + assertThat(table.spec()) + .isEqualTo(PartitionSpec.builderFor(table.schema()).identity("dt").build()); CatalogTable catalogTable = catalogTable("tl"); - Assert.assertEquals( - TableSchema.builder() - .field("id", DataTypes.BIGINT()) - .field("dt", DataTypes.STRING()) - .build(), - catalogTable.getSchema()); - Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys()); + assertThat(catalogTable.getSchema()) + .isEqualTo( + TableSchema.builder() + .field("id", DataTypes.BIGINT()) + .field("dt", DataTypes.STRING()) + .build()); + assertThat(catalogTable.getPartitionKeys()).isEqualTo(Collections.singletonList("dt")); } - @Test + @TestTemplate public void testCreateTableWithFormatV2ThroughTableProperty() throws Exception { sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')"); Table table = table("tl"); - Assert.assertEquals( - "should create table using format v2", - 2, - ((BaseTable) table).operations().current().formatVersion()); + assertThat(((BaseTable) table).operations().current().formatVersion()).isEqualTo(2); } - @Test + @TestTemplate public void testUpgradeTableWithFormatV2ThroughTableProperty() throws Exception { sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='1')"); Table table = table("tl"); TableOperations ops = ((BaseTable) table).operations(); - Assert.assertEquals("should create table using format v1", 1, ops.refresh().formatVersion()); - + assertThat(ops.refresh().formatVersion()) + .as("should create table using format v1") + .isEqualTo(1); sql("ALTER TABLE tl SET('format-version'='2')"); - Assert.assertEquals("should update table to use format v2", 2, ops.refresh().formatVersion()); + assertThat(ops.refresh().formatVersion()) + .as("should update table to use format v2") + .isEqualTo(2); } - @Test + @TestTemplate public void testDowngradeTableToFormatV1ThroughTablePropertyFails() throws Exception { sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')"); Table table = table("tl"); TableOperations ops = ((BaseTable) table).operations(); - Assert.assertEquals("should create table using format v2", 2, ops.refresh().formatVersion()); - + assertThat(ops.refresh().formatVersion()) + .as("should create table using format v2") + .isEqualTo(2); Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl SET('format-version'='1')")) .rootCause() 
.isInstanceOf(IllegalArgumentException.class) .hasMessage("Cannot downgrade v2 table to v1"); } - @Test + @TestTemplate public void testLoadTransformPartitionTable() throws TableNotExistException { Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())); validationCatalog.createTable( @@ -292,12 +273,12 @@ public void testLoadTransformPartitionTable() throws TableNotExistException { PartitionSpec.builderFor(schema).bucket("id", 100).build()); CatalogTable catalogTable = catalogTable("tl"); - Assert.assertEquals( - TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys()); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); + assertThat(catalogTable.getPartitionKeys()).isEmpty(); } - @Test + @TestTemplate public void testAlterTableProperties() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); Map properties = Maps.newHashMap(); @@ -319,35 +300,32 @@ public void testAlterTableProperties() throws TableNotExistException { assertThat(table("tl").properties()).containsAllEntriesOf(properties); } - @Test + @TestTemplate public void testAlterTableAddColumn() { sql("CREATE TABLE tl(id BIGINT)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), - schemaBefore.asStruct()); - + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct()); sql("ALTER TABLE tl ADD (dt STRING)"); Schema schemaAfter1 = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaAfter1.asStruct()); - + assertThat(schemaAfter1.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); // Add multiple columns sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); Schema schemaAfter2 = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get()), - Types.NestedField.optional(3, "col1", Types.StringType.get()), - Types.NestedField.optional(4, "col2", Types.LongType.get())) - .asStruct(), - schemaAfter2.asStruct()); - + assertThat(schemaAfter2.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct()); // Adding a required field should fail because Iceberg's SchemaUpdate does not allow // incompatible changes. 
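// For illustration only (not part of the patch): a hedged sketch of the Iceberg API
// behavior this assertion exercises, assuming the standard UpdateSchema contract:
//   table.updateSchema().addRequiredColumn("pk", Types.StringType.get()).commit();
//   // -> rejected with an IllegalArgumentException (roughly "Incompatible change:
//   //    cannot add required column: pk")
//   table.updateSchema().allowIncompatibleChanges()
//       .addRequiredColumn("pk", Types.StringType.get()).commit();
//   // -> succeeds only because incompatible changes were explicitly allowed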
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) @@ -360,36 +338,33 @@ public void testAlterTableAddColumn() { .hasMessageContaining("Try to add a column `id` which already exists in the table."); } - @Test + @TestTemplate public void testAlterTableDropColumn() { sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get()), - Types.NestedField.optional(3, "col1", Types.StringType.get()), - Types.NestedField.optional(4, "col2", Types.LongType.get())) - .asStruct(), - schemaBefore.asStruct()); - + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct()); sql("ALTER TABLE tl DROP (dt)"); Schema schemaAfter1 = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(3, "col1", Types.StringType.get()), - Types.NestedField.optional(4, "col2", Types.LongType.get())) - .asStruct(), - schemaAfter1.asStruct()); - + assertThat(schemaAfter1.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct()); // Drop multiple columns sql("ALTER TABLE tl DROP (col1, col2)"); Schema schemaAfter2 = table("tl").schema(); - Assert.assertEquals( - new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), - schemaAfter2.asStruct()); - + assertThat(schemaAfter2.asStruct()) + .isEqualTo( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct()); // Dropping an non-existing field should fail due to Flink's internal validation. 
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) .isInstanceOf(ValidationException.class) @@ -401,48 +376,45 @@ public void testAlterTableDropColumn() { .hasMessageContaining("The column `dt` does not exist in the base table."); } - @Test + @TestTemplate public void testAlterTableModifyColumnName() { sql("CREATE TABLE tl(id BIGINT, dt STRING)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaBefore.asStruct()); - + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); sql("ALTER TABLE tl RENAME dt TO data"); Schema schemaAfter = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "data", Types.StringType.get())) - .asStruct(), - schemaAfter.asStruct()); + assertThat(schemaAfter.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct()); } - @Test + @TestTemplate public void testAlterTableModifyColumnType() { sql("CREATE TABLE tl(id INTEGER, dt STRING)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaBefore.asStruct()); - + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); // Promote type from Integer to Long sql("ALTER TABLE tl MODIFY (id BIGINT)"); Schema schemaAfter = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaAfter.asStruct()); - + assertThat(schemaAfter.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); // Type change that doesn't follow the type-promotion rule should fail due to Iceberg's // validation. Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) @@ -451,17 +423,16 @@ public void testAlterTableModifyColumnType() { .hasRootCauseMessage("Cannot change column type: dt: string -> int"); } - @Test + @TestTemplate public void testAlterTableModifyColumnNullability() { sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaBefore.asStruct()); - + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); // Changing nullability from optional to required should fail // because Iceberg's SchemaUpdate does not allow incompatible changes. 
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) @@ -472,43 +443,42 @@ public void testAlterTableModifyColumnNullability() { // Set nullability from required to optional sql("ALTER TABLE tl MODIFY (id INTEGER)"); Schema schemaAfter = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaAfter.asStruct()); + assertThat(schemaAfter.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); } - @Test + @TestTemplate public void testAlterTableModifyColumnPosition() { sql("CREATE TABLE tl(id BIGINT, dt STRING)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaBefore.asStruct()); + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); sql("ALTER TABLE tl MODIFY (dt STRING FIRST)"); Schema schemaAfter = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(2, "dt", Types.StringType.get()), - Types.NestedField.optional(1, "id", Types.LongType.get())) - .asStruct(), - schemaAfter.asStruct()); + assertThat(schemaAfter.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get())) + .asStruct()); sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)"); Schema schemaAfterAfter = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaAfterAfter.asStruct()); - + assertThat(schemaAfterAfter.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); // Modifying the position of a non-existing column should fail due to Flink's internal // validation. 
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (non_existing STRING FIRST)")) @@ -523,67 +493,64 @@ public void testAlterTableModifyColumnPosition() { "Referenced column `non_existing` by 'AFTER' does not exist in the table."); } - @Test + @TestTemplate public void testAlterTableModifyColumnComment() { sql("CREATE TABLE tl(id BIGINT, dt STRING)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaBefore.asStruct()); + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct()); sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'comment for dt field')"); Schema schemaAfter = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get(), "comment for dt field")) - .asStruct(), - schemaAfter.asStruct()); + assertThat(schemaAfter.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional( + 2, "dt", Types.StringType.get(), "comment for dt field")) + .asStruct()); } - @Test + @TestTemplate public void testAlterTableConstraint() { sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 STRING)"); Schema schemaBefore = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.required(2, "dt", Types.StringType.get()), - Types.NestedField.optional(3, "col1", Types.StringType.get())) - .asStruct(), - schemaBefore.asStruct()); - Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames()); - + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct()); + assertThat(schemaBefore.identifierFieldNames()).isEmpty(); sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)"); Schema schemaAfterAdd = table("tl").schema(); - Assert.assertEquals(ImmutableSet.of("id"), schemaAfterAdd.identifierFieldNames()); - + assertThat(schemaAfterAdd.identifierFieldNames()).containsExactly("id"); sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)"); Schema schemaAfterModify = table("tl").schema(); - Assert.assertEquals( - new Schema( - Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.required(2, "dt", Types.StringType.get()), - Types.NestedField.optional(3, "col1", Types.StringType.get())) - .asStruct(), - schemaAfterModify.asStruct()); - Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames()); - + assertThat(schemaAfterModify.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct()); + assertThat(schemaAfterModify.identifierFieldNames()).containsExactly("dt"); // Composite primary key sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)"); Schema schemaAfterComposite = table("tl").schema(); - Assert.assertEquals( - new Schema( - 
Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.required(2, "dt", Types.StringType.get()), - Types.NestedField.optional(3, "col1", Types.StringType.get())) - .asStruct(), - schemaAfterComposite.asStruct()); - Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); - + assertThat(schemaAfterComposite.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct()); + assertThat(schemaAfterComposite.identifierFieldNames()).containsExactlyInAnyOrder("id", "dt"); // Setting an optional field as primary key should fail // because Iceberg's SchemaUpdate does not allow incompatible changes. Assertions.assertThatThrownBy( @@ -607,16 +574,15 @@ public void testAlterTableConstraint() { .hasRootCauseMessage("Unsupported table change: DropConstraint."); } - @Test + @TestTemplate public void testRelocateTable() { - Assume.assumeFalse("HadoopCatalog does not support relocate table", isHadoopCatalog); - + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support relocate table").isFalse(); sql("CREATE TABLE tl(id BIGINT)"); sql("ALTER TABLE tl SET('location'='file:///tmp/location')"); - Assert.assertEquals("file:///tmp/location", table("tl").location()); + assertThat(table("tl").location()).isEqualTo("file:///tmp/location"); } - @Test + @TestTemplate public void testSetCurrentAndCherryPickSnapshotId() { sql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED BY (c1)"); @@ -651,9 +617,9 @@ public void testSetCurrentAndCherryPickSnapshotId() { table.newReplacePartitions().addFile(replacementFile).stageOnly().commit(); Snapshot staged = Iterables.getLast(table.snapshots()); - Assert.assertEquals( - "Should find the staged overwrite snapshot", DataOperations.OVERWRITE, staged.operation()); - + assertThat(staged.operation()) + .as("Should find the staged overwrite snapshot") + .isEqualTo(DataOperations.OVERWRITE); // add another append so that the original commit can't be fast-forwarded table.newAppend().appendFile(fileB).commit(); @@ -675,7 +641,7 @@ private void validateTableFiles(Table tbl, DataFile... 
expectedFiles) { .map(FileScanTask::file) .map(ContentFile::path) .collect(Collectors.toSet()); - Assert.assertEquals("Files should match", expectedFilePaths, actualFilePaths); + assertThat(actualFilePaths).as("Files should match").isEqualTo(expectedFilePaths); } private Table table(String name) { diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java index 05fd1bad1ddb..b32be379caeb 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java @@ -53,7 +53,7 @@ protected static List parameters() { for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { for (Boolean cacheEnabled : new Boolean[] {true, false}) { - for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) { + for (Object[] catalogParams : CatalogTestBase.parameters()) { String catalogName = (String) catalogParams[0]; Namespace baseNamespace = (Namespace) catalogParams[1]; parameters.add(new Object[] {catalogName, baseNamespace, format, cacheEnabled}); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java index 8f238587d30d..47ee2afceb02 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java @@ -42,7 +42,7 @@ public void testCreateCatalogWithWarehouseLocation() throws IOException { Map props = Maps.newHashMap(); props.put("type", "iceberg"); props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive"); - props.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf)); + props.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf)); File warehouseDir = tempFolder.newFolder(); props.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + warehouseDir.getAbsolutePath()); @@ -69,7 +69,7 @@ public void testCreateCatalogWithHiveConfDir() throws IOException { Map props = Maps.newHashMap(); props.put("type", "iceberg"); props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive"); - props.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf)); + props.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf)); // Set the 'hive-conf-dir' instead of 'warehouse' props.put(FlinkCatalogFactory.HIVE_CONF_DIR, hiveConfDir.getAbsolutePath()); @@ -78,9 +78,7 @@ public void testCreateCatalogWithHiveConfDir() throws IOException { private void checkSQLQuery(Map catalogProperties, File warehouseDir) throws IOException { - sql( - "CREATE CATALOG test_catalog WITH %s", - FlinkCatalogTestBase.toWithClause(catalogProperties)); + sql("CREATE CATALOG test_catalog WITH %s", CatalogTestBase.toWithClause(catalogProperties)); sql("USE CATALOG test_catalog"); sql("CREATE DATABASE test_db"); sql("USE test_db"); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index 754062798928..b7fce104f490 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.flink; 
+import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,11 +35,12 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; @@ -45,42 +49,30 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestFlinkTableSink extends FlinkCatalogTestBase { - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); - - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; + +public class TestFlinkTableSink extends CatalogTestBase { private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source"; private static final String TABLE_NAME = "test_table"; private TableEnvironment tEnv; private Table icebergTable; - private final FileFormat format; - private final boolean isStreamingJob; + @Parameter(index = 2) + private FileFormat format; - @Parameterized.Parameters( - name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") - public static Iterable parameters() { + @Parameter(index = 3) + private boolean isStreamingJob; + + @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + public static List parameters() { List parameters = Lists.newArrayList(); for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { for (Boolean isStreaming : new Boolean[] {true, false}) { - for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) { + for (Object[] catalogParams : CatalogTestBase.parameters()) { String catalogName = (String) catalogParams[0]; Namespace baseNamespace = (Namespace) catalogParams[1]; parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); @@ -90,13 +82,6 @@ public static Iterable parameters() { return parameters; } - public TestFlinkTableSink( - String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { - super(catalogName, baseNamespace); - this.format = format; - this.isStreamingJob = isStreamingJob; - } - @Override protected TableEnvironment getTableEnv() { if (tEnv == null) { @@ -121,7 +106,7 @@ protected TableEnvironment getTableEnv() { } 
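The hunks above swap JUnit4's constructor-injected @Parameterized runner for Iceberg's JUnit5 field injection. As a minimal, hedged sketch of how the pieces fit together — assuming Iceberg's ParameterizedTestExtension, which this PR presumably inherits through the shared base class (CatalogTestBase/TestBase) rather than declaring directly as shown here:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

// The extension runs every @TestTemplate method once per Object[] from parameters().
@ExtendWith(ParameterizedTestExtension.class)
class ExampleParameterizedTest {

  // Parameters are injected into fields by position, replacing the JUnit4 constructor.
  @Parameter(index = 0)
  private String format;

  @Parameter(index = 1)
  private boolean isStreamingJob;

  @Parameters(name = "format={0}, isStreaming={1}")
  public static List<Object[]> parameters() {
    return Arrays.asList(
        new Object[] {"parquet", true}, new Object[] {"parquet", false});
  }

  // @TestTemplate replaces @Test; each invocation sees one parameter combination.
  @TestTemplate
  public void testSomething() {
    // the injected fields hold this invocation's values
  }
}
```

Field injection is what lets subclasses like TestFlinkTableSink drop their constructors while still adding extra parameters at higher indices (here, index 2 and 3 on top of the base class's catalogName and baseNamespace).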
@Override - @Before + @BeforeEach public void before() { super.before(); sql("CREATE DATABASE %s", flinkDatabase); @@ -134,7 +119,7 @@ public void before() { } @Override - @After + @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); @@ -142,7 +127,7 @@ public void clean() { super.clean(); } - @Test + @TestTemplate public void testInsertFromSourceTable() throws Exception { // Register the rows into a temporary table. getTableEnv() @@ -169,10 +154,11 @@ public void testInsertFromSourceTable() throws Exception { SimpleDataUtil.createRecord(null, "bar"))); } - @Test + @TestTemplate public void testOverwriteTable() throws Exception { - Assume.assumeFalse( - "Flink unbounded streaming does not support overwrite operation", isStreamingJob); + assumeThat(isStreamingJob) + .as("Flink unbounded streaming does not support overwrite operation") + .isFalse(); sql("INSERT INTO %s SELECT 1, 'a'", TABLE_NAME); SimpleDataUtil.assertTableRecords( @@ -183,7 +169,7 @@ public void testOverwriteTable() throws Exception { icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b"))); } - @Test + @TestTemplate public void testWriteParallelism() throws Exception { List dataSet = IntStream.range(1, 1000) @@ -206,22 +192,21 @@ public void testWriteParallelism() throws Exception { Transformation committer = dummySink.getInputs().get(0); Transformation writer = committer.getInputs().get(0); - Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism()); - + assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1); writer .getInputs() .forEach( input -> - Assert.assertEquals( - "Should have the expected parallelism.", - isStreamingJob ? 2 : 4, - input.getParallelism())); + assertThat(input.getParallelism()) + .as("Should have the expected parallelism.") + .isEqualTo(isStreamingJob ? 
2 : 4)); } - @Test + @TestTemplate public void testReplacePartitions() throws Exception { - Assume.assumeFalse( - "Flink unbounded streaming does not support overwrite operation", isStreamingJob); + assumeThat(isStreamingJob) + .as("Flink unbounded streaming does not support overwrite operation") + .isFalse(); String tableName = "test_partition"; sql( "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", @@ -265,7 +250,7 @@ public void testReplacePartitions() throws Exception { } } - @Test + @TestTemplate public void testInsertIntoPartition() throws Exception { String tableName = "test_insert_into_partition"; sql( @@ -305,7 +290,7 @@ public void testInsertIntoPartition() throws Exception { } } - @Test + @TestTemplate public void testHashDistributeMode() throws Exception { String tableName = "test_hash_distribution_mode"; Map tableProps = @@ -326,10 +311,10 @@ public void testHashDistributeMode() throws Exception { "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" + " WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId); - Assert.assertEquals( - "Should have the expected rows in source table.", - Sets.newHashSet(dataSet), - Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE))); + + assertThat(sql("SELECT * FROM %s", SOURCE_TABLE)) + .as("Should have the expected rows in source table.") + .containsExactlyInAnyOrderElementsOf(dataSet); sql( "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s", @@ -339,10 +324,9 @@ public void testHashDistributeMode() throws Exception { // Insert data set. sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE); - Assert.assertEquals( - "Should have the expected rows in sink table.", - Sets.newHashSet(dataSet), - Sets.newHashSet(sql("SELECT * FROM %s", tableName))); + assertThat(sql("SELECT * FROM %s", tableName)) + .as("Should have the expected rows in sink table.") + .containsExactlyInAnyOrderElementsOf(dataSet); // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval, // thus producing multiple snapshots. 
Here we assert that each snapshot has only 1 file per @@ -354,24 +338,18 @@ public void testHashDistributeMode() throws Exception { continue; } - Assert.assertEquals( - "There should be 1 data file in partition 'aaa'", - 1, - SimpleDataUtil.matchingPartitions( - dataFiles, table.spec(), ImmutableMap.of("data", "aaa")) - .size()); - Assert.assertEquals( - "There should be 1 data file in partition 'bbb'", - 1, - SimpleDataUtil.matchingPartitions( - dataFiles, table.spec(), ImmutableMap.of("data", "bbb")) - .size()); - Assert.assertEquals( - "There should be 1 data file in partition 'ccc'", - 1, - SimpleDataUtil.matchingPartitions( - dataFiles, table.spec(), ImmutableMap.of("data", "ccc")) - .size()); + assertThat( + SimpleDataUtil.matchingPartitions( + dataFiles, table.spec(), ImmutableMap.of("data", "aaa"))) + .hasSize(1); + assertThat( + SimpleDataUtil.matchingPartitions( + dataFiles, table.spec(), ImmutableMap.of("data", "bbb"))) + .hasSize(1); + assertThat( + SimpleDataUtil.matchingPartitions( + dataFiles, table.spec(), ImmutableMap.of("data", "ccc"))) + .hasSize(1); } } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java index a25ebab6c4c2..5674c83e40b8 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java @@ -25,47 +25,32 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; -@RunWith(Parameterized.class) -public class TestFlinkUpsert extends FlinkCatalogTestBase { +public class TestFlinkUpsert extends CatalogTestBase { - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); + @Parameter(index = 2) + private FileFormat format; - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Parameter(index = 3) + private boolean isStreamingJob; - private final boolean isStreamingJob; private final Map tableUpsertProps = Maps.newHashMap(); private TableEnvironment tEnv; - public TestFlinkUpsert( - String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { - super(catalogName, baseNamespace); - this.isStreamingJob = isStreamingJob; - tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2"); - tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true"); - 
tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name()); - } - - @Parameterized.Parameters( - name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") - public static Iterable parameters() { + @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + public static List parameters() { List parameters = Lists.newArrayList(); for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) { @@ -105,22 +90,25 @@ protected TableEnvironment getTableEnv() { } @Override - @Before + @BeforeEach public void before() { super.before(); sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); + tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2"); + tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true"); + tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name()); } @Override - @After + @AfterEach public void clean() { sql("DROP DATABASE IF EXISTS %s", flinkDatabase); super.clean(); } - @Test + @TestTemplate public void testUpsertAndQuery() { String tableName = "test_upsert_query"; LocalDate dt20220301 = LocalDate.of(2022, 3, 1); @@ -164,7 +152,7 @@ public void testUpsertAndQuery() { } } - @Test + @TestTemplate public void testUpsertOptions() { String tableName = "test_upsert_options"; LocalDate dt20220301 = LocalDate.of(2022, 3, 1); @@ -210,7 +198,7 @@ public void testUpsertOptions() { } } - @Test + @TestTemplate public void testPrimaryKeyEqualToPartitionKey() { // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey String tableName = "upsert_on_id_key"; @@ -243,7 +231,7 @@ public void testPrimaryKeyEqualToPartitionKey() { } } - @Test + @TestTemplate public void testPrimaryKeyFieldsAtBeginningOfSchema() { String tableName = "upsert_on_pk_at_schema_start"; LocalDate dt = LocalDate.of(2022, 3, 1); @@ -292,7 +280,7 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { } } - @Test + @TestTemplate public void testPrimaryKeyFieldsAtEndOfTableSchema() { // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key // fields diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java index 4f71b5fe8d7c..cb409b784383 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java @@ -280,7 +280,7 @@ public void testConnectorTableInIcebergCatalog() { catalogProps.put("type", "iceberg"); if (isHiveCatalog()) { catalogProps.put("catalog-type", "hive"); - catalogProps.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf)); + catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf)); } else { catalogProps.put("catalog-type", "hadoop"); } @@ -315,7 +315,7 @@ private Map createTableProps() { tableProps.put("catalog-name", catalogName); tableProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse()); if (isHiveCatalog()) { - tableProps.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf)); + tableProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf)); } return tableProps; } @@ -337,7 +337,7 @@ private String databaseName() { } private String toWithClause(Map props) { - return FlinkCatalogTestBase.toWithClause(props); + return CatalogTestBase.toWithClause(props); } private static 
String createWarehouse() { diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java index 07e5ca051da5..4220775f41fa 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java @@ -19,9 +19,11 @@ package org.apache.iceberg.flink.actions; import static org.apache.iceberg.flink.SimpleDataUtil.RECORD; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.Collection; import java.util.List; import java.util.Set; @@ -39,6 +41,8 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Files; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.actions.RewriteDataFilesActionResult; @@ -49,7 +53,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.CatalogTestBase; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; @@ -59,44 +63,36 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestRewriteDataFilesAction extends FlinkCatalogTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; + +public class TestRewriteDataFilesAction extends CatalogTestBase { private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned"; private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned"; private static final String TABLE_NAME_WITH_PK = "test_table_with_pk"; - private final FileFormat format; + + @Parameter(index = 2) + private FileFormat format; + private Table icebergTableUnPartitioned; private Table icebergTablePartitioned; private Table icebergTableWithPk; - public TestRewriteDataFilesAction( - String catalogName, Namespace baseNamespace, FileFormat format) { - super(catalogName, baseNamespace); - this.format = format; - } - @Override protected TableEnvironment getTableEnv() { super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); return super.getTableEnv(); } - @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}") - public static Iterable parameters() { + @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}") + public static List parameters() { List parameters = Lists.newArrayList(); for (FileFormat format : new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) { - for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) { + for (Object[] catalogParams : 
CatalogTestBase.parameters()) { String catalogName = (String) catalogParams[0]; Namespace baseNamespace = (Namespace) catalogParams[1]; parameters.add(new Object[] {catalogName, baseNamespace, format}); @@ -105,10 +101,10 @@ public static Iterable parameters() { return parameters; } - @Rule public TemporaryFolder temp = new TemporaryFolder(); + private @TempDir Path temp; @Override - @Before + @BeforeEach public void before() { super.before(); sql("CREATE DATABASE %s", flinkDatabase); @@ -135,7 +131,7 @@ public void before() { } @Override - @After + @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED); sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED); @@ -144,14 +140,14 @@ public void clean() { super.clean(); } - @Test + @TestTemplate public void testRewriteDataFilesEmptyTable() throws Exception { - Assert.assertNull("Table must be empty", icebergTableUnPartitioned.currentSnapshot()); + assertThat(icebergTableUnPartitioned.currentSnapshot()).isNull(); Actions.forTable(icebergTableUnPartitioned).rewriteDataFiles().execute(); - Assert.assertNull("Table must stay empty", icebergTableUnPartitioned.currentSnapshot()); + assertThat(icebergTableUnPartitioned.currentSnapshot()).isNull(); } - @Test + @TestTemplate public void testRewriteDataFilesUnpartitionedTable() throws Exception { sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED); sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED); @@ -161,21 +157,19 @@ public void testRewriteDataFilesUnpartitionedTable() throws Exception { CloseableIterable tasks = icebergTableUnPartitioned.newScan().planFiles(); List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); - Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size()); - + assertThat(dataFiles).hasSize(2); RewriteDataFilesActionResult result = Actions.forTable(icebergTableUnPartitioned).rewriteDataFiles().execute(); - Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size()); - Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size()); + assertThat(result.deletedDataFiles()).hasSize(2); + assertThat(result.addedDataFiles()).hasSize(1); icebergTableUnPartitioned.refresh(); CloseableIterable tasks1 = icebergTableUnPartitioned.newScan().planFiles(); List dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file)); - Assert.assertEquals("Should have 1 data files after rewrite", 1, dataFiles1.size()); - + assertThat(dataFiles1).hasSize(1); // Assert the table records as expected. 
SimpleDataUtil.assertTableRecords( icebergTableUnPartitioned, @@ -183,7 +177,7 @@ public void testRewriteDataFilesUnpartitionedTable() throws Exception { SimpleDataUtil.createRecord(1, "hello"), SimpleDataUtil.createRecord(2, "world"))); } - @Test + @TestTemplate public void testRewriteDataFilesPartitionedTable() throws Exception { sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED); sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED); @@ -195,21 +189,19 @@ public void testRewriteDataFilesPartitionedTable() throws Exception { CloseableIterable tasks = icebergTablePartitioned.newScan().planFiles(); List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); - Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles.size()); - + assertThat(dataFiles).hasSize(4); RewriteDataFilesActionResult result = Actions.forTable(icebergTablePartitioned).rewriteDataFiles().execute(); - Assert.assertEquals("Action should rewrite 4 data files", 4, result.deletedDataFiles().size()); - Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size()); + assertThat(result.deletedDataFiles()).hasSize(4); + assertThat(result.addedDataFiles()).hasSize(2); icebergTablePartitioned.refresh(); CloseableIterable tasks1 = icebergTablePartitioned.newScan().planFiles(); List dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file)); - Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles1.size()); - + assertThat(dataFiles1).hasSize(2); // Assert the table records as expected. Schema schema = new Schema( @@ -227,7 +219,7 @@ public void testRewriteDataFilesPartitionedTable() throws Exception { record.copy("id", 4, "data", "world", "spec", "b"))); } - @Test + @TestTemplate public void testRewriteDataFilesWithFilter() throws Exception { sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED); sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED); @@ -240,25 +232,22 @@ public void testRewriteDataFilesWithFilter() throws Exception { CloseableIterable tasks = icebergTablePartitioned.newScan().planFiles(); List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); - Assert.assertEquals("Should have 5 data files before rewrite", 5, dataFiles.size()); - + assertThat(dataFiles).hasSize(5); RewriteDataFilesActionResult result = Actions.forTable(icebergTablePartitioned) .rewriteDataFiles() .filter(Expressions.equal("spec", "a")) .filter(Expressions.startsWith("data", "he")) .execute(); - - Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size()); - Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size()); + assertThat(result.deletedDataFiles()).hasSize(2); + assertThat(result.addedDataFiles()).hasSize(1); icebergTablePartitioned.refresh(); CloseableIterable tasks1 = icebergTablePartitioned.newScan().planFiles(); List dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file)); - Assert.assertEquals("Should have 4 data files after rewrite", 4, dataFiles1.size()); - + assertThat(dataFiles1).hasSize(4); // Assert the table records as expected. 
Schema schema = new Schema( @@ -277,7 +266,7 @@ public void testRewriteDataFilesWithFilter() throws Exception { record.copy("id", 5, "data", "world", "spec", "b"))); } - @Test + @TestTemplate public void testRewriteLargeTableHasResiduals() throws IOException { // all records belong to the same partition List records1 = Lists.newArrayList(); @@ -309,19 +298,19 @@ public void testRewriteLargeTableHasResiduals() throws IOException { .filter(Expressions.equal("data", "0")) .planFiles(); for (FileScanTask task : tasks) { - Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual()); + assertThat(task.residual()) + .as("Residuals must be ignored") + .isEqualTo(Expressions.alwaysTrue()); } List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); - Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size()); - + assertThat(dataFiles).hasSize(2); Actions actions = Actions.forTable(icebergTableUnPartitioned); RewriteDataFilesActionResult result = actions.rewriteDataFiles().filter(Expressions.equal("data", "0")).execute(); - Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size()); - Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size()); - + assertThat(result.deletedDataFiles()).hasSize(2); + assertThat(result.addedDataFiles()).hasSize(1); // Assert the table records as expected. SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected); } @@ -339,12 +328,12 @@ public void testRewriteLargeTableHasResiduals() throws IOException { * * @throws IOException IOException */ - @Test + @TestTemplate public void testRewriteAvoidRepeateCompress() throws IOException { List expected = Lists.newArrayList(); Schema schema = icebergTableUnPartitioned.schema(); GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema); - File file = temp.newFile(); + File file = File.createTempFile("junit", null, temp.toFile()); int count = 0; try (FileAppender fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) { @@ -374,8 +363,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException { CloseableIterable tasks = icebergTableUnPartitioned.newScan().planFiles(); List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); - Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size()); - + assertThat(dataFiles).hasSize(3); Actions actions = Actions.forTable(icebergTableUnPartitioned); long targetSizeInBytes = file.length() + 10; @@ -385,20 +373,18 @@ public void testRewriteAvoidRepeateCompress() throws IOException { .targetSizeInBytes(targetSizeInBytes) .splitOpenFileCost(1) .execute(); - Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size()); - Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size()); - + assertThat(result.deletedDataFiles()).hasSize(2); + assertThat(result.addedDataFiles()).hasSize(1); icebergTableUnPartitioned.refresh(); CloseableIterable tasks1 = icebergTableUnPartitioned.newScan().planFiles(); List dataFilesRewrote = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file)); - Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFilesRewrote.size()); - + assertThat(dataFilesRewrote).hasSize(2); // the largest file should not be rewritten List rewroteDataFileNames =
dataFilesRewrote.stream().map(ContentFile::path).collect(Collectors.toList()); - Assert.assertTrue(rewroteDataFileNames.contains(file.getAbsolutePath())); + assertThat(rewroteDataFileNames).contains(file.getAbsolutePath()); // Assert the table records as expected. expected.add(SimpleDataUtil.createRecord(1, "a")); @@ -406,7 +392,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException { SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected); } - @Test + @TestTemplate public void testRewriteNoConflictWithEqualityDeletes() throws IOException { // Add 2 data files sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK); @@ -423,11 +409,9 @@ public void testRewriteNoConflictWithEqualityDeletes() throws IOException { sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK); icebergTableWithPk.refresh(); - Assert.assertEquals( - "The latest sequence number should be greater than that of the stale snapshot", - stale1.currentSnapshot().sequenceNumber() + 1, - icebergTableWithPk.currentSnapshot().sequenceNumber()); - + assertThat(icebergTableWithPk.currentSnapshot().sequenceNumber()) + .as("The latest sequence number should be greater than that of the stale snapshot") + .isEqualTo(stale1.currentSnapshot().sequenceNumber() + 1); CloseableIterable tasks = icebergTableWithPk.newScan().planFiles(); List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); @@ -435,12 +419,10 @@ public void testRewriteNoConflictWithEqualityDeletes() throws IOException { Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream() .flatMap(Collection::stream) .collect(Collectors.toSet()); - Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size()); - Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size()); - Assert.assertSame( - "The 1 delete file should be an equality-delete file", - Iterables.getOnlyElement(deleteFiles).content(), - FileContent.EQUALITY_DELETES); + assertThat(dataFiles).hasSize(3); + assertThat(deleteFiles).hasSize(1); + assertThat(Iterables.getOnlyElement(deleteFiles).content()) + .isEqualTo(FileContent.EQUALITY_DELETES); shouldHaveDataAndFileSequenceNumbers( TABLE_NAME_WITH_PK, ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L))); @@ -459,8 +441,8 @@ public void testRewriteNoConflictWithEqualityDeletes() throws IOException { Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute(); // Should not rewrite files from the new commit - Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size()); - Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size()); + assertThat(result.deletedDataFiles()).hasSize(2); + assertThat(result.addedDataFiles()).hasSize(1); // The 2 older files with file-sequence-number <= 2 should be rewritten into a new file. // The new file is the one with file-sequence-number == 4. // The new file should use rewrite's starting-sequence-number 2 as its data-sequence-number. 
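The recurring File.createTempFile("junit", null, temp.toFile()) calls above are the patch's JUnit 5 replacement for TemporaryFolder.newFile(): an injected @TempDir directory plus a plain java.io temp file. A minimal sketch of that pattern, assuming a @TempDir Path temp field (the helper name here is hypothetical, not part of the patch):

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;

// Hypothetical helper mirroring JUnit 4's TemporaryFolder.newFile() on top of a JUnit 5 @TempDir.
static File newTempFile(Path temp) throws IOException {
  // File.createTempFile requires a prefix of at least three characters ("junit" is fine);
  // a null suffix defaults to ".tmp". The file lands inside the managed temp directory,
  // so JUnit deletes it when the test finishes.
  return File.createTempFile("junit", null, temp.toFile());
}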
@@ -494,6 +476,6 @@ private void shouldHaveDataAndFileSequenceNumbers( Pair.of( row.getFieldAs("sequence_number"), row.getFieldAs("file_sequence_number"))) .collect(Collectors.toList()); - Assertions.assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers); + assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers); } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index 5ecf4f4536bb..f58cc87c6a29 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -18,7 +18,12 @@ */ package org.apache.iceberg.flink.source; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.time.Instant; import java.util.Comparator; import java.util.Iterator; @@ -41,6 +46,8 @@ import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.MetricsUtil; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -52,7 +59,7 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.CatalogTestBase; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; @@ -60,29 +67,21 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestFlinkMetaDataTable extends FlinkCatalogTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; + +public class TestFlinkMetaDataTable extends CatalogTestBase { private static final String TABLE_NAME = "test_table"; private final FileFormat format = FileFormat.AVRO; - private static final TemporaryFolder TEMP = new TemporaryFolder(); - private final boolean isPartition; + private @TempDir Path temp; - public TestFlinkMetaDataTable(String catalogName, Namespace baseNamespace, Boolean isPartition) { - super(catalogName, baseNamespace); - this.isPartition = isPartition; - } + @Parameter(index = 2) + private Boolean isPartition; - @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, isPartition={2}") - public static Iterable parameters() { + @Parameters(name = "catalogName={0}, baseNamespace={1}, isPartition={2}") + protected static List parameters() { List parameters = Lists.newArrayList(); for (Boolean isPartition : new Boolean[] {true, false}) { @@ -100,7 +99,7 @@ protected TableEnvironment getTableEnv() { return super.getTableEnv(); } - @Before + @BeforeEach public void 
before() { super.before(); sql("USE CATALOG %s", catalogName); @@ -124,14 +123,14 @@ public void before() { } @Override - @After + @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); super.clean(); } - @Test + @TestTemplate public void testSnapshots() { String sql = String.format("SELECT * FROM %s$snapshots ", TABLE_NAME); List result = sql(sql); @@ -140,22 +139,22 @@ public void testSnapshots() { Iterator snapshots = table.snapshots().iterator(); for (Row row : result) { Snapshot next = snapshots.next(); - Assert.assertEquals( - "Should have expected timestamp", - ((Instant) row.getField(0)).toEpochMilli(), - next.timestampMillis()); - Assert.assertEquals("Should have expected snapshot id", next.snapshotId(), row.getField(1)); - Assert.assertEquals("Should have expected parent id", next.parentId(), row.getField(2)); - Assert.assertEquals("Should have expected operation", next.operation(), row.getField(3)); - Assert.assertEquals( - "Should have expected manifest list location", - row.getField(4), - next.manifestListLocation()); - Assert.assertEquals("Should have expected summary", next.summary(), row.getField(5)); + assertThat(((Instant) row.getField(0)).toEpochMilli()) + .as("Should have expected timestamp") + .isEqualTo(next.timestampMillis()); + assertThat(row.getField(1)) + .as("Should have expected snapshot id") + .isEqualTo(next.snapshotId()); + assertThat(row.getField(2)).as("Should have expected parent id").isEqualTo(next.parentId()); + assertThat(row.getField(3)).as("Should have expected operation").isEqualTo(next.operation()); + assertThat(row.getField(4)) + .as("Should have expected manifest list location") + .isEqualTo(next.manifestListLocation()); + assertThat(row.getField(5)).as("Should have expected summary").isEqualTo(next.summary()); } } - @Test + @TestTemplate public void testHistory() { String sql = String.format("SELECT * FROM %s$history ", TABLE_NAME); List result = sql(sql); @@ -164,21 +163,22 @@ public void testHistory() { Iterator snapshots = table.snapshots().iterator(); for (Row row : result) { Snapshot next = snapshots.next(); - Assert.assertEquals( - "Should have expected made_current_at", - ((Instant) row.getField(0)).toEpochMilli(), - next.timestampMillis()); - Assert.assertEquals("Should have expected snapshot id", next.snapshotId(), row.getField(1)); - Assert.assertEquals("Should have expected parent id", next.parentId(), row.getField(2)); - - Assert.assertEquals( - "Should have expected is current ancestor", - SnapshotUtil.isAncestorOf(table, table.currentSnapshot().snapshotId(), next.snapshotId()), - row.getField(3)); + assertThat(((Instant) row.getField(0)).toEpochMilli()) + .as("Should have expected made_current_at") + .isEqualTo(next.timestampMillis()); + assertThat(row.getField(1)) + .as("Should have expected snapshot id") + .isEqualTo(next.snapshotId()); + assertThat(row.getField(2)).as("Should have expected parent id").isEqualTo(next.parentId()); + assertThat(row.getField(3)) + .as("Should have expected is current ancestor") + .isEqualTo( + SnapshotUtil.isAncestorOf( + table, table.currentSnapshot().snapshotId(), next.snapshotId())); } } - @Test + @TestTemplate public void testManifests() { String sql = String.format("SELECT * FROM %s$manifests ", TABLE_NAME); List result = sql(sql); @@ -189,32 +189,32 @@ public void testManifests() { for (int i = 0; i < result.size(); i++) { Row row = result.get(i); ManifestFile manifestFile = expectedDataManifests.get(i);
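+ // AssertJ reads left to right: the queried row field is the actual value in assertThat(), and the value from the table API is the expectation in isEqualTo().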
- Assert.assertEquals( - "Should have expected content", manifestFile.content().id(), row.getField(0)); - Assert.assertEquals("Should have expected path", manifestFile.path(), row.getField(1)); - Assert.assertEquals("Should have expected length", manifestFile.length(), row.getField(2)); - Assert.assertEquals( - "Should have expected partition_spec_id", - manifestFile.partitionSpecId(), - row.getField(3)); - Assert.assertEquals( - "Should have expected added_snapshot_id", manifestFile.snapshotId(), row.getField(4)); - Assert.assertEquals( - "Should have expected added_data_files_count", - manifestFile.addedFilesCount(), - row.getField(5)); - Assert.assertEquals( - "Should have expected existing_data_files_count", - manifestFile.existingFilesCount(), - row.getField(6)); - Assert.assertEquals( - "Should have expected deleted_data_files_count", - manifestFile.deletedFilesCount(), - row.getField(7)); + assertThat(row.getField(0)) + .as("Should have expected content") + .isEqualTo(manifestFile.content().id()); + assertThat(row.getField(1)).as("Should have expected path").isEqualTo(manifestFile.path()); + assertThat(row.getField(2)) + .as("Should have expected length") + .isEqualTo(manifestFile.length()); + assertThat(row.getField(3)) + .as("Should have expected partition_spec_id") + .isEqualTo(manifestFile.partitionSpecId()); + assertThat(row.getField(4)) + .as("Should have expected added_snapshot_id") + .isEqualTo(manifestFile.snapshotId()); + assertThat(row.getField(5)) + .as("Should have expected added_data_files_count") + .isEqualTo(manifestFile.addedFilesCount()); + assertThat(row.getField(6)) + .as("Should have expected existing_data_files_count") + .isEqualTo(manifestFile.existingFilesCount()); + assertThat(row.getField(7)) + .as("Should have expected deleted_data_files_count") + .isEqualTo(manifestFile.deletedFilesCount()); } } - @Test + @TestTemplate public void testAllManifests() { Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); @@ -223,55 +223,54 @@ public void testAllManifests() { List expectedDataManifests = allDataManifests(table); - Assert.assertEquals(expectedDataManifests.size(), result.size()); + assertThat(expectedDataManifests).hasSize(result.size()); for (int i = 0; i < result.size(); i++) { Row row = result.get(i); ManifestFile manifestFile = expectedDataManifests.get(i); - Assert.assertEquals( - "Should have expected content", manifestFile.content().id(), row.getField(0)); - Assert.assertEquals("Should have expected path", manifestFile.path(), row.getField(1)); - Assert.assertEquals("Should have expected length", manifestFile.length(), row.getField(2)); - Assert.assertEquals( - "Should have expected partition_spec_id", - manifestFile.partitionSpecId(), - row.getField(3)); - Assert.assertEquals( - "Should have expected added_snapshot_id", manifestFile.snapshotId(), row.getField(4)); - Assert.assertEquals( - "Should have expected added_data_files_count", - manifestFile.addedFilesCount(), - row.getField(5)); - Assert.assertEquals( - "Should have expected existing_data_files_count", - manifestFile.existingFilesCount(), - row.getField(6)); - Assert.assertEquals( - "Should have expected deleted_data_files_count", - manifestFile.deletedFilesCount(), - row.getField(7)); + assertThat(row.getField(0)) + .as("Should have expected content") + .isEqualTo(manifestFile.content().id()); + assertThat(row.getField(1)).as("Should have expected path").isEqualTo(manifestFile.path()); + assertThat(row.getField(2)) + .as("Should have expected length") 
+ .isEqualTo(manifestFile.length()); + assertThat(row.getField(3)) + .as("Should have expected partition_spec_id") + .isEqualTo(manifestFile.partitionSpecId()); + assertThat(row.getField(4)) + .as("Should have expected added_snapshot_id") + .isEqualTo(manifestFile.snapshotId()); + assertThat(row.getField(5)) + .as("Should have expected added_data_files_count") + .isEqualTo(manifestFile.addedFilesCount()); + assertThat(row.getField(6)) + .as("Should have expected existing_data_files_count") + .isEqualTo(manifestFile.existingFilesCount()); + assertThat(row.getField(7)) + .as("Should have expected deleted_data_files_count") + .isEqualTo(manifestFile.deletedFilesCount()); } } - @Test + @TestTemplate public void testUnPartitionedTable() throws IOException { - Assume.assumeFalse(isPartition); + assumeThat(isPartition).isFalse(); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); Schema deleteRowSchema = table.schema().select("id"); Record dataDelete = GenericRecord.create(deleteRowSchema); List dataDeletes = Lists.newArrayList(dataDelete.copy("id", 1)); - - TEMP.create(); + File testFile = File.createTempFile("junit", null, temp.toFile()); DeleteFile eqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(TEMP.newFile()), dataDeletes, deleteRowSchema); + table, Files.localOutput(testFile), dataDeletes, deleteRowSchema); table.newRowDelta().addDeletes(eqDeletes).commit(); List expectedDataManifests = dataManifests(table); List expectedDeleteManifests = deleteManifests(table); - Assert.assertEquals("Should have 2 data manifest", 2, expectedDataManifests.size()); - Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size()); + assertThat(expectedDataManifests).hasSize(2); + assertThat(expectedDeleteManifests).hasSize(1); Schema entriesTableSchema = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.from("entries")) .schema(); @@ -294,12 +293,13 @@ public void testUnPartitionedTable() throws IOException { deleteFilesTableSchema = deleteFilesTableSchema.select(deleteColumns); List actualDeleteFiles = sql("SELECT %s FROM %s$delete_files", deleteNames, TABLE_NAME); - Assert.assertEquals("Metadata table should return 1 delete file", 1, actualDeleteFiles.size()); + assertThat(actualDeleteFiles).hasSize(1); List expectedDeleteFiles = expectedEntries( table, FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, null); - Assert.assertEquals("Should be 1 delete file manifest entry", 1, expectedDeleteFiles.size()); + assertThat(expectedDeleteFiles).as("Should be 1 delete file manifest entry").hasSize(1); TestHelpers.assertEquals( deleteFilesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0)); @@ -318,51 +318,50 @@ public void testUnPartitionedTable() throws IOException { filesTableSchema = filesTableSchema.select(columns); List actualDataFiles = sql("SELECT %s FROM %s$data_files", names, TABLE_NAME); - Assert.assertEquals("Metadata table should return 2 data file", 2, actualDataFiles.size()); - + assertThat(actualDataFiles).as("Metadata table should return 2 data file").hasSize(2); List expectedDataFiles = expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null); - Assert.assertEquals("Should be 2 data file manifest entry", 2, expectedDataFiles.size()); + assertThat(expectedDataFiles).as("Should be 2 data file manifest entry").hasSize(2);
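+ // expectedEntries(...) is the test helper that derives the expected rows from the given manifests using the $entries metadata table schema, so the SQL results can be compared entry by entry.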
TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0), actualDataFiles.get(0)); // check all files table List actualFiles = sql("SELECT %s FROM %s$files ORDER BY content", names, TABLE_NAME); - Assert.assertEquals("Metadata table should return 3 files", 3, actualFiles.size()); - + assertThat(actualFiles).as("Metadata table should return 3 files").hasSize(3); List expectedFiles = Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream()) .collect(Collectors.toList()); - Assert.assertEquals("Should have 3 files manifest entries", 3, expectedFiles.size()); + assertThat(expectedFiles).as("Should have 3 files manifest entries").hasSize(3); TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(0), actualFiles.get(0)); TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(1), actualFiles.get(1)); } - @Test + @TestTemplate public void testPartitionedTable() throws Exception { - Assume.assumeFalse(!isPartition); + assumeThat(isPartition).isTrue(); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); Schema deleteRowSchema = table.schema().select("id", "data"); Record dataDelete = GenericRecord.create(deleteRowSchema); - TEMP.create(); Map deleteRow = Maps.newHashMap(); deleteRow.put("id", 1); deleteRow.put("data", "a"); + File testFile = File.createTempFile("junit", null, temp.toFile()); DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(TEMP.newFile()), + Files.localOutput(testFile), org.apache.iceberg.TestHelpers.Row.of("a"), Lists.newArrayList(dataDelete.copy(deleteRow)), deleteRowSchema); table.newRowDelta().addDeletes(eqDeletes).commit(); deleteRow.put("data", "b"); + File testFile2 = File.createTempFile("junit", null, temp.toFile()); DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile( table, - Files.localOutput(TEMP.newFile()), + Files.localOutput(testFile2), org.apache.iceberg.TestHelpers.Row.of("b"), Lists.newArrayList(dataDelete.copy(deleteRow)), deleteRowSchema); @@ -375,9 +374,8 @@ public void testPartitionedTable() throws Exception { List expectedDataManifests = dataManifests(table); List expectedDeleteManifests = deleteManifests(table); - Assert.assertEquals("Should have 2 data manifests", 2, expectedDataManifests.size()); - Assert.assertEquals("Should have 2 delete manifests", 2, expectedDeleteManifests.size()); - + assertThat(expectedDataManifests).hasSize(2); + assertThat(expectedDeleteManifests).hasSize(2); Table deleteFilesTable = MetadataTableUtils.createMetadataTableInstance( table, MetadataTableType.from("delete_files")); @@ -396,75 +394,67 @@ public void testPartitionedTable() throws Exception { List expectedDeleteFiles = expectedEntries( table, FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, "a"); - Assert.assertEquals( - "Should have one delete file manifest entry", 1, expectedDeleteFiles.size()); - + assertThat(expectedDeleteFiles).hasSize(1); List actualDeleteFiles = sql("SELECT %s FROM %s$delete_files WHERE `partition`.`data`='a'", names, TABLE_NAME); - Assert.assertEquals( - "Metadata table should return one delete file", 1, actualDeleteFiles.size()); + assertThat(actualDeleteFiles).hasSize(1); TestHelpers.assertEquals( filesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0)); // Check data files table List expectedDataFiles = expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a"); - Assert.assertEquals("Should have one data file manifest entry", 1, expectedDataFiles.size()); - + 
assertThat(expectedDataFiles).hasSize(1); List actualDataFiles = sql("SELECT %s FROM %s$data_files WHERE `partition`.`data`='a'", names, TABLE_NAME); - Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size()); + assertThat(actualDataFiles).hasSize(1); TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0), actualDataFiles.get(0)); List actualPartitionsWithProjection = sql("SELECT file_count FROM %s$partitions ", TABLE_NAME); - Assert.assertEquals( - "Metadata table should return two partitions record", - 2, - actualPartitionsWithProjection.size()); + assertThat(actualPartitionsWithProjection).hasSize(2); for (int i = 0; i < 2; ++i) { - Assert.assertEquals(1, actualPartitionsWithProjection.get(i).getField(0)); + assertThat(actualPartitionsWithProjection.get(i).getField(0)).isEqualTo(1); } // Check files table List expectedFiles = Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream()) .collect(Collectors.toList()); - Assert.assertEquals("Should have two file manifest entries", 2, expectedFiles.size()); - + assertThat(expectedFiles).hasSize(2); List actualFiles = sql( "SELECT %s FROM %s$files WHERE `partition`.`data`='a' ORDER BY content", names, TABLE_NAME); - Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size()); + assertThat(actualFiles).hasSize(2); TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(0), actualFiles.get(0)); TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(1), actualFiles.get(1)); } - @Test + @TestTemplate public void testAllFilesUnpartitioned() throws Exception { - Assume.assumeFalse(isPartition); + assumeThat(isPartition).isFalse(); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); Schema deleteRowSchema = table.schema().select("id", "data"); Record dataDelete = GenericRecord.create(deleteRowSchema); - TEMP.create(); Map deleteRow = Maps.newHashMap(); deleteRow.put("id", 1); + File testFile = File.createTempFile("junit", null, temp.toFile()); DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(TEMP.newFile()), + Files.localOutput(testFile), Lists.newArrayList(dataDelete.copy(deleteRow)), deleteRowSchema); table.newRowDelta().addDeletes(eqDeletes).commit(); List expectedDataManifests = dataManifests(table); - Assert.assertEquals("Should have 2 data manifest", 2, expectedDataManifests.size()); + assertThat(expectedDataManifests).hasSize(2); List expectedDeleteManifests = deleteManifests(table); - Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size()); + assertThat(expectedDeleteManifests).hasSize(1); // Clear table to test whether 'all_files' can read past files table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); @@ -492,8 +482,8 @@ public void testAllFilesUnpartitioned() throws Exception { List expectedDataFiles = expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null); - Assert.assertEquals("Should be 2 data file manifest entry", 2, expectedDataFiles.size()); - Assert.assertEquals("Metadata table should return 2 data file", 2, actualDataFiles.size()); + assertThat(expectedDataFiles).hasSize(2); + assertThat(actualDataFiles).hasSize(2); TestHelpers.assertEquals(filesTableSchema, expectedDataFiles, actualDataFiles); // Check all delete files table @@ -501,9 +491,8 @@ public void testAllFilesUnpartitioned() throws Exception { List expectedDeleteFiles = expectedEntries( table, 
FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, null); - Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size()); - Assert.assertEquals( - "Metadata table should return one delete file", 1, actualDeleteFiles.size()); + assertThat(expectedDeleteFiles).hasSize(1); + assertThat(actualDeleteFiles).hasSize(1); TestHelpers.assertEquals( filesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0)); @@ -513,43 +502,43 @@ public void testAllFilesUnpartitioned() throws Exception { List expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles); expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content")))); - Assert.assertEquals("Metadata table should return 3 files", 3, actualFiles.size()); + assertThat(actualFiles).hasSize(3); TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles); } - @Test + @TestTemplate public void testAllFilesPartitioned() throws Exception { - Assume.assumeFalse(!isPartition); + assumeThat(isPartition).isTrue(); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); // Create delete file Schema deleteRowSchema = table.schema().select("id"); Record dataDelete = GenericRecord.create(deleteRowSchema); - TEMP.create(); Map deleteRow = Maps.newHashMap(); deleteRow.put("id", 1); + File testFile = File.createTempFile("junit", null, temp.toFile()); DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(TEMP.newFile()), + Files.localOutput(testFile), org.apache.iceberg.TestHelpers.Row.of("a"), Lists.newArrayList(dataDelete.copy(deleteRow)), deleteRowSchema); + File testFile2 = File.createTempFile("junit", null, temp.toFile()); DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile( table, - Files.localOutput(TEMP.newFile()), + Files.localOutput(testFile2), org.apache.iceberg.TestHelpers.Row.of("b"), Lists.newArrayList(dataDelete.copy(deleteRow)), deleteRowSchema); table.newRowDelta().addDeletes(eqDeletes).addDeletes(eqDeletes2).commit(); List expectedDataManifests = dataManifests(table); - Assert.assertEquals("Should have 2 data manifests", 2, expectedDataManifests.size()); + assertThat(expectedDataManifests).hasSize(2); List expectedDeleteManifests = deleteManifests(table); - Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size()); - + assertThat(expectedDeleteManifests).hasSize(1); // Clear table to test whether 'all_files' can read past files table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); @@ -575,8 +564,8 @@ public void testAllFilesPartitioned() throws Exception { sql("SELECT %s FROM %s$all_data_files WHERE `partition`.`data`='a'", names, TABLE_NAME); List expectedDataFiles = expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a"); - Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size()); - Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size()); + assertThat(expectedDataFiles).hasSize(1); + assertThat(actualDataFiles).hasSize(1); TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0), actualDataFiles.get(0)); // Check all delete files table @@ -585,9 +574,8 @@ public void testAllFilesPartitioned() throws Exception { List expectedDeleteFiles = expectedEntries( table, FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, "a"); - Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDeleteFiles.size()); - Assert.assertEquals("Metadata table should return one data file", 1, actualDeleteFiles.size()); - + assertThat(expectedDeleteFiles).hasSize(1); + assertThat(actualDeleteFiles).hasSize(1); TestHelpers.assertEquals( filesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0)); @@ -599,11 +587,11 @@ public void testAllFilesPartitioned() throws Exception { List expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles); expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content")))); - Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size()); + assertThat(actualFiles).hasSize(2); TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles); } - @Test + @TestTemplate public void testMetadataLogEntries() { Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); @@ -617,55 +605,51 @@ public void testMetadataLogEntries() { // Check metadataLog table List metadataLogs = sql("SELECT * FROM %s$metadata_log_entries", TABLE_NAME); - Assert.assertEquals("metadataLogEntries table should return 3 row", 3, metadataLogs.size()); + assertThat(metadataLogs).hasSize(3); Row metadataLog = metadataLogs.get(0); - Assert.assertEquals( - Instant.ofEpochMilli(metadataLogEntries.get(0).timestampMillis()), - metadataLog.getField("timestamp")); - Assert.assertEquals(metadataLogEntries.get(0).file(), metadataLog.getField("file")); - Assert.assertNull(metadataLog.getField("latest_snapshot_id")); - Assert.assertNull(metadataLog.getField("latest_schema_id")); - Assert.assertNull(metadataLog.getField("latest_sequence_number")); + assertThat(metadataLog.getField("timestamp")) + .isEqualTo(Instant.ofEpochMilli(metadataLogEntries.get(0).timestampMillis())); + assertThat(metadataLog.getField("file")).isEqualTo(metadataLogEntries.get(0).file()); + assertThat(metadataLog.getField("latest_snapshot_id")).isNull(); + assertThat(metadataLog.getField("latest_schema_id")).isNull(); + assertThat(metadataLog.getField("latest_sequence_number")).isNull(); metadataLog = metadataLogs.get(1); - Assert.assertEquals( - Instant.ofEpochMilli(metadataLogEntries.get(1).timestampMillis()), - metadataLog.getField("timestamp")); - Assert.assertEquals(metadataLogEntries.get(1).file(), metadataLog.getField("file")); - Assert.assertEquals(parentSnapshot.snapshotId(), metadataLog.getField("latest_snapshot_id")); - Assert.assertEquals(parentSnapshot.schemaId(), metadataLog.getField("latest_schema_id")); - Assert.assertEquals( - parentSnapshot.sequenceNumber(), metadataLog.getField("latest_sequence_number")); + assertThat(metadataLog.getField("timestamp")) + .isEqualTo(Instant.ofEpochMilli(metadataLogEntries.get(1).timestampMillis())); + assertThat(metadataLog.getField("file")).isEqualTo(metadataLogEntries.get(1).file()); + assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(parentSnapshot.snapshotId()); + assertThat(metadataLog.getField("latest_schema_id")).isEqualTo(parentSnapshot.schemaId()); + assertThat(metadataLog.getField("latest_sequence_number")) + .isEqualTo(parentSnapshot.sequenceNumber()); metadataLog = metadataLogs.get(2); - Assert.assertEquals( - Instant.ofEpochMilli(currentSnapshot.timestampMillis()), metadataLog.getField("timestamp")); - Assert.assertEquals(tableMetadata.metadataFileLocation(), metadataLog.getField("file")); - Assert.assertEquals(currentSnapshot.snapshotId(), 
metadataLog.getField("latest_snapshot_id")); - Assert.assertEquals(currentSnapshot.schemaId(), metadataLog.getField("latest_schema_id")); - Assert.assertEquals( - currentSnapshot.sequenceNumber(), metadataLog.getField("latest_sequence_number")); + assertThat(metadataLog.getField("timestamp")) + .isEqualTo(Instant.ofEpochMilli(currentSnapshot.timestampMillis())); + assertThat(metadataLog.getField("file")).isEqualTo(tableMetadata.metadataFileLocation()); + assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(currentSnapshot.snapshotId()); + assertThat(metadataLog.getField("latest_schema_id")).isEqualTo(currentSnapshot.schemaId()); + assertThat(metadataLog.getField("latest_sequence_number")) + .isEqualTo(currentSnapshot.sequenceNumber()); // test filtering List metadataLogWithFilters = sql( "SELECT * FROM %s$metadata_log_entries WHERE latest_snapshot_id = %s", TABLE_NAME, currentSnapshotId); - Assert.assertEquals( - "metadataLogEntries table should return 1 row", 1, metadataLogWithFilters.size()); - + assertThat(metadataLogWithFilters).hasSize(1); metadataLog = metadataLogWithFilters.get(0); - Assert.assertEquals( - Instant.ofEpochMilli(tableMetadata.currentSnapshot().timestampMillis()), - metadataLog.getField("timestamp")); - Assert.assertEquals(tableMetadata.metadataFileLocation(), metadataLog.getField("file")); - Assert.assertEquals( - tableMetadata.currentSnapshot().snapshotId(), metadataLog.getField("latest_snapshot_id")); - Assert.assertEquals( - tableMetadata.currentSnapshot().schemaId(), metadataLog.getField("latest_schema_id")); - Assert.assertEquals( - tableMetadata.currentSnapshot().sequenceNumber(), - metadataLog.getField("latest_sequence_number")); + assertThat(metadataLog.getField("timestamp")) + .isEqualTo(Instant.ofEpochMilli(tableMetadata.currentSnapshot().timestampMillis())); + + assertThat(metadataLog.getField("file")).isEqualTo(tableMetadata.metadataFileLocation()); + assertThat(metadataLog.getField("latest_snapshot_id")) + .isEqualTo(tableMetadata.currentSnapshot().snapshotId()); + assertThat(metadataLog.getField("latest_schema_id")) + .isEqualTo(tableMetadata.currentSnapshot().schemaId()); + assertThat(metadataLog.getField("latest_sequence_number")) + .isEqualTo(tableMetadata.currentSnapshot().sequenceNumber()); // test projection List metadataFiles = @@ -675,14 +659,13 @@ public void testMetadataLogEntries() { metadataFiles.add(tableMetadata.metadataFileLocation()); List metadataLogWithProjection = sql("SELECT file FROM %s$metadata_log_entries", TABLE_NAME); - Assert.assertEquals( - "metadataLogEntries table should return 3 rows", 3, metadataLogWithProjection.size()); + assertThat(metadataLogWithProjection).hasSize(3); for (int i = 0; i < metadataFiles.size(); i++) { - Assert.assertEquals(metadataFiles.get(i), metadataLogWithProjection.get(i).getField("file")); + assertThat(metadataLogWithProjection.get(i).getField("file")).isEqualTo(metadataFiles.get(i)); } } - @Test + @TestTemplate public void testSnapshotReferencesMetatable() { Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); @@ -704,62 +687,63 @@ public void testSnapshotReferencesMetatable() { .commit(); // Check refs table List references = sql("SELECT * FROM %s$refs", TABLE_NAME); - Assert.assertEquals("Refs table should return 3 rows", 3, references.size()); List branches = sql("SELECT * FROM %s$refs WHERE type='BRANCH'", TABLE_NAME); - Assert.assertEquals("Refs table should return 2 branches", 2, branches.size()); + assertThat(references).hasSize(3); + 
assertThat(branches).hasSize(2); List tags = sql("SELECT * FROM %s$refs WHERE type='TAG'", TABLE_NAME); - Assert.assertEquals("Refs table should return 1 tag", 1, tags.size()); - + assertThat(tags).hasSize(1); // Check branch entries in refs table List mainBranch = sql("SELECT * FROM %s$refs WHERE name='main' AND type='BRANCH'", TABLE_NAME); - Assert.assertEquals("main", mainBranch.get(0).getFieldAs("name")); - Assert.assertEquals("BRANCH", mainBranch.get(0).getFieldAs("type")); - Assert.assertEquals(currentSnapshotId, mainBranch.get(0).getFieldAs("snapshot_id")); - + assertThat((String) mainBranch.get(0).getFieldAs("name")).isEqualTo("main"); + assertThat((String) mainBranch.get(0).getFieldAs("type")).isEqualTo("BRANCH"); + assertThat((Long) mainBranch.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId); List testBranch = sql("SELECT * FROM %s$refs WHERE name='testBranch' AND type='BRANCH'", TABLE_NAME); - Assert.assertEquals("testBranch", testBranch.get(0).getFieldAs("name")); - Assert.assertEquals("BRANCH", testBranch.get(0).getFieldAs("type")); - Assert.assertEquals(currentSnapshotId, testBranch.get(0).getFieldAs("snapshot_id")); - Assert.assertEquals(Long.valueOf(10), testBranch.get(0).getFieldAs("max_reference_age_in_ms")); - Assert.assertEquals(Integer.valueOf(20), testBranch.get(0).getFieldAs("min_snapshots_to_keep")); - Assert.assertEquals(Long.valueOf(30), testBranch.get(0).getFieldAs("max_snapshot_age_in_ms")); + assertThat((String) testBranch.get(0).getFieldAs("name")).isEqualTo("testBranch"); + assertThat((String) testBranch.get(0).getFieldAs("type")).isEqualTo("BRANCH"); + assertThat((Long) testBranch.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId); + assertThat((Long) testBranch.get(0).getFieldAs("max_reference_age_in_ms")) + .isEqualTo(Long.valueOf(10)); + assertThat((Integer) testBranch.get(0).getFieldAs("min_snapshots_to_keep")) + .isEqualTo(Integer.valueOf(20)); + assertThat((Long) testBranch.get(0).getFieldAs("max_snapshot_age_in_ms")) + .isEqualTo(Long.valueOf(30)); // Check tag entries in refs table List testTag = sql("SELECT * FROM %s$refs WHERE name='testTag' AND type='TAG'", TABLE_NAME); - Assert.assertEquals("testTag", testTag.get(0).getFieldAs("name")); - Assert.assertEquals("TAG", testTag.get(0).getFieldAs("type")); - Assert.assertEquals(currentSnapshotId, testTag.get(0).getFieldAs("snapshot_id")); - Assert.assertEquals(Long.valueOf(50), testTag.get(0).getFieldAs("max_reference_age_in_ms")); - + assertThat((String) testTag.get(0).getFieldAs("name")).isEqualTo("testTag"); + assertThat((String) testTag.get(0).getFieldAs("type")).isEqualTo("TAG"); + assertThat((Long) testTag.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId); + assertThat((Long) testTag.get(0).getFieldAs("max_reference_age_in_ms")) + .isEqualTo(Long.valueOf(50)); // Check projection in refs table List testTagProjection = sql( "SELECT name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep FROM %s$refs where type='TAG'", TABLE_NAME); - Assert.assertEquals("testTag", testTagProjection.get(0).getFieldAs("name")); - Assert.assertEquals("TAG", testTagProjection.get(0).getFieldAs("type")); - Assert.assertEquals(currentSnapshotId, testTagProjection.get(0).getFieldAs("snapshot_id")); - Assert.assertEquals( - Long.valueOf(50), testTagProjection.get(0).getFieldAs("max_reference_age_in_ms")); - Assert.assertNull(testTagProjection.get(0).getFieldAs("min_snapshots_to_keep")); - + assertThat((String) 
testTagProjection.get(0).getFieldAs("name")).isEqualTo("testTag"); + assertThat((String) testTagProjection.get(0).getFieldAs("type")).isEqualTo("TAG"); + assertThat((Long) testTagProjection.get(0).getFieldAs("snapshot_id")) + .isEqualTo(currentSnapshotId); + assertThat((Long) testTagProjection.get(0).getFieldAs("max_reference_age_in_ms")) + .isEqualTo(Long.valueOf(50)); + assertThat((Integer) testTagProjection.get(0).getFieldAs("min_snapshots_to_keep")).isNull(); List mainBranchProjection = sql("SELECT name, type FROM %s$refs WHERE name='main' AND type = 'BRANCH'", TABLE_NAME); - Assert.assertEquals("main", mainBranchProjection.get(0).getFieldAs("name")); - Assert.assertEquals("BRANCH", mainBranchProjection.get(0).getFieldAs("type")); - + assertThat((String) mainBranchProjection.get(0).getFieldAs("name")).isEqualTo("main"); + assertThat((String) mainBranchProjection.get(0).getFieldAs("type")).isEqualTo("BRANCH"); List testBranchProjection = sql( "SELECT type, name, max_reference_age_in_ms, snapshot_id FROM %s$refs WHERE name='testBranch' AND type = 'BRANCH'", TABLE_NAME); - Assert.assertEquals("testBranch", testBranchProjection.get(0).getFieldAs("name")); - Assert.assertEquals("BRANCH", testBranchProjection.get(0).getFieldAs("type")); - Assert.assertEquals(currentSnapshotId, testBranchProjection.get(0).getFieldAs("snapshot_id")); - Assert.assertEquals( - Long.valueOf(10), testBranchProjection.get(0).getFieldAs("max_reference_age_in_ms")); + assertThat((String) testBranchProjection.get(0).getFieldAs("name")).isEqualTo("testBranch"); + assertThat((String) testBranchProjection.get(0).getFieldAs("type")).isEqualTo("BRANCH"); + assertThat((Long) testBranchProjection.get(0).getFieldAs("snapshot_id")) + .isEqualTo(currentSnapshotId); + assertThat((Long) testBranchProjection.get(0).getFieldAs("max_reference_age_in_ms")) + .isEqualTo(Long.valueOf(10)); } /** diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java index 633e11718b9b..09d5a5481aee 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink.source; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -33,31 +35,25 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TestHelpers; -import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.CatalogTestBase; import org.apache.iceberg.flink.MiniClusterResource; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; -public class TestStreamScanSql extends FlinkCatalogTestBase { +public class TestStreamScanSql extends CatalogTestBase 
{ private static final String TABLE = "test_table"; private static final FileFormat FORMAT = FileFormat.PARQUET; private TableEnvironment tEnv; - public TestStreamScanSql(String catalogName, Namespace baseNamespace) { - super(catalogName, baseNamespace); - } - @Override protected TableEnvironment getTableEnv() { if (tEnv == null) { @@ -85,7 +81,7 @@ protected TableEnvironment getTableEnv() { } @Override - @Before + @BeforeEach public void before() { super.before(); sql("CREATE DATABASE %s", flinkDatabase); @@ -94,7 +90,7 @@ public void before() { } @Override - @After + @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); @@ -102,7 +98,7 @@ public void clean() { } private void insertRows(String partition, Table table, Row... rows) throws IOException { - GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, TEMPORARY_FOLDER); + GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory); GenericRecord gRecord = GenericRecord.create(table.schema()); List records = Lists.newArrayList(); @@ -127,20 +123,16 @@ private void insertRows(Table table, Row... rows) throws IOException { private void assertRows(List expectedRows, Iterator iterator) { for (Row expectedRow : expectedRows) { - Assert.assertTrue("Should have more records", iterator.hasNext()); - + assertThat(iterator).hasNext(); Row actualRow = iterator.next(); - Assert.assertEquals("Should have expected fields", 3, actualRow.getArity()); - Assert.assertEquals( - "Should have expected id", expectedRow.getField(0), actualRow.getField(0)); - Assert.assertEquals( - "Should have expected data", expectedRow.getField(1), actualRow.getField(1)); - Assert.assertEquals( - "Should have expected dt", expectedRow.getField(2), actualRow.getField(2)); + assertThat(actualRow.getArity()).isEqualTo(3); + assertThat(actualRow.getField(0)).isEqualTo(expectedRow.getField(0)); + assertThat(actualRow.getField(1)).isEqualTo(expectedRow.getField(1)); + assertThat(actualRow.getField(2)).isEqualTo(expectedRow.getField(2)); } } - @Test + @TestTemplate public void testUnPartitionedTable() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); @@ -160,7 +152,7 @@ public void testUnPartitionedTable() throws Exception { result.getJobClient().ifPresent(JobClient::cancel); } - @Test + @TestTemplate public void testPartitionedTable() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY (dt)", TABLE); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); @@ -187,7 +179,7 @@ public void testPartitionedTable() throws Exception { result.getJobClient().ifPresent(JobClient::cancel); } - @Test + @TestTemplate public void testConsumeFromBeginning() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); @@ -212,7 +204,7 @@ public void testConsumeFromBeginning() throws Exception { result.getJobClient().ifPresent(JobClient::cancel); } - @Test + @TestTemplate public void testConsumeFilesWithBranch() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); @@ -229,7 +221,7 @@ public void testConsumeFilesWithBranch() throws Exception 
{ .hasMessage("Cannot scan table using ref b1 configured for streaming reader yet"); } - @Test + @TestTemplate public void testConsumeFromStartSnapshotId() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); @@ -267,7 +259,7 @@ public void testConsumeFromStartSnapshotId() throws Exception { result.getJobClient().ifPresent(JobClient::cancel); } - @Test + @TestTemplate public void testConsumeFromStartTag() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
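Taken together, the migrated classes all follow the same parameterized JUnit 5 skeleton: CatalogTestBase supplies parameter indexes 0 (catalogName) and 1 (baseNamespace), subclasses append their own dimensions via @Parameter/@Parameters, and @TestTemplate replaces @Test so each method runs once per parameter combination. A minimal sketch of that pattern (the class name and test body are hypothetical; the annotations and base class are the ones used above):

import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.TestTemplate;

// Hypothetical subclass illustrating the migration pattern used by the classes above.
public class ExampleCatalogTest extends CatalogTestBase {

  @Parameter(index = 2) // indexes 0 and 1 are claimed by CatalogTestBase
  private FileFormat format;

  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
  public static List<Object[]> parameters() {
    List<Object[]> parameters = Lists.newArrayList();
    for (FileFormat format : new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET}) {
      // Cross the base catalog/namespace combinations with this class's extra dimension.
      for (Object[] catalogParams : CatalogTestBase.parameters()) {
        parameters.add(new Object[] {catalogParams[0], catalogParams[1], format});
      }
    }
    return parameters;
  }

  @TestTemplate // runs once per parameter combination, replacing JUnit 4's @Test
  public void exampleCase() {
    // test body can use this.format plus the inherited catalogName, flinkDatabase, etc.
  }
}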