Skip to content

Commit

Permalink
Support gRPC on standby masters
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Implemented a RpcServerStandbyGrpcService which runs gRPC server on both
standby and primary masters.

### Why are the changes needed?

We are going to implement a feature which allows workers registering to
all masters to speed up the master failover process. This requires
standby masters to enable gRPC servers so that the works can rpc. This
PR made these changes.

### Does this PR introduce any user facing changes?

N/A

pr-link: #16839
change-id: cid-743160dea3f42872555b48d9a62125b2740962a3
  • Loading branch information
elega committed Feb 2, 2023
1 parent 530e720 commit 325f36a
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 1 deletion.
9 changes: 9 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -3833,6 +3833,13 @@ public String toString() {
.setDescription("Whether a standby master runs a web server")
.setScope(Scope.SERVER)
.build();
public static final PropertyKey STANDBY_MASTER_GRPC_ENABLED =
booleanBuilder(Name.STANDBY_MASTER_GRPC_ENABLED)
.setDefaultValue(false)
.setDescription("Whether a standby master runs a grpc server")
.setScope(Scope.ALL)
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.build();

//
// Throttle
Expand Down Expand Up @@ -8119,6 +8126,8 @@ public static final class Name {
"alluxio.standby.master.metrics.sink.enabled";
public static final String STANDBY_MASTER_WEB_ENABLED =
"alluxio.standby.master.web.enabled";
public static final String STANDBY_MASTER_GRPC_ENABLED =
"alluxio.standby.master.grpc.enabled";

//
// Worker related properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.GetServiceVersionPRequest;
import alluxio.grpc.GrpcChannel;
Expand Down Expand Up @@ -140,7 +141,11 @@ private InetSocketAddress getAddress() {
LOG.debug("Timeout while connecting to {}", address);
} catch (CancelledException e) {
LOG.debug("Cancelled while connecting to {}", address);
} catch (AlluxioStatusException e) {
} catch (NotFoundException e) {
// If the gRPC server is enabled but the metadata service isn't enabled,
// try the next master address.
LOG.debug("Meta service rpc endpoint not found on {}. {}", address, e);
} catch (AlluxioStatusException e) {
LOG.error("Error while connecting to {}. {}", address, e);
// Breaking the loop on non filtered error.
break;
Expand Down
13 changes: 13 additions & 0 deletions core/server/common/src/main/java/alluxio/master/Master.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@

import alluxio.Server;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.GrpcService;
import alluxio.grpc.ServiceType;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.Journaled;

import java.util.Collections;
import java.util.Map;

/**
* This interface contains common operations for all masters.
*/
Expand All @@ -29,4 +34,12 @@ public interface Master extends Journaled, Server<Boolean> {
* @return a master context
*/
MasterContext getMasterContext();

/**
* @return a map from service names to gRPC services that serve RPCs for this master,
* if the master is a standby master.
*/
default Map<ServiceType, GrpcService> getStandbyServices() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ public static RpcServerService create(
InetSocketAddress bindAddress,
MasterProcess masterProcess,
MasterRegistry masterRegistry) {
if (Configuration.getBoolean(PropertyKey.STANDBY_MASTER_GRPC_ENABLED)) {
return new RpcServerStandbyGrpcService(bindAddress, masterProcess, masterRegistry);
}
return new RpcServerService(bindAddress, masterProcess, masterRegistry);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.service.rpc;

import alluxio.master.Master;
import alluxio.master.MasterProcess;
import alluxio.master.MasterRegistry;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

/**
* Created by {@link RpcServerService.Factory}.
* Manages the behavior of the master's rpc service. The grpc server is always on.
* When the promotion/demotion happens, the rpc service will be stopped and restarted.
* The new started grpc service will serve gRPC endpoints based on the node state (PRIMARY/STANDBY).
* No rejecting server is deployed.
*/
public class RpcServerStandbyGrpcService extends RpcServerService {
protected static final Logger LOG = LoggerFactory.getLogger(RpcServerStandbyGrpcService.class);

private boolean mIsPromoted = false;

protected RpcServerStandbyGrpcService(
InetSocketAddress bindAddress,
MasterProcess masterProcess,
MasterRegistry masterRegistry
) {
super(bindAddress, masterProcess, masterRegistry);
}

@Override
public synchronized void start() {
LOG.info("Starting {}", this.getClass().getSimpleName());
startGrpcServer(Master::getStandbyServices);
}

@Override
public synchronized void stop() {
stopGrpcServer();
stopRpcExecutor();
mIsPromoted = false;
}

@Override
public synchronized void promote() {
Preconditions.checkState(!mIsPromoted, "double promotion is not allowed");
LOG.info("Promoting {}", this.getClass().getSimpleName());
stopGrpcServer();
stopRpcExecutor();
waitForFree();
startGrpcServer(Master::getServices);
mIsPromoted = true;
}

@Override
public synchronized void demote() {
Preconditions.checkState(mIsPromoted, "double demotion is not allowed");
LOG.info("Demoting {}", this.getClass().getSimpleName());
stopGrpcServer();
stopRpcExecutor();
waitForFree();
startGrpcServer(Master::getStandbyServices);
mIsPromoted = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,33 @@ public void restoreFromBackupLocal() throws Exception {
startStopTest(master);
}

@Test
public void startStopStandbyStandbyServer() throws Exception {
Configuration.set(PropertyKey.STANDBY_MASTER_GRPC_ENABLED, true);
AlluxioMasterProcess master =
new AlluxioMasterProcess(new NoopJournalSystem(), new AlwaysStandbyPrimarySelector());
master.registerService(
RpcServerService.Factory.create(
master.getRpcBindAddress(), master, master.getRegistry()));
master.registerService(WebServerService.Factory.create(master.getWebBindAddress(), master));
master.registerService(MetricsService.Factory.create());

Thread t = new Thread(() -> {
try {
master.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
t.start();
final int TIMEOUT_MS = 10_000;
master.waitForGrpcServerReady(TIMEOUT_MS);
startStopTest(master,
true,
Configuration.getBoolean(PropertyKey.STANDBY_MASTER_WEB_ENABLED),
Configuration.getBoolean(PropertyKey.STANDBY_MASTER_METRICS_SINK_ENABLED));
}

private void startStopTest(AlluxioMasterProcess master) throws Exception {
startStopTest(master, true, true, true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.service.rpc;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.master.AlluxioMasterProcess;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
* Test for RpcServerStandbyGrpcServiceTest.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(AlluxioMasterProcess.class)
public class RpcServerStandbyGrpcServiceTest extends RpcServerServiceTestBase {
@Before
public void setUp() {
Configuration.reloadProperties();
Configuration.set(PropertyKey.STANDBY_MASTER_GRPC_ENABLED, true);
super.setUp();
}

@Test
public void primaryOnlyTest() {
RpcServerService service =
RpcServerService.Factory.create(mRpcAddress, mMasterProcess, mRegistry);
Assert.assertTrue(waitForFree());

Assert.assertFalse(service.isServing());
service.start();
// when standby master is enabled, gRPC server is always on even if it's standby.
Assert.assertTrue(isGrpcBound());
Assert.assertTrue(service.isServing());
for (int i = 0; i < 5; i++) {
service.promote();
Assert.assertTrue(service.isServing());
Assert.assertTrue(isGrpcBound());
service.demote();
Assert.assertTrue(isGrpcBound());
Assert.assertTrue(service.isServing());
}
service.stop();
Assert.assertFalse(service.isServing());
Assert.assertFalse(isGrpcBound());
}

@Test
public void doubleStartRpcServer() {
RpcServerService service =
RpcServerService.Factory.create(mRpcAddress, mMasterProcess, mRegistry);

service.start();
service.promote();
Assert.assertThrows("double promotion is not allowed",
IllegalStateException.class, service::promote);

service.demote();
Assert.assertThrows("double demotion is not allowed",
IllegalStateException.class, service::demote);
}
}

0 comments on commit 325f36a

Please sign in to comment.