Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-common:$parquet_version"
implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
implementation project(":sdks:java:io:parquet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.streaming(configuration.getStreaming())
.keeping(configuration.getKeep())
.dropping(configuration.getDrop())
.withFilter(configuration.getFilter());
.withFilter(configuration.getFilter())
.withWatermarkColumn(configuration.getWatermarkColumn());

@Nullable Integer pollIntervalSeconds = configuration.getPollIntervalSeconds();
if (pollIntervalSeconds != null) {
readRows = readRows.withPollInterval(Duration.standardSeconds(pollIntervalSeconds));
}
@Nullable Long maxDelay = configuration.getMaxSnapshotDiscoveryDelay();
if (maxDelay != null) {
readRows = readRows.withMaxSnapshotDiscoveryDelay(Duration.standardSeconds(maxDelay));
}

PCollection<Row> output = input.getPipeline().apply(readRows);

Expand Down Expand Up @@ -194,6 +199,18 @@ static Builder builder() {
"A subset of column names to exclude from reading. If null or empty, all columns will be read.")
abstract @Nullable List<String> getDrop();

@SchemaFieldDescription(
"Column used to derive the source's output watermark. "
+ "Must be an existing, required, top-level column of type 'long' or 'timestamp'. "
+ "If not set, the watermark advances according to snapshot commit timestamp.")
abstract @Nullable String getWatermarkColumn();

@SchemaFieldDescription(
"Maximum expected snapshot discovery delay in seconds. While idle, the source may advance "
+ "the watermark to now() minus this delay; snapshots discovered later with older commit "
+ "timestamps may be treated as late by downstream windowing. Default: 600 seconds.")
abstract @Nullable Long getMaxSnapshotDiscoveryDelay();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setTable(String table);
Expand Down Expand Up @@ -224,6 +241,10 @@ abstract static class Builder {

abstract Builder setFilter(String filter);

abstract Builder setWatermarkColumn(String watermarkColumn);

abstract Builder setMaxSnapshotDiscoveryDelay(Long seconds);

abstract Configuration build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.iceberg.cdc.IncrementalChangelogSource;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
Expand Down Expand Up @@ -569,6 +570,10 @@ public enum StartingStrategy {

abstract @Nullable String getFilter();

abstract @Nullable String getWatermarkColumn();

abstract @Nullable Duration getMaxSnapshotDiscoveryDelay();

abstract Builder toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -599,6 +604,10 @@ abstract static class Builder {

abstract Builder setFilter(@Nullable String filter);

abstract Builder setWatermarkColumn(@Nullable String watermarkColumn);

abstract Builder setMaxSnapshotDiscoveryDelay(@Nullable Duration delay);

abstract ReadRows build();
}

Expand Down Expand Up @@ -650,12 +659,27 @@ public ReadRows withFilter(@Nullable String filter) {
return toBuilder().setFilter(filter).build();
}

/**
 * Returns a copy of this transform, rebuilt through {@code toBuilder()}, with the watermark
 * column set to the given name.
 *
 * <p>The value is forwarded unchanged to the scan configuration when the transform is expanded.
 *
 * @param watermarkColumn name of the watermark column, or {@code null} to leave it unset
 * @return a new {@code ReadRows} carrying the given watermark column
 */
public ReadRows withWatermarkColumn(@Nullable String watermarkColumn) {
return toBuilder().setWatermarkColumn(watermarkColumn).build();
}

/**
 * Returns a copy of this transform, rebuilt through {@code toBuilder()}, with the maximum
 * snapshot discovery delay set to the given duration.
 *
 * <p>The value is forwarded unchanged to the scan configuration when the transform is expanded.
 *
 * @param delay the maximum snapshot discovery delay, or {@code null} to leave it unset
 * @return a new {@code ReadRows} carrying the given delay
 */
public ReadRows withMaxSnapshotDiscoveryDelay(@Nullable Duration delay) {
return toBuilder().setMaxSnapshotDiscoveryDelay(delay).build();
}

@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");

Table table = getCatalogConfig().catalog().loadTable(tableId);
Table table;
try {
table = getCatalogConfig().catalog().loadTable(tableId);
} catch (Exception e) {
throw new RuntimeException(
"Could not fetch table at expansion time. Doing so is needed to "
+ "determine the output Row schema.",
e);
}

IcebergScanConfig scanConfig =
IcebergScanConfig.builder()
Expand All @@ -674,12 +698,15 @@ public PCollection<Row> expand(PBegin input) {
.setKeepFields(getKeep())
.setDropFields(getDrop())
.setFilterString(getFilter())
.setWatermarkColumn(getWatermarkColumn())
.setMaxSnapshotDiscoveryDelay(getMaxSnapshotDiscoveryDelay())
.build();
scanConfig.validate(table);

PTransform<PBegin, PCollection<Row>> source =
getUseCdc()
? new IncrementalScanSource(scanConfig)
? new IncrementalChangelogSource(scanConfig)
// ? new IncrementalScanSource(scanConfig)
: Read.from(new ScanSource(scanConfig));

return input.apply(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,37 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.newHashSet;
import static org.apache.iceberg.types.Type.TypeID.LONG;
import static org.apache.iceberg.types.Type.TypeID.TIMESTAMP;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.NestedField;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
Expand All @@ -50,6 +60,8 @@ public abstract class IcebergScanConfig implements Serializable {
private transient org.apache.iceberg.@MonotonicNonNull Schema cachedRequiredSchema;
private transient @MonotonicNonNull Evaluator cachedEvaluator;
private transient @MonotonicNonNull Expression cachedFilter;
private transient org.apache.iceberg.@MonotonicNonNull Schema cachedRecordIdSchema;
private transient @MonotonicNonNull Schema cachedRowIdBeamSchema;

public enum ScanType {
TABLE,
Expand Down Expand Up @@ -89,9 +101,9 @@ static org.apache.iceberg.Schema resolveSchema(
@Nullable List<String> keep,
@Nullable List<String> drop,
@Nullable Set<String> fieldsInFilter) {
ImmutableList.Builder<String> selectedFieldsBuilder = ImmutableList.builder();
Set<String> selectedFields = new LinkedHashSet<>();
if (keep != null && !keep.isEmpty()) {
selectedFieldsBuilder.addAll(keep);
selectedFields.addAll(keep);
} else if (drop != null && !drop.isEmpty()) {
List<String> paths = new ArrayList<>(TypeUtil.indexNameById(schema.asStruct()).values());
Collections.sort(paths);
Expand All @@ -100,7 +112,7 @@ static org.apache.iceberg.Schema resolveSchema(
boolean isParent = i + 1 < paths.size() && paths.get(i + 1).startsWith(path + ".");
boolean isDrop = drop.stream().anyMatch(d -> path.equals(d) || path.startsWith(d + "."));
if (!isParent && !isDrop) {
selectedFieldsBuilder.add(path);
selectedFields.add(path);
}
}
} else {
Expand All @@ -111,9 +123,8 @@ static org.apache.iceberg.Schema resolveSchema(
if (fieldsInFilter != null && !fieldsInFilter.isEmpty()) {
fieldsInFilter.stream()
.map(f -> schema.caseInsensitiveFindField(f).name())
.forEach(selectedFieldsBuilder::add);
.forEach(selectedFields::add);
}
ImmutableList<String> selectedFields = selectedFieldsBuilder.build();
return selectedFields.isEmpty() ? schema : schema.select(selectedFields);
}

Expand Down Expand Up @@ -141,15 +152,34 @@ public org.apache.iceberg.Schema getRequiredSchema() {
return cachedRequiredSchema;
}

/**
 * Returns the table schema narrowed to its identifier fields.
 *
 * <p>The result is computed on first use from the table obtained through {@code TableCache} and
 * then kept in a transient field, so later calls on the same instance return the stored value.
 *
 * @return the Iceberg schema selected by the table schema's identifier field ids
 */
public org.apache.iceberg.Schema recordIdSchema() {
  if (cachedRecordIdSchema != null) {
    return cachedRecordIdSchema;
  }
  org.apache.iceberg.Schema tableSchema = TableCache.get(getTableIdentifier()).schema();
  Set<Integer> identifierIds = tableSchema.identifierFieldIds();
  cachedRecordIdSchema = TypeUtil.select(tableSchema, identifierIds);
  return cachedRecordIdSchema;
}

/**
 * Returns the Beam schema equivalent of {@link #recordIdSchema()}.
 *
 * <p>The conversion runs once per instance; the converted schema is stored in a transient field
 * and returned directly on later calls.
 *
 * @return the Beam schema converted from the identifier-field Iceberg schema
 */
public Schema rowIdBeamSchema() {
  if (cachedRowIdBeamSchema != null) {
    return cachedRowIdBeamSchema;
  }
  org.apache.iceberg.Schema identifierSchema = recordIdSchema();
  cachedRowIdBeamSchema = icebergSchemaToBeamSchema(identifierSchema);
  return cachedRowIdBeamSchema;
}

/**
 * Builds a comparator for {@code StructLike} values based on the struct type of
 * {@link #recordIdSchema()}.
 *
 * <p>A new comparator is created on every call; only the underlying schema is cached.
 *
 * @return a comparator over structs shaped like the identifier-field schema
 */
public Comparator<StructLike> recordIdComparator() {
  org.apache.iceberg.Schema identifierSchema = recordIdSchema();
  return Comparators.forType(identifierSchema.asStruct());
}

@Pure
@Nullable
public Evaluator getEvaluator() {
public Evaluator getEvaluator(org.apache.iceberg.Schema requiredSchema) {
@Nullable Expression filter = getFilter();
if (filter == null) {
return null;
}
if (cachedEvaluator == null) {
cachedEvaluator = new Evaluator(getRequiredSchema().asStruct(), filter);
cachedEvaluator = new Evaluator(requiredSchema.asStruct(), filter);
}
return cachedEvaluator;
}
Expand Down Expand Up @@ -226,6 +256,12 @@ public Expression getFilter() {
@Pure
public abstract @Nullable List<String> getDropFields();

@Pure
public abstract @Nullable String getWatermarkColumn();

@Pure
public abstract @Nullable Duration getMaxSnapshotDiscoveryDelay();

@Pure
public static Builder builder() {
return new AutoValue_IcebergScanConfig.Builder()
Expand All @@ -248,7 +284,8 @@ public static Builder builder() {
.setPollInterval(null)
.setStartingStrategy(null)
.setTag(null)
.setBranch(null);
.setBranch(null)
.setWatermarkColumn(null);
}

@AutoValue.Builder
Expand Down Expand Up @@ -311,6 +348,10 @@ public Builder setTableIdentifier(String... names) {

public abstract Builder setDropFields(@Nullable List<String> fields);

public abstract Builder setWatermarkColumn(@Nullable String watermarkColumn);

public abstract Builder setMaxSnapshotDiscoveryDelay(@Nullable Duration delay);

public abstract IcebergScanConfig build();
}

Expand All @@ -328,16 +369,19 @@ void validate(Table table) {
String param;
if (keep != null) {
param = "keep";
fieldsSpecified = newHashSet(checkNotNull(keep));
fieldsSpecified = newHashSet(checkArgumentNotNull(keep));
} else { // drop != null
param = "drop";
fieldsSpecified = newHashSet(checkNotNull(drop));
fieldsSpecified = newHashSet(checkArgumentNotNull(drop));
}
fieldsSpecified.removeIf(name -> table.schema().findField(name) != null);

checkArgument(
fieldsSpecified.isEmpty(),
error(String.format("'%s' specifies unknown field(s): %s", param, fieldsSpecified)));
fieldsSpecified.isEmpty()
|| fieldsSpecified.stream().allMatch(MetadataColumns::isMetadataColumn),
error("'%s' specifies unknown field(s): %s"),
param,
fieldsSpecified);
}

// TODO(#34168, ahmedabu98): fill these gaps for the existing batch source
Expand Down Expand Up @@ -371,6 +415,18 @@ void validate(Table table) {
+ "reading with Managed.ICEBERG_CDC: "
+ invalidOptions));
}
} else {
// The table schema must declare at least one identifier (primary key) field. Copy the ids into
// a mutable set so the projected ones can be subtracted below.
Set<Integer> primaryKeyIds = new HashSet<>(table.schema().identifierFieldIds());
checkState(
    !primaryKeyIds.isEmpty(),
    "Cannot read CDC records as the table schema does not specify any primary key fields.");
// Whatever is left after removing the projected schema's identifier ids was dropped by the
// configured projection, which is rejected.
Set<Integer> projectedPrimaryKeyIds = getProjectedSchema().identifierFieldIds();
primaryKeyIds.removeAll(projectedPrimaryKeyIds);
checkArgument(
    primaryKeyIds.isEmpty(),
    "When reading CDC records, the projected schema must not drop primary key fields. "
        + "The specified configuration drops the following PK fields: %s",
    primaryKeyIds);
}

if (getStartingStrategy() != null) {
Expand All @@ -385,12 +441,44 @@ void validate(Table table) {
checkArgument(
getToTimestamp() == null || getToSnapshot() == null,
error("only one of 'to_timestamp' or 'to_snapshot' can be set"));
@Nullable Long fromSnapshotId = ReadUtils.getFromSnapshotInclusive(table, this);
@Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, this);
if (fromSnapshotId != null) {
checkArgumentNotNull(
table.snapshot(fromSnapshotId),
error("configured starting snapshot does not exist: '%s'"),
fromSnapshotId);
}
if (toSnapshotId != null) {
checkArgumentNotNull(
table.snapshot(toSnapshotId),
error("configured end snapshot does not exist: '%s'"),
toSnapshotId);
}

if (getPollInterval() != null) {
checkArgument(
Boolean.TRUE.equals(getStreaming()),
error("'poll_interval_seconds' can only be set when streaming is true"));
}

// Optional watermark column: only allowed in CDC mode, and it must name an existing, required
// column of type long or timestamp that survives the configured projection.
// NOTE(review): the option description asks for a top-level column, but findField may also
// resolve nested names -- confirm whether nested columns should be rejected here.
@Nullable String watermarkColumn = getWatermarkColumn();
if (watermarkColumn != null) {
  checkArgument(getUseCdc(), error("'watermark_column' is only supported in CDC mode"));
  NestedField field = table.schema().findField(watermarkColumn);
  checkArgument(
      field != null, error("'watermark_column' refers to unknown column: %s"), watermarkColumn);
  checkArgument(
      field.isRequired(),
      error("'watermark_column' needs to be a non-nullable column: %s"),
      watermarkColumn);
  // The message names both accepted types so it matches the condition being checked.
  checkArgument(
      field.type().typeId() == TIMESTAMP || field.type().typeId() == LONG,
      error(
          "'watermark_column' must be a column of type 'long' or 'timestamp', "
              + "but '%s' has type %s"),
      watermarkColumn,
      field.type().typeId());
  // The column must still be present after keep/drop projection is applied.
  checkArgumentNotNull(
      getProjectedSchema().findField(watermarkColumn),
      "'watermark_column' column should not be dropped.");
}
}

private String error(String message) {
Expand Down
Loading
Loading