Skip to content

Flink: backport split discovery throttling for FLIP-27 source to 1.14…#6363

Merged
stevenzwu merged 1 commit intoapache:masterfrom
stevenzwu:backport-pr-6299
Dec 6, 2022
Merged

Flink: backport split discovery throttling for FLIP-27 source to 1.14…#6363
stevenzwu merged 1 commit intoapache:masterfrom
stevenzwu:backport-pr-6299

Conversation

@stevenzwu
Copy link
Contributor

… and 1.15

@stevenzwu stevenzwu requested a review from pvary December 5, 2022 22:59
@github-actions github-actions bot added the flink label Dec 5, 2022
@pvary
Copy link
Contributor

pvary commented Dec 6, 2022

@stevenzwu: Any specific change (non-clean backport) to double check?

@stevenzwu
Copy link
Contributor Author

stevenzwu commented Dec 6, 2022

no diff btw 1.15 and 1.16

➜  iceberg git:(backport-pr-6299) ✗ git diff --no-index  flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/
➜  iceberg git:(backport-pr-6299) ✗

diff btw 1.14 and 1.16 source. One is caused by table API change in 1.15. Another diff looks like some whitespace/tab differences

➜  iceberg git:(backport-pr-6299) ✗ git diff --no-index  flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/

--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -200,7 +201,8 @@ public class IcebergTableSource
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
     return new DataStreamScanProvider() {
       @Override
-      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+      public DataStream<RowData> produceDataStream(
+          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
         if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) {
           return createFLIP27Stream(execEnv);
         } else {
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index f57f1f5fd..1d99e441b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -96,7 +96,6 @@ public class ScanContext implements Serializable {
   private final Long splitOpenFileCost;
   private final boolean isStreaming;
   private final Duration monitorInterval;
-  private final int maxPlanningSnapshotCount;

   private final String nameMapping;
   private final Schema schema;
@@ -104,6 +103,7 @@ public class ScanContext implements Serializable {
   private final long limit;
   private final boolean includeColumnStats;
   private final Integer planParallelism;
+  private final int maxPlanningSnapshotCount;

   private ScanContext(
       boolean caseSensitive,
@@ -138,7 +138,6 @@ public class ScanContext implements Serializable {
     this.splitOpenFileCost = splitOpenFileCost;
     this.isStreaming = isStreaming;
     this.monitorInterval = monitorInterval;
-    this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;

     this.nameMapping = nameMapping;
     this.schema = schema;
@@ -147,6 +146,7 @@ public class ScanContext implements Serializable {
     this.includeColumnStats = includeColumnStats;
     this.exposeLocality = exposeLocality;
     this.planParallelism = planParallelism;
+    this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;

     validate();
   }

@stevenzwu
Copy link
Contributor Author

there is no diff btw 1.14/1.15 and 1.16 for the test dir

➜  iceberg git:(backport-pr-6299) ✗ git diff --no-index  flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator

@pvary
Copy link
Contributor

pvary commented Dec 6, 2022

Thanks @stevenzwu for the backport and the detailed diff!

@stevenzwu stevenzwu merged commit b176202 into apache:master Dec 6, 2022
@stevenzwu
Copy link
Contributor Author

thanks @pvary for reviewing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants