From f71afac61163ac7777706866d1f1fc2f346116f2 Mon Sep 17 00:00:00 2001 From: majialong Date: Sun, 23 Nov 2025 14:24:51 +0800 Subject: [PATCH 1/2] [ISSUE #9870] Ensure metadata provider cache executors are shutdown correctly --- .../LocalAuthenticationMetadataProvider.java | 7 ++- .../LocalAuthorizationMetadataProvider.java | 7 ++- ...calAuthenticationMetadataProviderTest.java | 50 +++++++++++++++++++ ...ocalAuthorizationMetadataProviderTest.java | 50 +++++++++++++++++++ 4 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java create mode 100644 auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java index 04e745eaaf1..93d03272712 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java @@ -46,6 +46,8 @@ public class LocalAuthenticationMetadataProvider implements AuthenticationMetada private LoadingCache userCache; + protected ThreadPoolExecutor cacheRefreshExecutor; + @Override public void initialize(AuthConfig authConfig, Supplier metadataService) { this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "users", false); @@ -53,7 +55,7 @@ public void initialize(AuthConfig authConfig, Supplier metadataService) { throw new RuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied"); } - ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( + this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( 1, 1, 1000 * 60, @@ -144,6 +146,9 @@ public void shutdown() { if (this.storage != null) { this.storage.shutdown(); } + if (this.cacheRefreshExecutor != null) { + this.cacheRefreshExecutor.shutdown(); + } } private static class UserCacheLoader implements CacheLoader { diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java index 6db999bee70..f6b8ecaf3db 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java @@ -51,13 +51,15 @@ public class LocalAuthorizationMetadataProvider implements AuthorizationMetadata private LoadingCache aclCache; + protected ThreadPoolExecutor cacheRefreshExecutor; + @Override public void initialize(AuthConfig authConfig, Supplier metadataService) { this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "acls", false); if (!this.storage.start()) { throw new RuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied."); } - ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( + this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( 1, 1, 1000 * 60, @@ -172,6 +174,9 @@ public void shutdown() { if (this.storage != null) { this.storage.shutdown(); } + if (this.cacheRefreshExecutor != null) { + this.cacheRefreshExecutor.shutdown(); + } } private static class AclCacheLoader implements CacheLoader { diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java new file mode 100644 index 00000000000..5ff865cac74 --- /dev/null +++ b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.auth.authentication.provider; + +import org.apache.rocketmq.auth.config.AuthConfig; +import org.apache.rocketmq.auth.helper.AuthTestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class LocalAuthenticationMetadataProviderTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testShutdownReleasesCacheExecutor() throws Exception { + AuthConfig authConfig = AuthTestHelper.createDefaultConfig(); + authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath()); + + LocalAuthenticationMetadataProvider provider = new LocalAuthenticationMetadataProvider(); + // Initialize provider to create the internal cache refresh executor + provider.initialize(authConfig, () -> null); + + // After initialize, the executor should exist and not be shutdown + Assert.assertNotNull(provider.cacheRefreshExecutor); + Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown()); + + // Shutdown provider should also shutdown its executor and free resources + provider.shutdown(); + + // Verify that the cache refresh executor has been shutdown + Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown()); + } +} diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java new file mode 100644 index 00000000000..fe21a8ba974 --- /dev/null +++ b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.auth.authorization.provider; + +import org.apache.rocketmq.auth.config.AuthConfig; +import org.apache.rocketmq.auth.helper.AuthTestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class LocalAuthorizationMetadataProviderTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testShutdownReleasesCacheExecutor() throws Exception { + AuthConfig authConfig = AuthTestHelper.createDefaultConfig(); + authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath()); + + LocalAuthorizationMetadataProvider provider = new LocalAuthorizationMetadataProvider(); + // Initialize provider so that the internal cache refresh executor is created + provider.initialize(authConfig, () -> null); + + // Verify that the scheduled executor is created and still running after initialize + Assert.assertNotNull(provider.cacheRefreshExecutor); + Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown()); + + // When provider is shutdown, the executor should also be terminated to avoid thread leak + provider.shutdown(); + + // Verify that the cache refresh executor is properly shutdown + Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown()); + } +} From a22ea6a98adcbac6b4a698710d9f9c551a775bb9 Mon Sep 17 00:00:00 2001 From: majialong Date: Sun, 23 Nov 2025 16:33:37 +0800 Subject: [PATCH 2/2] Improve test comments --- .../provider/LocalAuthenticationMetadataProviderTest.java | 4 ++-- .../provider/LocalAuthorizationMetadataProviderTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java index 5ff865cac74..15ec8c32603 100644 --- a/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java +++ b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java @@ -37,11 +37,11 @@ public void testShutdownReleasesCacheExecutor() throws Exception { // Initialize provider to create the internal cache refresh executor provider.initialize(authConfig, () -> null); - // After initialize, the executor should exist and not be shutdown + // After initialization, the executor should exist and not be shutdown Assert.assertNotNull(provider.cacheRefreshExecutor); Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown()); - // Shutdown provider should also shutdown its executor and free resources + // Shutdown provider should also shutdown its executor to release resources provider.shutdown(); // Verify that the cache refresh executor has been shutdown diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java index fe21a8ba974..32771a4d80c 100644 --- a/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java +++ b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java @@ -34,17 +34,17 @@ public void testShutdownReleasesCacheExecutor() throws Exception { authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath()); LocalAuthorizationMetadataProvider provider = new LocalAuthorizationMetadataProvider(); - // Initialize provider so that the internal cache refresh executor is created + // Initialize provider to create the internal cache refresh executor provider.initialize(authConfig, () -> null); - // Verify that the scheduled executor is created and still running after initialize + // After initialization, the executor should exist and not be shutdown Assert.assertNotNull(provider.cacheRefreshExecutor); Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown()); - // When provider is shutdown, the executor should also be terminated to avoid thread leak + // Shutdown provider should also shutdown its executor to release resources provider.shutdown(); - // Verify that the cache refresh executor is properly shutdown + // Verify that the cache refresh executor has been shutdown Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown()); } }