diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java index 04e745eaaf1..93d03272712 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java @@ -46,6 +46,8 @@ public class LocalAuthenticationMetadataProvider implements AuthenticationMetada private LoadingCache userCache; + protected ThreadPoolExecutor cacheRefreshExecutor; + @Override public void initialize(AuthConfig authConfig, Supplier metadataService) { this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "users", false); @@ -53,7 +55,7 @@ public void initialize(AuthConfig authConfig, Supplier metadataService) { throw new RuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied"); } - ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( + this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( 1, 1, 1000 * 60, @@ -144,6 +146,9 @@ public void shutdown() { if (this.storage != null) { this.storage.shutdown(); } + if (this.cacheRefreshExecutor != null) { + this.cacheRefreshExecutor.shutdown(); + } } private static class UserCacheLoader implements CacheLoader { diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java index 6db999bee70..f6b8ecaf3db 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java @@ -51,13 +51,15 @@ public class LocalAuthorizationMetadataProvider implements AuthorizationMetadata private LoadingCache aclCache; + protected ThreadPoolExecutor cacheRefreshExecutor; + @Override public void initialize(AuthConfig authConfig, Supplier metadataService) { this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "acls", false); if (!this.storage.start()) { throw new RuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied."); } - ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( + this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( 1, 1, 1000 * 60, @@ -172,6 +174,9 @@ public void shutdown() { if (this.storage != null) { this.storage.shutdown(); } + if (this.cacheRefreshExecutor != null) { + this.cacheRefreshExecutor.shutdown(); + } } private static class AclCacheLoader implements CacheLoader { diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java new file mode 100644 index 00000000000..15ec8c32603 --- /dev/null +++ b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.auth.authentication.provider; + +import org.apache.rocketmq.auth.config.AuthConfig; +import org.apache.rocketmq.auth.helper.AuthTestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class LocalAuthenticationMetadataProviderTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testShutdownReleasesCacheExecutor() throws Exception { + AuthConfig authConfig = AuthTestHelper.createDefaultConfig(); + authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath()); + + LocalAuthenticationMetadataProvider provider = new LocalAuthenticationMetadataProvider(); + // Initialize provider to create the internal cache refresh executor + provider.initialize(authConfig, () -> null); + + // After initialization, the executor should exist and not be shutdown + Assert.assertNotNull(provider.cacheRefreshExecutor); + Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown()); + + // Shutdown provider should also shutdown its executor to release resources + provider.shutdown(); + + // Verify that the cache refresh executor has been shutdown + Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown()); + } +} diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java new file mode 100644 index 00000000000..32771a4d80c --- /dev/null +++ b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.auth.authorization.provider; + +import org.apache.rocketmq.auth.config.AuthConfig; +import org.apache.rocketmq.auth.helper.AuthTestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class LocalAuthorizationMetadataProviderTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testShutdownReleasesCacheExecutor() throws Exception { + AuthConfig authConfig = AuthTestHelper.createDefaultConfig(); + authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath()); + + LocalAuthorizationMetadataProvider provider = new LocalAuthorizationMetadataProvider(); + // Initialize provider to create the internal cache refresh executor + provider.initialize(authConfig, () -> null); + + // After initialization, the executor should exist and not be shutdown + Assert.assertNotNull(provider.cacheRefreshExecutor); + Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown()); + + // Shutdown provider should also shutdown its executor to release resources + provider.shutdown(); + + // Verify that the cache refresh executor has been shutdown + Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown()); + } +}