diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index 5a5fd6f751ed..2df88c0ed104 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -52,4 +52,13 @@ public static long propertyAsLong(Map<String, String> properties,
     }
     return defaultValue;
   }
+
+  public static String propertyAsString(Map<String, String> properties,
+                                        String property, String defaultValue) {
+    String value = properties.get(property);
+    if (value != null) {
+      return value;
+    }
+    return defaultValue;
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
new file mode 100644
index 000000000000..3fc2bb7ba8f8
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.Table;
+
+public class Actions {
+
+  private final StreamExecutionEnvironment env;
+  private final Table table;
+
+  private Actions(StreamExecutionEnvironment env, Table table) {
+    this.env = env;
+    this.table = table;
+  }
+
+  public static Actions forTable(StreamExecutionEnvironment env, Table table) {
+    return new Actions(env, table);
+  }
+
+  public static Actions forTable(Table table) {
+    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+  }
+
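+  /**
+   * Returns an action that rewrites (compacts) the table's data files.
+   *
+   * <p>A minimal usage sketch, assuming an already-loaded {@link Table}:
+   * <pre>
+   *   RewriteDataFilesActionResult result = Actions.forTable(table)
+   *       .rewriteDataFiles()
+   *       .execute();
+   * </pre>
+   */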
+  public RewriteDataFilesAction rewriteDataFiles() {
+    return new RewriteDataFilesAction(env, table);
+  }
+
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
new file mode 100644
index 000000000000..12a80009e01e
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private final StreamExecutionEnvironment env;
+  private int maxParallelism;
+
+  public RewriteDataFilesAction(StreamExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+    this.maxParallelism = env.getParallelism();
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    // Cap the job parallelism at the number of scan tasks; extra subtasks would sit idle.
+    int size = combinedScanTasks.size();
+    int parallelism = Math.min(size, maxParallelism);
+    DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
+    return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+  }
+
+  @Override
+  protected RewriteDataFilesAction self() {
+    return this;
+  }
+
+  public RewriteDataFilesAction maxParallelism(int parallelism) {
+    Preconditions.checkArgument(parallelism > 0, "Invalid max parallelism: %s", parallelism);
+    this.maxParallelism = parallelism;
+    return this;
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
index 6ed769638109..9d56ec6a812a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
@@ -27,7 +27,7 @@
  *
  * @param <T> data type of record.
  */
-interface TaskWriterFactory<T> extends Serializable {
+public interface TaskWriterFactory<T> extends Serializable {
 
   /**
    * Initialize the factory with a given taskId and attemptId.
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
new file mode 100644
index 000000000000..65a3ca92c4d0
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
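+/**
+ * Rewrites the rows of {@link CombinedScanTask}s into new data files via a Flink job.
+ * A minimal usage sketch (variable names are illustrative; the Flink
+ * {@code RewriteDataFilesAction} above wires these arguments from the table):
+ * <pre>
+ *   RowDataRewriter rewriter = new RowDataRewriter(table, caseSensitive, io, encryptionManager);
+ *   List&lt;DataFile&gt; added = rewriter.rewriteDataForTasks(env.fromCollection(tasks), parallelism);
+ * </pre>
+ */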
+public class RowDataRewriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
+    this.schema = table.schema();
+    this.caseSensitive = caseSensitive;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+
+    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
+        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+    this.taskWriterFactory = new RowDataTaskWriterFactory(
+        table.schema(),
+        flinkSchema,
+        table.spec(),
+        table.locationProvider(),
+        io,
+        encryptionManager,
+        Long.MAX_VALUE,
+        format,
+        table.properties());
+  }
+
+  public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
+    DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);
+    return Lists.newArrayList(DataStreamUtils.collect(ds)).stream().flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
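+  /**
+   * Map function that reads the rows of a {@link CombinedScanTask} with a {@link RowDataIterator},
+   * writes them through a {@link TaskWriter} obtained from the factory, and emits the
+   * {@link DataFile}s produced on this subtask. Runs once per scan task.
+   */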
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<DataFile>> {
+
+    private TaskWriter<RowData> writer;
+    private int subTaskId;
+    private int attemptId;
+
+    private final Schema schema;
+    private final String nameMapping;
+    private final FileIO io;
+    private final boolean caseSensitive;
+    private final EncryptionManager encryptionManager;
+    private final TaskWriterFactory<RowData> taskWriterFactory;
+
+    public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean caseSensitive,
+                      EncryptionManager encryptionManager, TaskWriterFactory<RowData> taskWriterFactory) {
+      this.schema = schema;
+      this.nameMapping = nameMapping;
+      this.io = io;
+      this.caseSensitive = caseSensitive;
+      this.encryptionManager = encryptionManager;
+      this.taskWriterFactory = taskWriterFactory;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+      this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+      this.attemptId = getRuntimeContext().getAttemptNumber();
+      // Initialize the task writer factory.
+      this.taskWriterFactory.initialize(subTaskId, attemptId);
+    }
+
+    @Override
+    public List<DataFile> map(CombinedScanTask task) throws Exception {
+      // Initialize the task writer.
+      this.writer = taskWriterFactory.create();
+      try (RowDataIterator iterator =
+               new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) {
+        while (iterator.hasNext()) {
+          RowData rowData = iterator.next();
+          writer.write(rowData);
+        }
+        return Lists.newArrayList(writer.complete());
+      } catch (Throwable originalThrowable) {
+        try {
+          LOG.error("Aborting commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
+          writer.abort();
+          LOG.error("Aborted commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
+        } catch (Throwable inner) {
+          if (originalThrowable != inner) {
+            originalThrowable.addSuppressed(inner);
+            LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), inner);
+          }
+        }
+
+        if (originalThrowable instanceof Exception) {
+          throw originalThrowable;
+        } else {
+          throw new RuntimeException(originalThrowable);
+        }
+      }
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index b680af792039..6782267fdd86 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -72,7 +72,7 @@ protected TableEnvironment getTableEnv() {
     return tEnv;
   }
 
-  List<Object[]> sql(String query, Object... args) {
+  protected List<Object[]> sql(String query, Object... args) {
     TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
     tableResult.getJobClient().ifPresent(c -> {
       try {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
new file mode 100644
index 000000000000..748a4ce3584b
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
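+/**
+ * Tests for the Flink {@code RewriteDataFilesAction}, parameterized over catalog,
+ * namespace, and file format (Avro, ORC, Parquet).
+ */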
+
+package org.apache.iceberg.flink.actions;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
+
+@RunWith(Parameterized.class)
+public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
+
+  private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned";
+  private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned";
+  private final FileFormat format;
+  private Table icebergTableUnPartitioned;
+  private Table icebergTablePartitioned;
+
+  public TestRewriteDataFilesAction(String catalogName, String[] baseNamespace, FileFormat format) {
+    super(catalogName, baseNamespace);
+    this.format = format;
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+      for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+        String catalogName = (String) catalogParams[0];
+        String[] baseNamespace = (String[]) catalogParams[1];
+        parameters.add(new Object[] {catalogName, baseNamespace, format});
+      }
+    }
+    return parameters;
+  }
+
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+    sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME_UNPARTITIONED,
+        format.name());
+    icebergTableUnPartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_UNPARTITIONED));
+
+    sql("CREATE TABLE %s (id int, data varchar, spec varchar) " +
+        " PARTITIONED BY (data, spec) with ('write.format.default'='%s')",
+        TABLE_NAME_PARTITIONED, format.name());
+    icebergTablePartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_PARTITIONED));
+  }
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testRewriteDataFilesEmptyTable() throws Exception {
+    Assert.assertNull("Table must be empty", icebergTableUnPartitioned.currentSnapshot());
+    Actions.forTable(icebergTableUnPartitioned)
+        .rewriteDataFiles()
+        .execute();
+    Assert.assertNull("Table must stay empty", icebergTableUnPartitioned.currentSnapshot());
+  }
+
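+  // Each INSERT below commits one small data file, so the table starts with two files;
+  // the rewrite action is expected to compact them into a single file.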
+  @Test
+  public void testRewriteDataFilesUnpartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTableUnPartitioned)
+            .rewriteDataFiles()
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 1 data file after rewrite", 1, dataFiles1.size());
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, Lists.newArrayList(
+        SimpleDataUtil.createRecord(1, "hello"),
+        SimpleDataUtil.createRecord(2, "world")
+    ));
+  }
+
+  @Test
+  public void testRewriteDataFilesPartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello', 'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'hello', 'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 3, 'world', 'b'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 4, 'world', 'b'", TABLE_NAME_PARTITIONED);
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTablePartitioned)
+            .rewriteDataFiles()
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 4 data files", 4, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles1.size());
+
+    // Assert the table records as expected.
+    Schema schema = new Schema(
+        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "spec", Types.StringType.get())
+    );
+
+    Record record = GenericRecord.create(schema);
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, Lists.newArrayList(
+        record.copy("id", 1, "data", "hello", "spec", "a"),
+        record.copy("id", 2, "data", "hello", "spec", "a"),
+        record.copy("id", 3, "data", "world", "spec", "b"),
+        record.copy("id", 4, "data", "world", "spec", "b")
+    ));
+  }
+
+  @Test
+  public void testRewriteDataFilesWithFilter() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello', 'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'hello', 'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 3, 'world', 'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 4, 'world', 'b'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 5, 'world', 'b'", TABLE_NAME_PARTITIONED);
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 5 data files before rewrite", 5, dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTablePartitioned)
+            .rewriteDataFiles()
+            .filter(Expressions.equal("spec", "a"))
+            .filter(Expressions.startsWith("data", "he"))
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files after rewrite", 4, dataFiles1.size());
+
+    // Assert the table records as expected.
+    Schema schema = new Schema(
+        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "spec", Types.StringType.get())
+    );
+
+    Record record = GenericRecord.create(schema);
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, Lists.newArrayList(
+        record.copy("id", 1, "data", "hello", "spec", "a"),
+        record.copy("id", 2, "data", "hello", "spec", "a"),
+        record.copy("id", 3, "data", "world", "spec", "a"),
+        record.copy("id", 4, "data", "world", "spec", "b"),
+        record.copy("id", 5, "data", "world", "spec", "b")
+    ));
+  }
+
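+  // Residuals are the parts of a row filter left over after partition pruning. The scan
+  // below plans with ignoreResiduals(), so each task should report Expressions.alwaysTrue()
+  // as its residual even though a filter on 'data' is set.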
+  @Test
+  public void testRewriteLargeTableHasResiduals() throws IOException {
+    // all records belong to the same partition
+    List<String> records1 = Lists.newArrayList();
+    List<String> records2 = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 100; i++) {
+      int id = i;
+      String data = String.valueOf(i % 3);
+      if (i % 2 == 0) {
+        records1.add("(" + id + ",'" + data + "')");
+      } else {
+        records2.add("(" + id + ",'" + data + "')");
+      }
+      Record record = RECORD.copy();
+      record.setField("id", id);
+      record.setField("data", data);
+      expected.add(record);
+    }
+
+    sql("INSERT INTO %s values " + StringUtils.join(records1, ","), TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s values " + StringUtils.join(records2, ","), TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan()
+        .ignoreResiduals()
+        .filter(Expressions.equal("data", "0"))
+        .planFiles();
+    for (FileScanTask task : tasks) {
+      Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
+    }
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
+
+    Actions actions = Actions.forTable(icebergTableUnPartitioned);
+
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .filter(Expressions.equal("data", "0"))
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index b22da5277074..1f40738481e7 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -36,6 +36,7 @@ public class RewriteDataFilesAction
     extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
 
   private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
 
   RewriteDataFilesAction(SparkSession spark, Table table) {
     super(table);
@@ -49,7 +50,11 @@ protected RewriteDataFilesAction self() {
 
   @Override
   protected FileIO fileIO() {
-    return SparkUtil.serializableFileIO(table());
+    if (this.fileIO == null) {
+      // Cache the serializable FileIO so repeated calls reuse a single instance.
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
   }
 
   @Override