Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@
import org.apache.hadoop.ozone.om.execution.OMExecutionFlow;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
import org.apache.hadoop.ozone.om.ha.OMServiceManager;
import org.apache.hadoop.ozone.om.helpers.BasicOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.DBUpdates;
Expand Down Expand Up @@ -515,6 +516,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OmSnapshotManager omSnapshotManager;
private volatile DirectoryDeletingService dirDeletingService;

private final OMServiceManager serviceManager;

@SuppressWarnings("methodlength")
private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
throws IOException, AuthenticationException {
Expand Down Expand Up @@ -712,6 +715,9 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
readBlacklist = OzoneBlacklist.getReadonlyBlacklist(conf);

s3OzoneAdmins = OzoneAdmins.getS3Admins(conf);

serviceManager = new OMServiceManager();

instantiateServices(false);

// Create special volume s3v which is required for S3G.
Expand Down Expand Up @@ -2459,6 +2465,7 @@ public boolean stop() {
if (omRatisSnapshotProvider != null) {
omRatisSnapshotProvider.close();
}
serviceManager.stop();
DeletingServiceMetrics.unregister();
OMPerformanceMetrics.unregister();
RatisDropwizardExports.clear(ratisMetricsMap, ratisReporterList);
Expand Down Expand Up @@ -5626,6 +5633,10 @@ public ReconfigurationHandler getReconfigurationHandler() {
return reconfigurationHandler;
}

public OMServiceManager getOMServiceManager() {
return serviceManager;
}

/**
* Wait until both buffers are flushed. This is used in cases like
* "follower bootstrap tarball creation" where the rocksDb for the active
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.ha;

/**
* Interface for stateful background service in OM.
*
* Provide a fine-grained method to manipulate the status of these background
* services.
*/
public interface OMService {
/**
* Notify raft or safe mode related status changed.
*/
void notifyStatusChanged();

/**
* @return true, if next iteration of Service should take effect,
* false, if next iteration of Service should be skipped.
*/
boolean shouldRun();

/**
* @return name of the Service.
*/
String getServiceName();

/**
* Status of Service.
*/
enum ServiceStatus {
RUNNING,
PAUSING
}

/**
* starts the OM service.
*/
void start() throws OMServiceException;

/**
* stops the OM service.
*/
void stop();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.ha;

/**
* Checked exceptions thrown by an {@link OMService}.
*/
public class OMServiceException extends Exception {

public OMServiceException() {
super();
}

public OMServiceException(String s) {
super(s);
}

public OMServiceException(String message, Throwable cause) {
super(message, cause);
}

public OMServiceException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.ha;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Manipulate background services in OM.
*/
public final class OMServiceManager {
private static final Logger LOG =
LoggerFactory.getLogger(OMServiceManager.class);

private final List<OMService> services = new ArrayList<>();

/**
* Register an OMService to OMServiceManager.
*/
public synchronized void register(OMService service) {
Objects.requireNonNull(service);
LOG.info("Registering service {}.", service.getServiceName());
services.add(service);
}

/**
* Notify raft related status changed.
*/
public synchronized void notifyStatusChanged() {
for (OMService service : services) {
LOG.debug("Notify service:{}.", service.getServiceName());
service.notifyStatusChanged();
}
}

/**
* Start all running services.
*/
public synchronized void start() {
for (OMService service : services) {
LOG.debug("Stopping service:{}.", service.getServiceName());
try {
service.start();
} catch (OMServiceException e) {
LOG.warn("Could not start " + service.getServiceName(), e);
}
}
}

/**
* Stops all running services.
*/
public synchronized void stop() {
for (OMService service : services) {
LOG.debug("Stopping service:{}.", service.getServiceName());
service.stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public void notifyLeaderReady() {
if (metrics != null) {
metrics.addRatisEvent("Ready to serve requests as the leader");
}
ozoneManager.getOMServiceManager().notifyStatusChanged();
}

@Override
Expand All @@ -202,6 +203,7 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
if (metrics != null) {
metrics.addRatisEvent("current leader OM steps down.");
}
ozoneManager.getOMServiceManager().notifyStatusChanged();
}

@Override
Expand All @@ -219,6 +221,8 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
previousLeaderId = newLeaderId;
// Initialize OMHAMetrics
ozoneManager.omHAMetricsInit(newLeaderId.toString());
// Notify OM service of leader change
ozoneManager.getOMServiceManager().notifyStatusChanged();

Map<String, String> auditParams = new LinkedHashMap<>();
auditParams.put(AUDIT_PARAM_PREVIOUS_LEADER,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.ha;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

/**
* Tests for {@link OMServiceManager}.
*/
public class TestOMServiceManager {

private static class OMContext {
private boolean isLeader;

OMContext() {
isLeader = false;
}

public boolean isLeader() {
return isLeader;
}

public void setLeader(boolean leader) {
this.isLeader = leader;
}
}

@Test
public void testServiceRunWhenLeader() {

OMContext omContext = new OMContext();

// A service runs when it is a leader.
OMService serviceRunWhenLeader = new OMService() {
private ServiceStatus serviceStatus = ServiceStatus.PAUSING;

@Override
public void notifyStatusChanged() {
if (omContext.isLeader()) {
serviceStatus = ServiceStatus.RUNNING;
} else {
serviceStatus = ServiceStatus.PAUSING;
}
}

@Override
public boolean shouldRun() {
return serviceStatus == ServiceStatus.RUNNING;
}

@Override
public String getServiceName() {
return "serviceRunWhenLeader";
}

@Override
public void start() throws OMServiceException {

}

@Override
public void stop() {

}
};

OMServiceManager serviceManager = new OMServiceManager();
serviceManager.register(serviceRunWhenLeader);

// PAUSING at the beginning.
assertFalse(serviceRunWhenLeader.shouldRun());

// RUNNING when becoming leader.
omContext.setLeader(true);
serviceManager.notifyStatusChanged();
assertTrue(serviceRunWhenLeader.shouldRun());

// PAUSING when stepping down.
omContext.setLeader(false);
serviceManager.notifyStatusChanged();
assertFalse(serviceRunWhenLeader.shouldRun());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMServiceManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
Expand Down Expand Up @@ -111,6 +112,7 @@ public class TestOzoneManagerStateMachine {
private RequestHandler handler;
private ExecutorService executor;
private OzoneManagerStateMachine sm;
private OMServiceManager serviceManager;

@BeforeEach
public void setup() {
Expand All @@ -119,6 +121,8 @@ public void setup() {
doubleBuffer = mock(OzoneManagerDoubleBuffer.class);
handler = mock(RequestHandler.class);
executor = Executors.newSingleThreadExecutor();
serviceManager = mock(OMServiceManager.class);
when(om.getOMServiceManager()).thenReturn(serviceManager);
sm = new OzoneManagerStateMachine(om, doubleBuffer, handler, executor, null);
}

Expand Down Expand Up @@ -878,6 +882,7 @@ public void testNotifyLeaderReady() {
sm.notifyLeaderReady();

verify(snapshotManager).resetInFlightSnapshotCount();
verify(serviceManager).notifyStatusChanged();
}

// --- getLatestSnapshot tests ---
Expand Down
Loading