Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public OrcValueReader<RowData> record(
TypeDescription record,
List<String> names,
List<OrcValueReader<?>> fields) {
return FlinkOrcReaders.struct(fields, iStruct, idToConstant);
return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
Expand Down Expand Up @@ -91,8 +92,11 @@ public static <K, V> OrcValueReader<MapData> map(
}

public static OrcValueReader<RowData> struct(
List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
return new StructReader(readers, struct, idToConstant);
TypeDescription record,
List<OrcValueReader<?>> readers,
Types.StructType struct,
Map<Integer, ?> idToConstant) {
return new StructReader(record, readers, struct, idToConstant);
}

private static class StringReader implements OrcValueReader<StringData> {
Expand Down Expand Up @@ -265,8 +269,11 @@ private static class StructReader extends OrcValueReaders.StructReader<RowData>
private final int numFields;

StructReader(
List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
TypeDescription record,
List<OrcValueReader<?>> readers,
Types.StructType struct,
Map<Integer, ?> idToConstant) {
super(record, readers, struct, idToConstant);
this.numFields = struct.fields().size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
Expand All @@ -44,8 +45,14 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;

class TestRewriteDataFiles extends MaintenanceTaskTestBase {

private static final FileFormat[] FILE_FORMATS =
new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};

@Test
void testRewriteUnpartitioned() throws Exception {
Table table = createTable();
Expand Down Expand Up @@ -83,13 +90,14 @@ void testRewriteUnpartitioned() throws Exception {
createRecord(4, "d")));
}

@Test
void testRewriteUnpartitionedPreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
insert(table, 2, "b");
insert(table, 3, "c");
insert(table, 4, "d");
@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws Exception {
Table table = createTable(3, fileFormat);
insert(table, 1, "a", fileFormat);
insert(table, 2, "b", fileFormat);
insert(table, 3, "c", fileFormat);
insert(table, 4, "d", fileFormat);

assertFileNum(table, 4, 0);

Expand Down Expand Up @@ -123,15 +131,17 @@ void testRewriteUnpartitionedPreserveLineage() throws Exception {
schema);
}

@Test
void testRewriteTheSameFilePreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
insert(table, 2, "b");
@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws Exception {
Table table = createTable(3, fileFormat);
insert(table, 1, "a", fileFormat);
insert(table, 2, "b", fileFormat);
// Create a file with two lines of data to verify that the rowid is read correctly.
insert(
table,
ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));
ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")),
fileFormat);

assertFileNum(table, 3, 0);

Expand Down Expand Up @@ -167,13 +177,14 @@ void testRewriteTheSameFilePreserveLineage() throws Exception {
schema);
}

