Skip to content

Commit

Permalink
Re-add Iceberg bounded source; test splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Apr 12, 2024
1 parent 36f3228 commit a169e6a
Show file tree
Hide file tree
Showing 9 changed files with 1,176 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
*/
package org.apache.beam.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

public class IcebergIO {

Expand All @@ -28,6 +35,10 @@ public static WriteRows writeToDynamicDestinations(
return new WriteRows(catalog, dynamicDestinations);
}

/**
 * Creates a {@link ReadTable} transform for the given table.
 *
 * @param catalogConfig configuration of the catalog that holds the table
 * @param tableId identifier of the table to read
 * @return a transform that expands from {@code PBegin} to a {@code PCollection<Row>}
 */
public static ReadTable readTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
  ReadTable readTransform = new ReadTable(catalogConfig, tableId);
  return readTransform;
}

static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {

private final IcebergCatalogConfig catalog;
Expand All @@ -47,4 +58,36 @@ public IcebergWriteResult expand(PCollection<Row> input) {
"Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations));
}
}

/**
 * Transform that reads a single Iceberg table as a {@code PCollection<Row>}.
 *
 * <p>During expansion the table is loaded from the catalog so that its Iceberg schema can be
 * converted to a Beam schema; the actual read is delegated to a {@link ScanSource} wrapped in
 * {@code Read.from}.
 */
public static class ReadTable extends PTransform<PBegin, PCollection<Row>> {

  private final IcebergCatalogConfig catalogConfig;

  // Transient, so it is not carried along when this transform is serialized; it is only read
  // through getTableId(), which fails loudly if the value is missing.
  private final transient @Nullable TableIdentifier tableId;

