Skip to content
Permalink
Browse files
Add project method to IcebergGenerics to enable projections by Schema (
  • Loading branch information
szlta committed May 23, 2022
1 parent 566b2fe commit 4b68728a907045c9366ca551c95bdfba5297f98b
Showing 3 changed files with 58 additions and 3 deletions.
@@ -19,6 +19,7 @@

package org.apache.iceberg.data;

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
@@ -67,6 +68,11 @@ public ScanBuilder select(String... selectedColumns) {
return this;
}

public ScanBuilder project(Schema schema) {
this.tableScan = tableScan.project(schema);
return this;
}

public ScanBuilder useSnapshot(long scanSnapshotId) {
this.tableScan = tableScan.useSnapshot(scanSnapshotId);
return this;
@@ -34,6 +34,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -294,8 +295,10 @@ public void testFilter() {

@Test
public void testProject() {
Iterable<Record> results = IcebergGenerics.read(sharedTable).select("id").build();
verifyProjectIdColumn(IcebergGenerics.read(sharedTable).select("id").build());
}

private void verifyProjectIdColumn(Iterable<Record> results) {
Set<Long> expected = Sets.newHashSet();
expected.addAll(Lists.transform(file1FirstSnapshotRecords, record -> (Long) record.getField("id")));
expected.addAll(Lists.transform(file2FirstSnapshotRecords, record -> (Long) record.getField("id")));
@@ -308,6 +311,49 @@ public void testProject() {
expected, Sets.newHashSet(transform(results, record -> (Long) record.getField("id"))));
}

@Test
public void testProjectWithSchema() {
// Test with table schema
Iterable<Record> results = IcebergGenerics.read(sharedTable).project(SCHEMA).build();

Set<Record> expected = Sets.newHashSet();
expected.addAll(file1FirstSnapshotRecords);
expected.addAll(file2FirstSnapshotRecords);
expected.addAll(file3FirstSnapshotRecords);

results.forEach(record -> expected.remove(record));
Assert.assertTrue(expected.isEmpty());

// Test with projected schema
Schema schema = new Schema(required(1, "id", Types.LongType.get()));
verifyProjectIdColumn(IcebergGenerics.read(sharedTable).project(schema).build());

// Test with unknown field
schema = new Schema(optional(999, "unknown", Types.LongType.get()));
IcebergGenerics.read(sharedTable).project(schema).build().forEach(r -> Assert.assertNull(r.get(0)));

// Test with reading some metadata columns
schema = new Schema(
required(1, "id", Types.LongType.get()),
MetadataColumns.metadataColumn(sharedTable, MetadataColumns.PARTITION_COLUMN_NAME),
optional(2, "data", Types.StringType.get()),
MetadataColumns.SPEC_ID,
MetadataColumns.ROW_POSITION
);

Iterator<Record> iterator =
IcebergGenerics.read(sharedTable).project(schema).where(equal("data", "falafel")).build().iterator();

GenericRecord expectedRecord = GenericRecord.create(schema).copy(
ImmutableMap.of("id", 2L,
"data", "falafel",
"_spec_id", 0,
"_pos", 2L));
expectedRecord.setField("_partition", null);
Assert.assertEquals(expectedRecord, iterator.next());
Assert.assertFalse(iterator.hasNext());
}

@Test
public void testProjectWithMissingFilterColumn() {
Iterable<Record> results = IcebergGenerics.read(sharedTable)
@@ -157,9 +157,12 @@ protected StructReader(List<OrcValueReader<?>> readers, Types.StructType struct,
} else if (field.equals(MetadataColumns.IS_DELETED)) {
this.isConstantOrMetadataField[pos] = true;
this.readers[pos] = constants(false);
} else if (MetadataColumns.isMetadataColumn(field.name())) {
// in case of any other metadata field, fill with nulls
this.isConstantOrMetadataField[pos] = true;
this.readers[pos] = constants(null);
} else {
this.readers[pos] = readers.get(readerIndex);
readerIndex++;
this.readers[pos] = readers.get(readerIndex++);
}
}
}

0 comments on commit 4b68728

Please sign in to comment.