@Test
void testRewritePartitionedPreserveLineage() throws Exception {
Table table = createPartitionedTable(3);
insertPartitioned(table, 1, "p1");
insertPartitioned(table, 2, "p1");
insertPartitioned(table, 3, "p2");
insertPartitioned(table, 4, "p2");
@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws Exception {
Table table = createPartitionedTable(3, fileFormat);
insertPartitioned(table, 1, "p1", fileFormat);
insertPartitioned(table, 2, "p1", fileFormat);
insertPartitioned(table, 3, "p2", fileFormat);
insertPartitioned(table, 4, "p2", fileFormat);

assertFileNum(table, 4, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,14 @@ void after() throws IOException {
}

protected static Table createTable() {
return createTable(2);
return createTable(2, FileFormat.PARQUET);
}

protected static Table createTable(int formatVersion) {
return createPartitionedTable(formatVersion, FileFormat.PARQUET);
}

protected static Table createTable(int formatVersion, FileFormat fileFormat) {
return CATALOG_EXTENSION
.catalog()
.createTable(
Expand All @@ -145,6 +149,8 @@ protected static Table createTable(int formatVersion) {
PartitionSpec.unpartitioned(),
null,
ImmutableMap.of(
"write.format.default",
fileFormat.name(),
TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
Expand Down Expand Up @@ -182,7 +188,7 @@ protected static Table createTableWithDelete(int formatVersion) {
"format-version", String.valueOf(formatVersion), "write.upsert.enabled", "true"));
}

protected static Table createPartitionedTable(int formatVersion) {
protected static Table createPartitionedTable(int formatVersion, FileFormat fileFormat) {
return CATALOG_EXTENSION
.catalog()
.createTable(
Expand All @@ -191,24 +197,36 @@ protected static Table createPartitionedTable(int formatVersion) {
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
null,
ImmutableMap.of(
"write.format.default",
fileFormat.name(),
"format-version",
String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
"100000"));
}

protected static Table createPartitionedTable() {
return createPartitionedTable(2);
return createPartitionedTable(2, FileFormat.PARQUET);
}

protected void insert(Table table, Integer id, String data) throws IOException {
new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
insert(table, id, data, FileFormat.PARQUET);
}

protected void insert(Table table, Integer id, String data, FileFormat fileFormat)
throws IOException {
new GenericAppenderHelper(table, fileFormat, warehouseDir)
.appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
table.refresh();
}

protected void insert(Table table, List<Record> records) throws IOException {
new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records);
insert(table, records, FileFormat.PARQUET);
}

protected void insert(Table table, List<Record> records, FileFormat fileFormat)
throws IOException {
new GenericAppenderHelper(table, fileFormat, warehouseDir).appendToTable(records);
table.refresh();
}

Expand Down Expand Up @@ -309,7 +327,12 @@ protected void update(Table table, Integer id, String oldData, String tempData,
}

protected void insertPartitioned(Table table, Integer id, String data) throws IOException {
new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
insertPartitioned(table, id, data, FileFormat.PARQUET);
}

protected void insertPartitioned(Table table, Integer id, String data, FileFormat fileFormat)
throws IOException {
new GenericAppenderHelper(table, fileFormat, warehouseDir)
.appendToTable(
TestHelpers.Row.of(data), Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
table.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public OrcValueReader<RowData> record(
TypeDescription record,
List<String> names,
List<OrcValueReader<?>> fields) {
return FlinkOrcReaders.struct(fields, iStruct, idToConstant);
return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
Expand Down Expand Up @@ -91,8 +92,11 @@ public static <K, V> OrcValueReader<MapData> map(
}

public static OrcValueReader<RowData> struct(
List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
return new StructReader(readers, struct, idToConstant);
TypeDescription record,
List<OrcValueReader<?>> readers,
Types.StructType struct,
Map<Integer, ?> idToConstant) {
return new StructReader(record, readers, struct, idToConstant);
}

private static class StringReader implements OrcValueReader<StringData> {
Expand Down Expand Up @@ -265,8 +269,11 @@ private static class StructReader extends OrcValueReaders.StructReader<RowData>
private final int numFields;

StructReader(
List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
TypeDescription record,
List<OrcValueReader<?>> readers,
Types.StructType struct,
Map<Integer, ?> idToConstant) {
super(record, readers, struct, idToConstant);
this.numFields = struct.fields().size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
Expand All @@ -44,8 +45,14 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;

class TestRewriteDataFiles extends MaintenanceTaskTestBase {

private static final FileFormat[] FILE_FORMATS =
new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};

@Test
void testRewriteUnpartitioned() throws Exception {
Table table = createTable();
Expand Down Expand Up @@ -83,13 +90,14 @@ void testRewriteUnpartitioned() throws Exception {
createRecord(4, "d")));
}

@Test
void testRewriteUnpartitionedPreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
insert(table, 2, "b");
insert(table, 3, "c");
insert(table, 4, "d");
@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws Exception {
Table table = createTable(3, fileFormat);
insert(table, 1, "a", fileFormat);
insert(table, 2, "b", fileFormat);
insert(table, 3, "c", fileFormat);
insert(table, 4, "d", fileFormat);

assertFileNum(table, 4, 0);

Expand Down Expand Up @@ -123,15 +131,17 @@ void testRewriteUnpartitionedPreserveLineage() throws Exception {
schema);
}

@Test
void testRewriteTheSameFilePreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
insert(table, 2, "b");
@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws Exception {
Table table = createTable(3, fileFormat);
insert(table, 1, "a", fileFormat);
insert(table, 2, "b", fileFormat);
// Create a file with two lines of data to verify that the rowid is read correctly.
insert(
table,
ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));
ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")),
fileFormat);

assertFileNum(table, 3, 0);

Expand Down Expand Up @@ -167,13 +177,14 @@ void testRewriteTheSameFilePreserveLineage() throws Exception {
schema);
}

@Test
void testRewritePartitionedPreserveLineage() throws Exception {
Table table = createPartitionedTable(3);
insertPartitioned(table, 1, "p1");
insertPartitioned(table, 2, "p1");
insertPartitioned(table, 3, "p2");
insertPartitioned(table, 4, "p2");
@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws Exception {
Table table = createPartitionedTable(3, fileFormat);
insertPartitioned(table, 1, "p1", fileFormat);
insertPartitioned(table, 2, "p1", fileFormat);
insertPartitioned(table, 3, "p2", fileFormat);
insertPartitioned(table, 4, "p2", fileFormat);

assertFileNum(table, 4, 0);

Expand Down
Loading
Loading