  private ReadTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
    this.catalogConfig = catalogConfig;
    this.tableId = tableId;
  }

  /** Returns the table identifier, failing with a state error if it is null. */
  private TableIdentifier getTableId() {
    return checkStateNotNull(
        tableId, "Transient field tableId null; it should not be accessed after serialization");
  }

  /**
   * Loads the table, derives the Beam schema from it, and applies a bounded read over a table
   * scan.
   */
  @Override
  public PCollection<Row> expand(PBegin input) {
    TableIdentifier identifier = getTableId();
    Table table = catalogConfig.catalog().loadTable(identifier);

    IcebergScanConfig scanConfig =
        IcebergScanConfig.builder()
            .setCatalogConfig(catalogConfig)
            .setScanType(IcebergScanConfig.ScanType.TABLE)
            .setTableIdentifier(identifier)
            .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
            .build();

    ScanSource source = new ScanSource(scanConfig);
    return input.apply(Read.from(source));
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

/**
 * Serializable, AutoValue-generated description of an Iceberg scan: which catalog and table to
 * read, the Beam schema to project to, and a set of optional scan settings.
 *
 * <p>Instances are created through {@link #builder()}, which pre-populates every optional
 * property (all to {@code null} except the options map, which starts empty) and defaults the scan
 * type to {@link ScanType#TABLE}.
 */
@AutoValue
public abstract class IcebergScanConfig implements Serializable {

  // Lazily loaded table handle. Transient, so it is not serialized with the config and is
  // re-loaded on first use by getTable().
  private transient @MonotonicNonNull Table cachedTable;

  /** The kind of scan this config describes. */
  public enum ScanType {
    TABLE,
    BATCH
  }

  @Pure
  public abstract ScanType getScanType();

  @Pure
  public abstract IcebergCatalogConfig getCatalogConfig();

  /** The table identifier in string form; parsed with {@code TableIdentifier.parse} on use. */
  @Pure
  public abstract String getTableIdentifier();

  /**
   * Returns the table named by {@link #getTableIdentifier()}, loading it from the configured
   * catalog on the first call and reusing the loaded instance afterwards.
   */
  @Pure
  public Table getTable() {
    Table table = cachedTable;
    if (table == null) {
      TableIdentifier identifier = TableIdentifier.parse(getTableIdentifier());
      table = getCatalogConfig().catalog().loadTable(identifier);
      cachedTable = table;
    }
    return table;
  }

  @Pure
  public abstract Schema getSchema();

  @Pure
  public abstract @Nullable Expression getFilter();

  @Pure
  public abstract @Nullable Boolean getCaseSensitive();

  @Pure
  public abstract ImmutableMap<String, String> getOptions();

  @Pure
  public abstract @Nullable Long getSnapshot();

  @Pure
  public abstract @Nullable Long getTimestamp();

  @Pure
  public abstract @Nullable Long getFromSnapshotInclusive();

  @Pure
  public abstract @Nullable String getFromSnapshotRefInclusive();

  @Pure
  public abstract @Nullable Long getFromSnapshotExclusive();

  @Pure
  public abstract @Nullable String getFromSnapshotRefExclusive();

  @Pure
  public abstract @Nullable Long getToSnapshot();

  @Pure
  public abstract @Nullable String getToSnapshotRef();

  @Pure
  public abstract @Nullable String getTag();

  @Pure
  public abstract @Nullable String getBranch();

  /**
   * Returns a builder with the scan type set to {@link ScanType#TABLE}, an empty options map, and
   * every nullable property explicitly set to {@code null}. Catalog config, table identifier, and
   * schema are not defaulted here.
   */
  @Pure
  public static Builder builder() {
    Builder defaults = new AutoValue_IcebergScanConfig.Builder();
    return defaults
        .setScanType(ScanType.TABLE)
        .setFilter(null)
        .setCaseSensitive(null)
        .setOptions(ImmutableMap.of())
        .setSnapshot(null)
        .setTimestamp(null)
        .setFromSnapshotInclusive(null)
        .setFromSnapshotRefInclusive(null)
        .setFromSnapshotExclusive(null)
        .setFromSnapshotRefExclusive(null)
        .setToSnapshot(null)
        .setToSnapshotRef(null)
        .setTag(null)
        .setBranch(null);
  }

  /** AutoValue builder for {@link IcebergScanConfig}. */
  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setScanType(ScanType type);

    public abstract Builder setCatalogConfig(IcebergCatalogConfig catalog);

    public abstract Builder setTableIdentifier(String tableIdentifier);

    /** Convenience overload: stores the identifier's {@code toString()} form. */
    public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
      String identifierText = tableIdentifier.toString();
      return this.setTableIdentifier(identifierText);
    }

    /** Convenience overload: builds the identifier from its name parts via {@code of}. */
    public Builder setTableIdentifier(String... names) {
      TableIdentifier identifier = TableIdentifier.of(names);
      return setTableIdentifier(identifier);
    }

    public abstract Builder setSchema(Schema schema);

    public abstract Builder setFilter(@Nullable Expression filter);

    public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive);

    public abstract Builder setOptions(ImmutableMap<String, String> options);

    public abstract Builder setSnapshot(@Nullable Long snapshot);

    public abstract Builder setTimestamp(@Nullable Long timestamp);

    public abstract Builder setFromSnapshotInclusive(@Nullable Long fromInclusive);

    public abstract Builder setFromSnapshotRefInclusive(@Nullable String ref);

    public abstract Builder setFromSnapshotExclusive(@Nullable Long fromExclusive);

    public abstract Builder setFromSnapshotRefExclusive(@Nullable String ref);

    public abstract Builder setToSnapshot(@Nullable Long snapshot);

    public abstract Builder setToSnapshotRef(@Nullable String ref);

    public abstract Builder setTag(@Nullable String tag);

    public abstract Builder setBranch(@Nullable String branch);

    public abstract IcebergScanConfig build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

/**
 * Source that reads all the data in a table described by an IcebergScanConfig. Supports only
 * initial splitting.
 *
 * <p>{@link #split} produces one {@code ScanTaskSource} per combined scan task planned by
 * Iceberg; {@link #createReader} reads the whole table as a single combined task.
 */
class ScanSource extends BoundedSource<Row> {

  private final IcebergScanConfig scanConfig;

