diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java index 676f7f4435d..2cce35fb9ba 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java @@ -34,6 +34,7 @@ public class IcebergConstant { public static final String WAREHOUSE_KEY = "warehouse"; public static final String START_SNAPSHOT_ID = "start-snapshot-id"; public static final String STREAMING = "streaming"; + public static final String STARTING_STRATEGY_KEY = "starting-strategy"; /** * Iceberg supported catalog type @@ -65,4 +66,13 @@ public static CatalogType forName(String name) { throw new IllegalArgumentException(String.format("Unsupport catalogType:%s", name)); } } + + public enum StreamingStartingStrategy { + TABLE_SCAN_THEN_INCREMENTAL, + INCREMENTAL_FROM_LATEST_SNAPSHOT, + INCREMENTAL_FROM_EARLIEST_SNAPSHOT, + INCREMENTAL_FROM_SNAPSHOT_ID, + INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP; + + } } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java index 14fec78da81..c48899b8871 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java @@ -110,7 +110,10 @@ public Map tableOptions() { options.put(IcebergConstant.TABLE_KEY, tableName); options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name()); options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName); + // support streaming only options.put(IcebergConstant.STREAMING, "true"); + options.put(IcebergConstant.STARTING_STRATEGY_KEY, + IcebergConstant.StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT.name()); if (null != uri) { options.put(IcebergConstant.URI_KEY, uri); } @@ -119,6 +122,8 @@ public Map tableOptions() { } if (null != startSnapShotId) { options.put(IcebergConstant.START_SNAPSHOT_ID, startSnapShotId.toString()); + options.put(IcebergConstant.STARTING_STRATEGY_KEY, + IcebergConstant.StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID.name()); } return options; }