Backporting Flink: Watermark Read Options to 1.17 and 1.16 #9456

Merged (1 commit) on Jan 13, 2024

FlinkReadConf.java
@@ -20,6 +20,7 @@

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() {
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}

public String watermarkColumn() {
return confParser
.stringConf()
.option(FlinkReadOptions.WATERMARK_COLUMN)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
.parseOptional();
}

public TimeUnit watermarkColumnTimeUnit() {
return confParser
.enumConfParser(TimeUnit.class)
.option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
.parse();
}
}
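
A minimal sketch (not part of this diff) of how the new getters resolve — per-source read options first, then the Flink config, then the ConfigOption default. The wrapper class and the column name `event_ts` are illustrative only:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;

class WatermarkReadConfSketch {
  // Per-source read options win over the Flink config, which in turn falls
  // back to the ConfigOption defaults (MICROSECONDS for the time unit).
  static void resolve(Table table) {
    Map<String, String> readOptions =
        Collections.singletonMap(FlinkReadOptions.WATERMARK_COLUMN, "event_ts");

    Configuration flinkConfig = new Configuration();
    flinkConfig.set(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION, TimeUnit.MILLISECONDS);

    FlinkReadConf readConf = new FlinkReadConf(table, readOptions, flinkConfig);
    String column = readConf.watermarkColumn();         // "event_ts" (from readOptions)
    TimeUnit unit = readConf.watermarkColumnTimeUnit(); // MILLISECONDS (from flinkConfig)
  }
}
```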

FlinkReadOptions.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink;

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ private FlinkReadOptions() {}
public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);

public static final String WATERMARK_COLUMN = "watermark-column";
public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();

public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
.enumType(TimeUnit.class)
.defaultValue(TimeUnit.MICROSECONDS);
}
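
Worth noting: the bare string constants are the keys used in per-source read-options maps (see the builder changes below), while the ConfigOptions carry the connector prefix and hold the defaults. A tiny illustrative sketch (the wrapper class is hypothetical):

```java
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.flink.FlinkReadOptions;

class WatermarkOptionKeysSketch {
  static void show() {
    // Bare key, used in per-source read-options maps.
    String bareKey = FlinkReadOptions.WATERMARK_COLUMN;                  // "watermark-column"
    // Prefixed key, used when the option is set on the Flink ReadableConfig.
    String prefixedKey = FlinkReadOptions.WATERMARK_COLUMN_OPTION.key();
    // The time unit only matters for long columns and defaults to MICROSECONDS.
    TimeUnit defaultUnit = FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
  }
}
```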

IcebergSource.java
@@ -46,6 +46,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private String watermarkColumn;
private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
Preconditions.checkArgument(
watermarkColumn == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -441,7 +437,7 @@ public Builder<T> setAll(Map<String, String> properties) {
* Emits watermarks once per split based on the min value of column statistics from files
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
* setting {@link #watermarkTimeUnit(TimeUnit)}.
* setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
* split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.watermarkColumn = columnName;
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
return this;
}

@@ -459,8 +455,8 @@ public Builder<T> watermarkColumn(String columnName) {
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
this.watermarkTimeUnit = timeUnit;
public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
return this;
}

@@ -482,13 +478,16 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);

Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
String watermarkColumn = flinkReadConf.watermarkColumn();
TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();

if (watermarkColumn != null) {
// Column statistics is needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
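
Net effect on the public builder: watermarkColumn/watermarkColumnTimeUnit now just record read options that build() resolves through FlinkReadConf. A hedged usage sketch — forRowData(), tableLoader(...), and the import paths are assumed, since they are not shown in this excerpt:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

class WatermarkSourceSketch {
  static IcebergSource<RowData> build(TableLoader loader) {
    return IcebergSource.forRowData()
        .tableLoader(loader)
        .watermarkColumn("event_ts")                    // timestamp/timestamptz/long column
        .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS) // only relevant for long columns
        .build();
  }
}
```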

ScanContext.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
private final String watermarkColumn;
private final TimeUnit watermarkColumnTimeUnit;

private ScanContext(
boolean caseSensitive,
@@ -91,6 +94,8 @@ private ScanContext(
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
String watermarkColumn,
TimeUnit watermarkColumnTimeUnit,
String branch,
String tag,
String startTag,
@@ -122,6 +127,8 @@ private ScanContext(
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
this.watermarkColumn = watermarkColumn;
this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;

validate();
}
@@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() {
return maxAllowedPlanningFailures;
}

public String watermarkColumn() {
return watermarkColumn;
}

public TimeUnit watermarkColumnTimeUnit() {
return watermarkColumnTimeUnit;
}

public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}

@@ -327,6 +344,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}

@@ -367,6 +386,9 @@ public static class Builder {
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
private TimeUnit watermarkColumnTimeUnit =
FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();

private Builder() {}

@@ -500,6 +522,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
return this;
}

public Builder watermarkColumn(String newWatermarkColumn) {
this.watermarkColumn = newWatermarkColumn;
return this;
}

public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public Builder resolveConfig(
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
.maxAllowedPlanningFailures(maxAllowedPlanningFailures);
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(flinkReadConf.watermarkColumn())
.watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
}

public ScanContext build() {
@@ -552,6 +586,8 @@ public ScanContext build() {
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
watermarkColumn,
watermarkColumnTimeUnit,
branch,
tag,
startTag,
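
For the plumbing added here: resolveConfig() copies the watermark settings from FlinkReadConf into the immutable ScanContext, and the copyWith* variants above preserve them during incremental planning. A sketch under assumptions (ScanContext is internal API; the wrapper class, package path, and column name are illustrative):

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.source.ScanContext;

class ScanContextWatermarkSketch {
  static void sketch(Table table) {
    Map<String, String> readOptions =
        Collections.singletonMap(FlinkReadOptions.WATERMARK_COLUMN, "event_ts");

    // resolveConfig() routes the options through FlinkReadConf before build().
    ScanContext context =
        ScanContext.builder()
            .resolveConfig(table, readOptions, new Configuration())
            .build();

    String column = context.watermarkColumn();         // "event_ts"
    TimeUnit unit = context.watermarkColumnTimeUnit(); // MICROSECONDS (default)
  }
}
```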

Flink test helpers (row/record assertion utilities)
@@ -126,7 +126,7 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
.collect(Collectors.toList());
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter =
@@ -135,6 +135,17 @@ public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(
r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
return expected;
}

public static void assertRecordsWithOrder(
List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRowsWithOrder(results, expected);
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRows(results, expected);
}

@@ -146,6 +157,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}

public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyElementsOf(expected);
}

public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
}
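
The new ordered assertions complement the existing unordered ones, e.g. for verifying reads where splits are ordered by the watermark column. A small usage sketch (the helper class name TestHelpers and its import path are assumptions; they are not shown in this excerpt):

```java
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;

class OrderedReadAssertionSketch {
  // Unlike assertRecords, the results must match the expected records in order.
  static void verify(List<Row> results, List<Record> expectedRecords, Schema schema) {
    TestHelpers.assertRecordsWithOrder(results, expectedRecords, schema);
  }
}
```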