
Flink: Compact in sink v2 support Coordinator Lock #15459

Open

Guosmilesmile wants to merge 4 commits into apache:main from Guosmilesmile:table-maintance-sink-coordinator

Conversation

@Guosmilesmile (Contributor):

We already support the Coordinator Lock, but it hasn’t been introduced into Sink Table Maintenance yet. This PR adds support for configuring the Coordinator Lock in Sink Table Maintenance.

Comment on lines +255 to +258:

```java
Preconditions.checkArgument(lockTime != null, "Lock time is null, Can't release lock");
if (lockTime == null) {
  LOG.warn("Lock time is null, Can't release lock");
  return;
}
```
Contributor Author (@Guosmilesmile):

When we hit the max watermark case, this errors, so I opened a new PR to deal with it:
#15458

Contributor:

I've left a comment on that PR.

Contributor Author:

Thanks for pointing it out, I have changed the approach.

@Guosmilesmile force-pushed the table-maintance-sink-coordinator branch from 9e40643 to b1262dd (February 27, 2026, 09:08)
@mxm (Contributor) left a comment:

Thanks @Guosmilesmile!

```java
env.execute("Table Maintenance Job");
```

Use Coordinator Lock
Contributor:

Suggested change:

```diff
- Use Coordinator Lock
+ #### Managing table locking via Flink
```

Comment on lines +177 to +201:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table"));

TableMaintenance.forTable(env, tableLoader)
    .uidSuffix("my-maintenance-job")
    .rateLimit(Duration.ofMinutes(10))
    .lockCheckDelay(Duration.ofSeconds(10))
    .add(ExpireSnapshots.builder()
        .scheduleOnCommitCount(10)
        .maxSnapshotAge(Duration.ofMinutes(10))
        .retainLast(5)
        .deleteBatchSize(5)
        .parallelism(8))
    .add(RewriteDataFiles.builder()
        .scheduleOnDataFileCount(10)
        .targetFileSizeBytes(128 * 1024 * 1024)
        .partialProgressEnabled(true)
        .partialProgressMaxCommits(10))
    .append();

env.execute("Table Maintenance Job");
```
Contributor:

Everything except line 184 is identical (no lock parameter). Maybe consolidate the two sections and just explain that the builder can be either

```java
TableMaintenance.forTable(env, tableLoader, lockFactory)
```

or

```java
TableMaintenance.forTable(env, tableLoader)
```
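The consolidated documentation hinges on an overloaded entry point where the lock factory is optional. As a generic sketch of that overload pattern (the `Maintenance` stand-in class and the `"flink"` default are illustrative assumptions, not the actual Iceberg API):

```java
import java.util.Objects;

// Stand-in sketch, NOT the Iceberg TableMaintenance API: models two factory
// overloads where the lock factory argument is optional.
class Maintenance {
  final String lockType;

  private Maintenance(String lockType) {
    this.lockType = lockType;
  }

  // Without an explicit lock factory: assume the Flink-coordinated lock.
  static Maintenance forTable(String table) {
    return forTable(table, "flink");
  }

  // With an explicit lock factory name, e.g. "jdbc".
  static Maintenance forTable(String table, String lockFactory) {
    return new Maintenance(Objects.requireNonNull(lockFactory));
  }
}
```

The no-lock-factory overload simply delegates to the full one, so both variants share a single code path and a single documentation snippet.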

Contributor Author:

Right, I combined the two code sections.


| Key | Description | Default |
|-----|-------------|---------|
| `flink-maintenance.lock.type` | Set to `` or not set | |
Contributor:

Would it make sense to have this default to `flink` (which would use the coordinator)?

Contributor Author:

If the coordinator lock later becomes the default and the other lock options are removed, I'd prefer not to require users to set this parameter here; that keeps things simpler and consistent with their future usage habits.
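The behavior being discussed is a fallback when the property is unset. A minimal sketch of that resolution, assuming a hypothetical helper (only the property key `flink-maintenance.lock.type` comes from the table above; the `LockTypeResolver` class and the `"flink"` default are illustrative):

```java
import java.util.Map;

// Hypothetical helper, not part of Iceberg: resolves the maintenance lock
// type from table properties, falling back to the coordinator-based lock.
class LockTypeResolver {
  static final String LOCK_TYPE_KEY = "flink-maintenance.lock.type";

  static String resolve(Map<String, String> tableProps) {
    // When the key is absent, default to "flink" so users need not set it.
    return tableProps.getOrDefault(LOCK_TYPE_KEY, "flink");
  }
}
```

With this shape, users who never touch the option get the coordinator lock, and only those who need JDBC or another backend set the key explicitly.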

@Guosmilesmile force-pushed the table-maintance-sink-coordinator branch 2 times, most recently from e15c09d to 3ae3a07 (March 5, 2026, 05:26)
Comment on lines +108 to +111:

```java
if (!Watermark.MAX_WATERMARK.equals(mark)) {
  operatorEventGateway.sendEventToCoordinator(
      new LockReleaseEvent(tableName, mark.getTimestamp()));
}
```
Contributor:

Let's compare the timestamp, not the object.

Contributor Author:

OK, I made it compare the timestamps.
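The reviewer's point can be illustrated with a stand-in class (a hypothetical `Mark`, not Flink's actual `Watermark`) that deliberately does not override `equals`: a watermark rebuilt from the same timestamp then fails the object comparison but passes the timestamp comparison, so comparing timestamps avoids depending on the class's equality semantics.

```java
// Stand-in for a watermark type that does NOT override equals, to show why
// comparing timestamps is more robust than comparing objects.
class Mark {
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;
  static final Mark MAX_WATERMARK = new Mark(MAX_TIMESTAMP);

  private final long timestamp;

  Mark(long timestamp) {
    this.timestamp = timestamp;
  }

  long getTimestamp() {
    return timestamp;
  }
}

class MarkDemo {
  // Object comparison: identity-based here, so a rebuilt max watermark fails.
  static boolean byObject(Mark mark) {
    return Mark.MAX_WATERMARK.equals(mark);
  }

  // Timestamp comparison: true for any mark carrying the max timestamp.
  static boolean byTimestamp(Mark mark) {
    return mark.getTimestamp() == Mark.MAX_TIMESTAMP;
  }
}
```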

```java
}

@Test
@TestTemplate
```
Contributor:

In one of my PRs it was suggested not to use TestTemplate, as it was only introduced for backward-compatibility reasons.

I was pointed to `@ParameterizedTest` for every test instead:

```java
@ParameterizedTest
@FieldSource("FILE_FORMATS")
```

```
+ "'flink-maintenance.rewrite.schedule.data-file-size'='1',"
+ "'flink-maintenance.lock-check-delay-seconds'='60'";
private static final String TABLE_PROPERTIES_COORDINATOR =
    "'flink-maintenance.lock.jdbc.init-lock-table'='true',"
```
Contributor:

Why do we need this?


#### Flink-maintained lock

Maintain the lock within Flink itself. This does not require configuring external systems. One prerequisite is that there are no parallel table maintenance jobs for a given table.
Contributor:

Should this be: "The only prerequisite is that there are no parallel table maintenance jobs for a given table."

```java
    jdbcProps // JDBC connection properties
);

// Option 1: With external lock factory
```
Contributor:

Shall we mention that we plan to deprecate Option 1?

Do we plan to do it?

```
return Arrays.asList(
    new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), true},
    new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), false});
    new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), true, "jdbc"},
```
Contributor:

Could this be `LockConfig.JdbcLockConfig.JDBC`?

```java
sql(
    "INSERT INTO %s /*+ OPTIONS(%s) */ SELECT id,data from sourceTable",
    TABLE_NAME, TABLE_PROPERTIES);
if (lockType.equals("jdbc")) {
```
Contributor:

Could this be: `LockConfig.JdbcLockConfig.JDBC`?
