From 6ab0ba7b8c234d8eb1eca1e1ba1c74e5eeb59878 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sun, 8 Mar 2026 16:42:27 +0800 Subject: [PATCH] HDDS-14356. Support OM Service Framework Generated-by: Codex (GPT-5.5) --- .../apache/hadoop/ozone/om/OzoneManager.java | 11 ++ .../apache/hadoop/ozone/om/ha/OMService.java | 61 +++++++++++ .../ozone/om/ha/OMServiceException.java | 40 +++++++ .../hadoop/ozone/om/ha/OMServiceManager.java | 77 +++++++++++++ .../om/ratis/OzoneManagerStateMachine.java | 4 + .../ozone/om/ha/TestOMServiceManager.java | 103 ++++++++++++++++++ .../ratis/TestOzoneManagerStateMachine.java | 5 + 7 files changed, 301 insertions(+) create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMService.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceException.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceManager.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMServiceManager.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index d258324b3bb0..06533dfd207d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -248,6 +248,7 @@ import org.apache.hadoop.ozone.om.execution.OMExecutionFlow; import org.apache.hadoop.ozone.om.ha.OMHAMetrics; import org.apache.hadoop.ozone.om.ha.OMHANodeDetails; +import org.apache.hadoop.ozone.om.ha.OMServiceManager; import org.apache.hadoop.ozone.om.helpers.BasicOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.DBUpdates; @@ -515,6 +516,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private OmSnapshotManager omSnapshotManager; private volatile DirectoryDeletingService dirDeletingService; + private final OMServiceManager serviceManager; + @SuppressWarnings("methodlength") private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) throws IOException, AuthenticationException { @@ -712,6 +715,9 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) readBlacklist = OzoneBlacklist.getReadonlyBlacklist(conf); s3OzoneAdmins = OzoneAdmins.getS3Admins(conf); + + serviceManager = new OMServiceManager(); + instantiateServices(false); // Create special volume s3v which is required for S3G. @@ -2459,6 +2465,7 @@ public boolean stop() { if (omRatisSnapshotProvider != null) { omRatisSnapshotProvider.close(); } + serviceManager.stop(); DeletingServiceMetrics.unregister(); OMPerformanceMetrics.unregister(); RatisDropwizardExports.clear(ratisMetricsMap, ratisReporterList); @@ -5626,6 +5633,10 @@ public ReconfigurationHandler getReconfigurationHandler() { return reconfigurationHandler; } + public OMServiceManager getOMServiceManager() { + return serviceManager; + } + /** * Wait until both buffers are flushed. This is used in cases like * "follower bootstrap tarball creation" where the rocksDb for the active diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMService.java new file mode 100644 index 000000000000..58449f889ad3 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMService.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ha; + +/** + * Interface for stateful background service in OM. + * + * Provide a fine-grained method to manipulate the status of these background + * services. + */ +public interface OMService { + /** + * Notify raft or safe mode related status changed. + */ + void notifyStatusChanged(); + + /** + * @return true, if next iteration of Service should take effect, + * false, if next iteration of Service should be skipped. + */ + boolean shouldRun(); + + /** + * @return name of the Service. + */ + String getServiceName(); + + /** + * Status of Service. + */ + enum ServiceStatus { + RUNNING, + PAUSING + } + + /** + * starts the OM service. + */ + void start() throws OMServiceException; + + /** + * stops the OM service. + */ + void stop(); + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceException.java new file mode 100644 index 000000000000..aeafaf839036 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ha; + +/** + * Checked exceptions thrown by an {@link OMService}. + */ +public class OMServiceException extends Exception { + + public OMServiceException() { + super(); + } + + public OMServiceException(String s) { + super(s); + } + + public OMServiceException(String message, Throwable cause) { + super(message, cause); + } + + public OMServiceException(Throwable cause) { + super(cause); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceManager.java new file mode 100644 index 000000000000..f11db96941ef --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMServiceManager.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manipulate background services in OM. + */ +public final class OMServiceManager { + private static final Logger LOG = + LoggerFactory.getLogger(OMServiceManager.class); + + private final List services = new ArrayList<>(); + + /** + * Register an OMService to OMServiceManager. + */ + public synchronized void register(OMService service) { + Objects.requireNonNull(service); + LOG.info("Registering service {}.", service.getServiceName()); + services.add(service); + } + + /** + * Notify raft related status changed. + */ + public synchronized void notifyStatusChanged() { + for (OMService service : services) { + LOG.debug("Notify service:{}.", service.getServiceName()); + service.notifyStatusChanged(); + } + } + + /** + * Start all running services. + */ + public synchronized void start() { + for (OMService service : services) { + LOG.debug("Stopping service:{}.", service.getServiceName()); + try { + service.start(); + } catch (OMServiceException e) { + LOG.warn("Could not start " + service.getServiceName(), e); + } + } + } + + /** + * Stops all running services. + */ + public synchronized void stop() { + for (OMService service : services) { + LOG.debug("Stopping service:{}.", service.getServiceName()); + service.stop(); + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 2abaf9ae5719..feeda4ca72be 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -193,6 +193,7 @@ public void notifyLeaderReady() { if (metrics != null) { metrics.addRatisEvent("Ready to serve requests as the leader"); } + ozoneManager.getOMServiceManager().notifyStatusChanged(); } @Override @@ -202,6 +203,7 @@ public void notifyNotLeader(Collection pendingEntries) { if (metrics != null) { metrics.addRatisEvent("current leader OM steps down."); } + ozoneManager.getOMServiceManager().notifyStatusChanged(); } @Override @@ -219,6 +221,8 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, previousLeaderId = newLeaderId; // Initialize OMHAMetrics ozoneManager.omHAMetricsInit(newLeaderId.toString()); + // Notify OM service of leader change + ozoneManager.getOMServiceManager().notifyStatusChanged(); Map auditParams = new LinkedHashMap<>(); auditParams.put(AUDIT_PARAM_PREVIOUS_LEADER, diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMServiceManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMServiceManager.java new file mode 100644 index 000000000000..9ded837d28d1 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMServiceManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ha; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link OMServiceManager}. + */ +public class TestOMServiceManager { + + private static class OMContext { + private boolean isLeader; + + OMContext() { + isLeader = false; + } + + public boolean isLeader() { + return isLeader; + } + + public void setLeader(boolean leader) { + this.isLeader = leader; + } + } + + @Test + public void testServiceRunWhenLeader() { + + OMContext omContext = new OMContext(); + + // A service runs when it is a leader. + OMService serviceRunWhenLeader = new OMService() { + private ServiceStatus serviceStatus = ServiceStatus.PAUSING; + + @Override + public void notifyStatusChanged() { + if (omContext.isLeader()) { + serviceStatus = ServiceStatus.RUNNING; + } else { + serviceStatus = ServiceStatus.PAUSING; + } + } + + @Override + public boolean shouldRun() { + return serviceStatus == ServiceStatus.RUNNING; + } + + @Override + public String getServiceName() { + return "serviceRunWhenLeader"; + } + + @Override + public void start() throws OMServiceException { + + } + + @Override + public void stop() { + + } + }; + + OMServiceManager serviceManager = new OMServiceManager(); + serviceManager.register(serviceRunWhenLeader); + + // PAUSING at the beginning. + assertFalse(serviceRunWhenLeader.shouldRun()); + + // RUNNING when becoming leader. + omContext.setLeader(true); + serviceManager.notifyStatusChanged(); + assertTrue(serviceRunWhenLeader.shouldRun()); + + // PAUSING when stepping down. + omContext.setLeader(false); + serviceManager.notifyStatusChanged(); + assertFalse(serviceRunWhenLeader.shouldRun()); + + } + +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java index 111779b95734..b9fb82786e13 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManagerPrepareState; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.ha.OMServiceManager; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.lock.OMLockDetails; import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider; @@ -111,6 +112,7 @@ public class TestOzoneManagerStateMachine { private RequestHandler handler; private ExecutorService executor; private OzoneManagerStateMachine sm; + private OMServiceManager serviceManager; @BeforeEach public void setup() { @@ -119,6 +121,8 @@ public void setup() { doubleBuffer = mock(OzoneManagerDoubleBuffer.class); handler = mock(RequestHandler.class); executor = Executors.newSingleThreadExecutor(); + serviceManager = mock(OMServiceManager.class); + when(om.getOMServiceManager()).thenReturn(serviceManager); sm = new OzoneManagerStateMachine(om, doubleBuffer, handler, executor, null); } @@ -878,6 +882,7 @@ public void testNotifyLeaderReady() { sm.notifyLeaderReady(); verify(snapshotManager).resetInFlightSnapshotCount(); + verify(serviceManager).notifyStatusChanged(); } // --- getLatestSnapshot tests ---