  public ScanSource(IcebergScanConfig scanConfig) {
    this.scanConfig = scanConfig;
  }

  /**
   * Builds a scan over the configured table, projected to the configured Beam schema.
   *
   * <p>Only the filter, case sensitivity, snapshot id, and branch/tag settings of the config are
   * applied here (branch takes precedence over tag when both are set). The remaining settings on
   * the config are not consulted by this method.
   */
  private TableScan getTableScan() {
    TableScan tableScan =
        scanConfig
            .getTable()
            .newScan()
            .project(SchemaAndRowConversions.beamSchemaToIcebergSchema(scanConfig.getSchema()));

    if (scanConfig.getFilter() != null) {
      tableScan = tableScan.filter(scanConfig.getFilter());
    }
    if (scanConfig.getCaseSensitive() != null) {
      tableScan = tableScan.caseSensitive(scanConfig.getCaseSensitive());
    }
    if (scanConfig.getSnapshot() != null) {
      tableScan = tableScan.useSnapshot(scanConfig.getSnapshot());
    }
    if (scanConfig.getBranch() != null) {
      tableScan = tableScan.useRef(scanConfig.getBranch());
    } else if (scanConfig.getTag() != null) {
      tableScan = tableScan.useRef(scanConfig.getTag());
    }

    return tableScan;
  }

  /**
   * Copies every element of {@code items} into an immutable list and then closes {@code items},
   * so that resources held by the planning iterable are released even if copying fails.
   */
  private static <T> ImmutableList<T> copyAndClose(CloseableIterable<T> items)
      throws IOException {
    try (CloseableIterable<T> closing = items) {
      return ImmutableList.copyOf(closing);
    }
  }

  /**
   * Plans all files of the table scan into a single combined task.
   *
   * @throws IOException if closing the planned-files iterable fails
   */
  private CombinedScanTask wholeTableReadTask() throws IOException {
    // Always project to our destination schema (done in getTableScan()). The planned-files
    // iterable is closeable, so it is drained and closed rather than left open.
    return new BaseCombinedScanTask(copyAndClose(getTableScan().planFiles()));
  }

  /**
   * Splits the scan into one source per combined scan task.
   *
   * <p>When {@code desiredBundleSizeBytes} is positive it is passed to the scan as the split size
   * option; otherwise the scan's own split size applies.
   *
   * @throws UnsupportedOperationException for any scan type other than {@code TABLE}
   * @throws RuntimeException wrapping an {@link IOException} raised while closing the task plan
   */
  @Override
  public List<? extends BoundedSource<Row>> split(
      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
    List<ScanTaskSource> splits = new ArrayList<>();

    switch (scanConfig.getScanType()) {
      case TABLE:
        TableScan tableScan = getTableScan();
        if (desiredBundleSizeBytes > 0) {
          tableScan =
              tableScan.option(TableProperties.SPLIT_SIZE, Long.toString(desiredBundleSizeBytes));
        }

        try (CloseableIterable<CombinedScanTask> tasks = tableScan.planTasks()) {
          for (CombinedScanTask combinedScanTask : tasks) {
            splits.add(new ScanTaskSource(scanConfig, combinedScanTask));
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        break;
      case BATCH:
        throw new UnsupportedOperationException("BATCH scan not supported");
      default:
        throw new UnsupportedOperationException("Unknown scan type: " + scanConfig.getScanType());
    }

    return splits;
  }

  /** Returns the size in bytes reported by the whole-table combined scan task. */
  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
    return wholeTableReadTask().sizeBytes();
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
  }

  /** Rows are encoded with a {@link RowCoder} for the configured Beam schema. */
  @Override
  public Coder<Row> getOutputCoder() {
    return RowCoder.of(scanConfig.getSchema());
  }

  /** Creates a reader over the entire table as one combined scan task (i.e. unsplit). */
  @Override
  public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
    return new ScanTaskReader(new ScanTaskSource(scanConfig, wholeTableReadTask()));
  }
}

0 comments on commit a169e6a

Please sign in to comment.