Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ public class FlinkConnectorOptions {
public static final ConfigOption<Duration> SCAN_PARTITION_DISCOVERY_INTERVAL =
ConfigOptions.key("scan.partition.discovery.interval")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.defaultValue(Duration.ofMinutes(1))
.withDescription(
"The time interval for the Fluss source to discover "
+ "the new partitions for partitioned table while scanning."
+ " A non-positive value disables the partition discovery.");
+ " A non-positive value disables the partition discovery. The default value is 1 "
+ "minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large "
+ "number of requests to ZooKeeper in server, this option cannot be set too small, "
+ "as a small value would cause frequent requests and increase server load. In the future, "
+ "once list partitions is optimized, the default value of this parameter can be reduced.");

public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
ConfigOptions.key("sink.ignore-delete")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,15 @@ void testWritePartitionedTable(boolean isPrimaryKeyTable, boolean isAutoPartitio
tableName, String.join(", ", insertValues)))
.await();

// This test requires dynamically discovering newly created partitions, so
// 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
// otherwise the test may hang for 1 minute.
CloseableIterator<Row> rowIter =
tEnv.executeSql(String.format("select * from %s", tableName)).collect();
tEnv.executeSql(
String.format(
"select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
tableName))
.collect();
assertResultsIgnoreOrder(rowIter, expectedRows, false);

// create two partitions, write data to the new partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,15 @@ void testReadPrimaryKeyPartitionedTable(boolean isAutoPartition) throws Exceptio
writeRowsToPartition(conn, tablePath, partitionNameById.values());
waitUntilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());

// This test requires dynamically discovering newly created partitions, so
// 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
// otherwise the test may hang for 1 minute.
org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(String.format("select * from %s", tableName)).collect();
tEnv.executeSql(
String.format(
"select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
tableName))
.collect();
assertResultsIgnoreOrder(rowIter, expectedRowValues, false);

// then create some new partitions, and write rows to the new partitions
Expand Down Expand Up @@ -1047,8 +1054,13 @@ void testStreamingReadMultiPartitionPushDown() throws Exception {
+ "project=[a, b, d]]], fields=[a, b, d])");

// test partition key prefix match
// This test requires dynamically discovering newly created partitions, so
// 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
// otherwise the test may hang for 1 minute.
org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from multi_partitioned_table where c ='2025'").collect();
tEnv.executeSql(
"select * from multi_partitioned_table /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */ where c ='2025'")
.collect();

assertResultsIgnoreOrder(rowIter, expectedRowValues, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testMissingScanPartitionDiscoveryInterval() {
.build();

// Then
assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(10000L);
assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(60000L);
}

@Test
Expand Down
Loading