Backporting Flink: Watermark Read Options to 1.17 and 1.16 (apache#9456)
rodmeneses authored and adnanhemani committed Jan 30, 2024
1 parent 42603f0 commit 0cef742
Showing 12 changed files with 431 additions and 38 deletions.
FlinkReadConf.java
@@ -20,6 +20,7 @@

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() {
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}

public String watermarkColumn() {
return confParser
.stringConf()
.option(FlinkReadOptions.WATERMARK_COLUMN)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
.parseOptional();
}

public TimeUnit watermarkColumnTimeUnit() {
return confParser
.enumConfParser(TimeUnit.class)
.option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
.parse();
}
}
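The new getters resolve the watermark settings through the existing ConfParser chain, so a job-level read option wins over the Flink configuration, which wins over the option default. A minimal sketch of that resolution, assuming an already-loaded Table and an illustrative event_ts column (neither is part of this diff):

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkReadConf;

public class WatermarkReadConfSketch {
  // Resolve the new watermark options for a table using only a read-options map.
  static void show(Table table) {
    Map<String, String> readOptions = Collections.singletonMap("watermark-column", "event_ts");
    FlinkReadConf readConf = new FlinkReadConf(table, readOptions, new Configuration());

    String watermarkColumn = readConf.watermarkColumn();     // "event_ts", from the read option
    TimeUnit timeUnit = readConf.watermarkColumnTimeUnit();  // MICROSECONDS, the option default
    System.out.println(watermarkColumn + " / " + timeUnit);
  }
}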
FlinkReadOptions.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink;

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ private FlinkReadOptions() {}
public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);

public static final String WATERMARK_COLUMN = "watermark-column";
public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();

public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
.enumType(TimeUnit.class)
.defaultValue(TimeUnit.MICROSECONDS);
}
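The same two settings can also be supplied through Flink's typed ConfigOptions; the full key is the class's PREFIX constant (not shown in this hunk) plus the names above. A sketch, with an illustrative column name, of building a Configuration that could later be handed to the source (for example through its flinkConfig(...) setter, assumed here):

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkReadOptions;

public class WatermarkOptionsSketch {
  // Set the new watermark options on a Flink Configuration via the typed ConfigOptions.
  static Configuration watermarkConfig() {
    Configuration flinkConf = new Configuration();
    flinkConf.set(FlinkReadOptions.WATERMARK_COLUMN_OPTION, "event_ts");
    flinkConf.set(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION, TimeUnit.MILLISECONDS);
    return flinkConf;
  }
}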
IcebergSource.java
@@ -46,6 +46,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private String watermarkColumn;
private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
Preconditions.checkArgument(
watermarkColumn == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -441,7 +437,7 @@ public Builder<T> setAll(Map<String, String> properties) {
* Emits watermarks once per split based on the min value of column statistics from files
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
* setting {@link #watermarkTimeUnit(TimeUnit)}.
* setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
* split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.watermarkColumn = columnName;
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
return this;
}

@@ -459,8 +455,8 @@ public Builder<T> watermarkColumn(String columnName) {
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
this.watermarkTimeUnit = timeUnit;
public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
return this;
}

@@ -482,13 +478,16 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);

Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
String watermarkColumn = flinkReadConf.watermarkColumn();
TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();

if (watermarkColumn != null) {
// Column statistics is needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
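For callers the builder API is unchanged; watermarkColumn(...) and watermarkColumnTimeUnit(...) now simply store read options that build() resolves through FlinkReadConf. A usage sketch, assuming the pre-existing forRowData(), tableLoader(), and streaming() builder methods (not part of this diff) and an illustrative event_ts column:

import java.util.concurrent.TimeUnit;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkSourceSketch {
  // Build a streaming source that emits per-split watermarks from the event_ts column.
  static IcebergSource<RowData> build(TableLoader tableLoader) {
    return IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .streaming(true)
        .watermarkColumn("event_ts")                     // timestamp/timestamptz/long column
        .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS)  // only relevant for long columns
        .build();
  }
}

Because the values are now plain read options, the same settings can equivalently be passed through setAll(...) or the Flink configuration.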
ScanContext.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
private final String watermarkColumn;
private final TimeUnit watermarkColumnTimeUnit;

private ScanContext(
boolean caseSensitive,
@@ -91,6 +94,8 @@ private ScanContext(
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
String watermarkColumn,
TimeUnit watermarkColumnTimeUnit,
String branch,
String tag,
String startTag,
@@ -122,6 +127,8 @@ private ScanContext(
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
this.watermarkColumn = watermarkColumn;
this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;

validate();
}
@@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() {
return maxAllowedPlanningFailures;
}

public String watermarkColumn() {
return watermarkColumn;
}

public TimeUnit watermarkColumnTimeUnit() {
return watermarkColumnTimeUnit;
}

public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}

@@ -327,6 +344,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}

@@ -367,6 +386,9 @@ public static class Builder {
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
private TimeUnit watermarkColumnTimeUnit =
FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();

private Builder() {}

@@ -500,6 +522,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
return this;
}

public Builder watermarkColumn(String newWatermarkColumn) {
this.watermarkColumn = newWatermarkColumn;
return this;
}

public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public Builder resolveConfig(
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
.maxAllowedPlanningFailures(maxAllowedPlanningFailures);
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(flinkReadConf.watermarkColumn())
.watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
}

public ScanContext build() {
Expand All @@ -552,6 +586,8 @@ public ScanContext build() {
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
watermarkColumn,
watermarkColumnTimeUnit,
branch,
tag,
startTag,
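Internally, resolveConfig(...) now carries the two options into the ScanContext used for split planning. ScanContext is an internal class, so this is only an illustration of the flow, again assuming an already-loaded Table and the illustrative event_ts column:

import java.util.Collections;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.source.ScanContext;

public class WatermarkScanContextSketch {
  // Resolve read options into the internal ScanContext; watermarkColumn() and
  // watermarkColumnTimeUnit() then expose the values during split planning.
  static ScanContext resolve(Table table) {
    Map<String, String> readOptions = Collections.singletonMap("watermark-column", "event_ts");
    return ScanContext.builder()
        .resolveConfig(table, readOptions, new Configuration())
        .build();
  }
}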
Flink test helpers (assertRecords / assertRows utilities)
@@ -126,7 +126,7 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
.collect(Collectors.toList());
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter =
@@ -135,6 +135,17 @@ public static void assertRecords(List<Row> results, List<Record> expectedRecords
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(
r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
return expected;
}

public static void assertRecordsWithOrder(
List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRowsWithOrder(results, expected);
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRows(results, expected);
}

@@ -146,6 +157,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}

public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyElementsOf(expected);
}

public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
}
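The record-to-row conversion was factored into convertRecordToRow so the new order-sensitive assertion can reuse it. A sketch of the two assertion flavors, assuming the helper class is org.apache.iceberg.flink.TestHelpers (the file name is not shown in this view) and that results, expectedRecords, and schema come from the surrounding test:

import java.util.List;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers; // assumed location of the helper class

public class WatermarkAssertionSketch {
  static void check(List<Row> results, List<Record> expectedRecords, Schema schema) {
    // Order-insensitive comparison, unchanged behavior.
    TestHelpers.assertRecords(results, expectedRecords, schema);
    // New order-sensitive comparison, for reads where splits are ordered by the watermark column.
    TestHelpers.assertRecordsWithOrder(results, expectedRecords, schema);
  }
}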
(The remaining changed files are not shown in this view.)
