diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index 53c69fbc0d..cfbcccfdc5 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -95,11 +95,15 @@ public class FlinkConnectorOptions {
     public static final ConfigOption<Duration> SCAN_PARTITION_DISCOVERY_INTERVAL =
             ConfigOptions.key("scan.partition.discovery.interval")
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(10))
+                    .defaultValue(Duration.ofMinutes(1))
                     .withDescription(
                             "The time interval for the Fluss source to discover "
                                     + "the new partitions for partitioned table while scanning."
-                                    + " A non-positive value disables the partition discovery.");
+                                    + " A non-positive value disables the partition discovery. The default value is 1 "
+                                    + "minute. Currently, Fluss Admin#listPartitions(TablePath tablePath) requires a large "
+                                    + "number of ZooKeeper requests on the server, so this option should not be set too "
+                                    + "small: a small value causes frequent requests and increases server load. In the "
+                                    + "future, once listing partitions is optimized, this default can be reduced.");

     public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
             ConfigOptions.key("sink.ignore-delete")
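Reviewer note: as a minimal standalone sketch of how the new default behaves (not Fluss code; the class below is hypothetical and only mirrors the `ConfigOptions` pattern above using Flink's configuration API):

```java
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Hypothetical sketch class; mirrors SCAN_PARTITION_DISCOVERY_INTERVAL above.
public class DiscoveryIntervalSketch {

    static final ConfigOption<Duration> DISCOVERY_INTERVAL =
            ConfigOptions.key("scan.partition.discovery.interval")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(1))
                    .withDescription("Interval for discovering new partitions.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Unset: falls back to the new 1-minute default, i.e. 60000 ms
        // (the value asserted in FlussSourceBuilderTest below).
        System.out.println(conf.get(DISCOVERY_INTERVAL).toMillis()); // 60000

        // Explicit override, e.g. for tests that create partitions on the fly.
        conf.set(DISCOVERY_INTERVAL, Duration.ofSeconds(2));
        System.out.println(conf.get(DISCOVERY_INTERVAL).toMillis()); // 2000

        // A non-positive value disables partition discovery entirely.
        conf.set(DISCOVERY_INTERVAL, Duration.ZERO);
    }
}
```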
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 54fb906ae9..b01a7e4b20 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -566,8 +566,15 @@ void testWritePartitionedTable(boolean isPrimaryKeyTable, boolean isAutoPartitio
                                 tableName, String.join(", ", insertValues)))
                 .await();

+        // This test requires dynamically discovering newly created partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (the default is 1 minute);
+        // otherwise the test may hang for up to 1 minute.
         CloseableIterator<Row> rowIter =
-                tEnv.executeSql(String.format("select * from %s", tableName)).collect();
+                tEnv.executeSql(
+                                String.format(
+                                        "select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
+                                        tableName))
+                        .collect();
         assertResultsIgnoreOrder(rowIter, expectedRows, false);

         // create two partitions, write data to the new partitions
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index f9577af974..d765421bc0 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -607,8 +607,15 @@ void testReadPrimaryKeyPartitionedTable(boolean isAutoPartition) throws Exceptio
         writeRowsToPartition(conn, tablePath, partitionNameById.values());
         waitUntilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());

+        // This test requires dynamically discovering newly created partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (the default is 1 minute);
+        // otherwise the test may hang for up to 1 minute.
         org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(String.format("select * from %s", tableName)).collect();
+                tEnv.executeSql(
+                                String.format(
+                                        "select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
+                                        tableName))
+                        .collect();
         assertResultsIgnoreOrder(rowIter, expectedRowValues, false);

         // then create some new partitions, and write rows to the new partitions
@@ -1047,8 +1054,13 @@ void testStreamingReadMultiPartitionPushDown() throws Exception {
                                 + "project=[a, b, d]]], fields=[a, b, d])");

         // test partition key prefix match
+        // This test requires dynamically discovering newly created partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (the default is 1 minute);
+        // otherwise the test may hang for up to 1 minute.
         org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql("select * from multi_partitioned_table where c ='2025'").collect();
+                tEnv.executeSql(
+                                "select * from multi_partitioned_table /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */ where c ='2025'")
+                        .collect();

         assertResultsIgnoreOrder(rowIter, expectedRowValues, false);

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
index 9553e70a04..5091d29d4a 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
@@ -192,7 +192,7 @@ public void testMissingScanPartitionDiscoveryInterval() {
                         .build();

         // Then
-        assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(10000L);
+        assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(60000L);
     }

     @Test
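Reviewer note: the ITCase changes above use Flink's dynamic table options (the `/*+ OPTIONS(...) */` hint) to shorten the interval per query rather than globally. A standalone usage sketch, assuming a registered Fluss catalog and a hypothetical partitioned table `my_partitioned_table` (the hint also requires `table.dynamic-table-options.enabled`, which defaults to true in recent Flink versions):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Usage sketch, not part of this PR. Assumes a Fluss catalog is registered and
// 'my_partitioned_table' (hypothetical) is a partitioned Fluss table.
public class DiscoveryIntervalHintSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Override the discovery interval for this query only; other reads of
        // the table keep the 1-minute default.
        tEnv.executeSql(
                        "SELECT * FROM my_partitioned_table "
                                + "/*+ OPTIONS('scan.partition.discovery.interval' = '2s') */")
                .print();
    }
}
```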
diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md
index 8eacd0cfd8..d53a468e52 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -86,20 +86,20 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');

 ## Read Options

-| Option                                        | Type       | Default                                         | Description |
-|-----------------------------------------------|------------|-------------------------------------------------|-------------|
-| scan.startup.mode                             | Enum       | full                                            | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. |
-| scan.startup.timestamp                        | Long       | (None)                                          | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like `1678883047356` or `2023-12-09 23:09:12`. |
-| scan.partition.discovery.interval             | Duration   | 10s                                             | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. |
-| client.scanner.log.check-crc                  | Boolean    | true                                            | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. |
-| client.scanner.log.max-poll-records           | Integer    | 500                                             | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and returns them incrementally from each poll. |
-| client.scanner.log.fetch.max-bytes            | MemorySize | 16mb                                            | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not a absolute maximum. |
-| client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb                                             | The maximum amount of data the server should return for a table bucket in fetch request fom client. Records are fetched in batches, and the max bytes size is config by this option. |
-| client.scanner.log.fetch.min-bytes            | MemorySize | 1b                                              | The minimum bytes expected for each fetch log request from client to response. If not enough bytes, wait up to client.scanner.log.fetch-wait-max-time time to return. |
-| client.scanner.log.fetch.wait-max-time        | Duration   | 500ms                                           | The maximum time to wait for enough bytes to be available for a fetch log request from client to response. |
-| client.scanner.io.tmpdir                      | String     | System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily |
-| client.scanner.remote-log.prefetch-num        | Integer    | 4                                               | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. |
-| client.remote-file.download-thread-num        | Integer    | 3                                               | The number of threads the client uses to download remote files. |
+| Option                                        | Type       | Default                                         | Description |
+|-----------------------------------------------|------------|-------------------------------------------------|-------------|
+| scan.startup.mode                             | Enum       | full                                            | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. |
+| scan.startup.timestamp                        | Long       | (None)                                          | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is milliseconds since epoch or `yyyy-MM-dd HH:mm:ss`, like `1678883047356` or `2023-12-09 23:09:12`. |
+| scan.partition.discovery.interval             | Duration   | 1min                                            | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, Fluss Admin#listPartitions(TablePath tablePath) requires a large number of ZooKeeper requests on the server, so this option should not be set too small: a small value causes frequent requests and increases server load. In the future, once listing partitions is optimized, this default can be reduced. |
+| client.scanner.log.check-crc                  | Boolean    | true                                            | Automatically check the CRC32 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption of the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. |
+| client.scanner.log.max-poll-records           | Integer    | 500                                             | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and return them incrementally from each poll. |
+| client.scanner.log.fetch.max-bytes            | MemorySize | 16mb                                            | The maximum amount of data the server should return for a fetch request from the client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not an absolute maximum. |
+| client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb                                             | The maximum amount of data the server should return for a table bucket in a fetch request from the client. Records are fetched in batches, and the max bytes size is configured by this option. |
+| client.scanner.log.fetch.min-bytes            | MemorySize | 1b                                              | The minimum bytes expected for each fetch log request from the client. If not enough bytes are available, the server waits up to `client.scanner.log.fetch.wait-max-time` before responding. |
+| client.scanner.log.fetch.wait-max-time        | Duration   | 500ms                                           | The maximum time to wait for enough bytes to be available for a fetch log request from the client before responding. |
+| client.scanner.io.tmpdir                      | String     | System.getProperty("java.io.tmpdir") + "/fluss" | Local directory used by the client to temporarily store data files (like kv snapshots and log segment files) for reading. |
+| client.scanner.remote-log.prefetch-num        | Integer    | 4                                               | The number of remote log segments, downloaded from remote storage, to keep in local temp files for LogScanner. The default setting is 4. |
+| client.remote-file.download-thread-num        | Integer    | 3                                               | The number of threads the client uses to download remote files. |

 ## Lookup Options
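Reviewer note on the docs table: the `Duration`-typed options accept Flink's duration string syntax, so values like `1min`, `2s`, and `500ms` parse as this sketch (using Flink's `TimeUtils` helper) confirms:

```java
import java.time.Duration;

import org.apache.flink.util.TimeUtils;

// Sketch: parse the duration strings used for the read options above.
public class DurationSyntaxSketch {
    public static void main(String[] args) {
        Duration discovery = TimeUtils.parseDuration("1min"); // new default
        Duration override = TimeUtils.parseDuration("2s");    // used in the tests
        Duration fetchWait = TimeUtils.parseDuration("500ms");

        System.out.println(discovery.toMillis()); // 60000
        System.out.println(override.toMillis());  // 2000
        System.out.println(fetchWait.toMillis()); // 500
    }
}
```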