Skip to content

Flink: Close catalog in dynamic sink operators to prevent resource leaks#16044

Open
donPain wants to merge 6 commits into
apache:mainfrom
donPain:main
Open

Flink: Close catalog in dynamic sink operators to prevent resource leaks#16044
donPain wants to merge 6 commits into
apache:mainfrom
donPain:main

Conversation

@donPain
Copy link
Copy Markdown

@donPain donPain commented Apr 19, 2026

Operators in the Flink dynamic sink pipeline (DynamicTableUpdateOperator, DynamicRecordProcessor, DynamicWriteResultAggregator, TableSerializerCache) load a catalog via CatalogLoader but never close it. When the catalog is a RESTCatalog backed by S3FileIO, this leaks HTTP clients, S3 clients, and other resources held by the catalog's CloseableGroup.

Fix each operator to close its catalog on shutdown if it implements Closeable. For TableSerializerCache, cache the catalog instance to avoid creating a new (leaked) catalog on every serializer update.

Add tests for operator lifecycle with both Closeable and non-Closeable catalogs, and an integration test with RESTCatalog + S3FileIO.

Warning exception trace:
image

@donPain donPain force-pushed the main branch 3 times, most recently from 2f9bee7 to e7dedc6 Compare April 20, 2026 18:42
Operators in the Flink dynamic sink pipeline (DynamicTableUpdateOperator,
DynamicRecordProcessor, DynamicWriteResultAggregator, TableSerializerCache)
load a catalog via CatalogLoader but never close it. When the catalog is a
RESTCatalog backed by S3FileIO, this leaks HTTP clients, S3 clients, and
other resources held by the catalog's CloseableGroup.

Fix each operator to close its catalog on shutdown if it implements
Closeable. For TableSerializerCache, cache the catalog instance to avoid
creating a new (leaked) catalog on every serializer update.

Add tests for operator lifecycle with both Closeable and non-Closeable
catalogs, and an integration test with RESTCatalog + S3FileIO.
@donPain
Copy link
Copy Markdown
Author

donPain commented Apr 20, 2026

Hey @nastra could you take a look at this PR when you have a moment?

@pvary
Copy link
Copy Markdown
Contributor

pvary commented Apr 21, 2026

CC: @mxm, @Guosmilesmile

Copy link
Copy Markdown
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @donPain!

Comment thread flink/v2.1/build.gradle
Comment on lines +77 to +85
testImplementation project(':iceberg-aws')
testImplementation(platform(libs.awssdk.bom))
testImplementation "software.amazon.awssdk:s3"
testImplementation "software.amazon.awssdk:auth"
testImplementation "software.amazon.awssdk:kms"
testImplementation "software.amazon.awssdk:sts"
testImplementation libs.testcontainers
testImplementation libs.testcontainers.junit.jupiter
testImplementation libs.testcontainers.minio
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need all these dependencies to verify the fix?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my production scenario, I was using S3Tables, so I needed to simulate at least one object-store with a "rest" catalog

Comment on lines +125 to +127
if (catalog == null) {
catalog = catalogLoader.loadCatalog();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are we closing this catalog?

Comment on lines +142 to +169
@Test
void testCloseOperatorWithNonCloseableCatalog() throws Exception {
DynamicTableUpdateOperator operator =
new DynamicTableUpdateOperator(
CATALOG_EXTENSION.catalogLoader(),
10,
1000,
10,
TableCreator.DEFAULT,
CASE_SENSITIVE,
PRESERVE_COLUMNS);
operator.open(null);

DynamicRecordInternal input =
new DynamicRecordInternal(
TABLE,
"branch",
SCHEMA1,
GenericRowData.of(1),
PartitionSpec.unpartitioned(),
42,
false,
Collections.emptySet());
operator.map(input);

// HadoopCatalog is not Closeable, so close should complete without error
operator.close();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we need this test. There won't be a way to call close on a catalog which doesn't implement Closeable.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I added that to validate local tests and forgot to remove it, good point.

Comment on lines +227 to +229
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same.

Comment on lines +202 to +206
try {
((Closeable) catalog).close();
} catch (Exception e) {
throw new RuntimeException(e);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we standardize on a single way of writing this? Found different style in 1.20 vs 2.1

if (catalog instanceof Closeable rs) 

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, of course. What do you think about changing the runtime exception to a more specific exception?

@donPain donPain requested review from Guosmilesmile and mxm April 22, 2026 21:30
schemas = table.schemas();
specs = table.specs();
} finally {
closeCatalog(loadedCatalog);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a different pattern than in the other classes, i.e. we are closing immediately after using the catalog. Should we also follow that approach elsewhere? Do we need a cache to avoid the overhead of opening / closing?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a reasonable trade-off here. TableSerializerCache is used from a TypeSerializer, which doesn't really give us a clean lifecycle to keep a Catalog open and close it later. Because of that, closing the catalog right after the
lookup feels safer.

We still have caching where it matters: once the schema/spec ids are resolved, the schemas, specs, and RowDataSerializer are cached, so we don't open/close the catalog on the normal hot path. Adding a catalog cache here would only help on cache misses, but it would also make resource cleanup harder and increase the risk of leaks.

# Conflicts:
#	flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
#	flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 4, 2026

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label Jun 4, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants