Skip to content

Commit

Permalink
Add listening address for embedded journal
Browse files Browse the repository at this point in the history
Fixes #10693

pr-link: #10706
change-id: cid-b47ba2b295b64f17b60dbf862974dba5f48ff274
  • Loading branch information
Göktürk Gezer authored and alluxio-bot committed Jan 14, 2020
1 parent 4bdc6f3 commit 0d5f9f6
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 8 deletions.
11 changes: 11 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Expand Up @@ -1289,6 +1289,15 @@ public String toString() {
.setScope(Scope.MASTER)
.setIsHidden(true)
.build();
public static final PropertyKey MASTER_EMBEDDED_JOURNAL_PROXY_HOST =
new Builder(Name.MASTER_EMBEDDED_JOURNAL_PROXY_HOST)
.setDescription(String.format(
"Used to bind embedded journal servers to a proxied host."
+ "Proxy hostname will still make use of %s for bind port.",
Name.MASTER_EMBEDDED_JOURNAL_PORT))
// No default value for proxy-host. Server will bind to "alluxio.master.hostname"
// as default.
.build();
public static final PropertyKey MASTER_EMBEDDED_JOURNAL_ADDRESSES =
new Builder(Name.MASTER_EMBEDDED_JOURNAL_ADDRESSES)
.setDescription(String.format("A comma-separated list of journal addresses for all "
Expand Down Expand Up @@ -4152,6 +4161,8 @@ public static final class Name {
public static final String MASTER_JOURNAL_TAILER_SLEEP_TIME_MS =
"alluxio.master.journal.tailer.sleep.time";
public static final String MASTER_RPC_ADDRESSES = "alluxio.master.rpc.addresses";
public static final String MASTER_EMBEDDED_JOURNAL_PROXY_HOST =
"alluxio.master.embedded.journal.bind.host";
public static final String MASTER_EMBEDDED_JOURNAL_ADDRESSES =
"alluxio.master.embedded.journal.addresses";
public static final String MASTER_EMBEDDED_JOURNAL_ELECTION_TIMEOUT =
Expand Down
Expand Up @@ -107,6 +107,18 @@ public InetSocketAddress getLocalAddress() {
return mLocalAddress;
}

/**
* @return proxy address of this Raft cluster node
*/
public InetSocketAddress getProxyAddress() {
if (ServerConfiguration.isSet(PropertyKey.MASTER_EMBEDDED_JOURNAL_PROXY_HOST)) {
return InetSocketAddress.createUnresolved(
ServerConfiguration.get(PropertyKey.MASTER_EMBEDDED_JOURNAL_PROXY_HOST),
getLocalAddress().getPort());
}
return null;
}

/**
* @return max log file size
*/
Expand Down
Expand Up @@ -26,6 +26,7 @@
import alluxio.master.journal.CatchupFuture;
import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.Journal;
import alluxio.master.transport.GrpcMessagingProxy;
import alluxio.master.transport.GrpcMessagingTransport;
import alluxio.proto.journal.Journal.JournalEntry;
import alluxio.security.user.ServerUserState;
Expand Down Expand Up @@ -238,14 +239,20 @@ private synchronized void initServer() {
mStateMachine.close();
}
mStateMachine = new JournalStateMachine(mJournals, () -> this.getJournalSinks(null));
// Read external proxy configuration.
GrpcMessagingProxy serverProxy = new GrpcMessagingProxy();
if (mConf.getProxyAddress() != null) {
serverProxy.addProxy(getLocalAddress(mConf), new Address(mConf.getProxyAddress()));
}
mServer = CopycatServer.builder(getLocalAddress(mConf))
.withStorage(storage)
.withElectionTimeout(Duration.ofMillis(mConf.getElectionTimeoutMs()))
.withHeartbeatInterval(Duration.ofMillis(mConf.getHeartbeatIntervalMs()))
.withSnapshotAllowed(mSnapshotAllowed)
.withSerializer(createSerializer())
.withTransport(new GrpcMessagingTransport(
ServerConfiguration.global(), ServerUserState.global(), RAFTSERVER_CLIENT_TYPE))
ServerConfiguration.global(), ServerUserState.global(), RAFTSERVER_CLIENT_TYPE)
.withServerProxy(serverProxy))
// Copycat wants a supplier that will generate *new* state machines. We can't handle
// generating a new state machine here, so we will throw an exception if copycat tries to
// call the supplier more than once.
Expand Down
@@ -0,0 +1,59 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.transport;

import io.atomix.catalyst.transport.Address;

import java.util.HashMap;
import java.util.Map;

/**
* Used to provide external proxy configuration to messaging servers/clients.
*/
public class GrpcMessagingProxy {
private Map<Address, Address> mProxyConf;

/**
* Creates new proxy configuration.
*/
public GrpcMessagingProxy() {
mProxyConf = new HashMap<>();
}

/**
* Adds a new proxy mapping.
*
* @param address source address that has been proxied
* @param proxy proxy address for source address
* @return the updated proxy configuration
*/
public GrpcMessagingProxy addProxy(Address address, Address proxy) {
mProxyConf.put(address, proxy);
return this;
}

/**
* @param address address to check for proxy configuration
* @return {@code true} if given address has proxy configuration
*/
public boolean hasProxyFor(Address address) {
return mProxyConf.containsKey(address);
}

/**
* @param address source address
* @return proxy address for given address
*/
public Address getProxyFor(Address address) {
return mProxyConf.get(address);
}
}
Expand Up @@ -62,18 +62,23 @@ public class GrpcMessagingServer implements Server {
/** Executor for building server listener. */
private final ExecutorService mExecutor;

/** Proxy configuration for server connections. */
private final GrpcMessagingProxy mProxy;

/**
* Creates a transport server that can be used to accept connections from remote clients.
*
* @param conf Alluxio configuration
* @param userState authentication user
* @param executor transport executor
* @param proxy external proxy configuration
*/
public GrpcMessagingServer(AlluxioConfiguration conf, UserState userState,
ExecutorService executor) {
ExecutorService executor, GrpcMessagingProxy proxy) {
mConf = conf;
mUserState = userState;
mExecutor = executor;
mProxy = proxy;
mConnections = Collections.synchronizedList(new LinkedList<>());
}

Expand All @@ -85,7 +90,8 @@ public synchronized CompletableFuture<Void> listen(Address address,
return mListenFuture;
}

LOG.debug("Messaging server binding to: {}", address);
LOG.debug("Opening messaging server for: {}", address);

final ThreadContext threadContext = ThreadContext.currentContextOrThrow();
mListenFuture = CompletableFuture.runAsync(() -> {
// Listener that notifies both this server instance and given listener.
Expand All @@ -94,10 +100,17 @@ public synchronized CompletableFuture<Void> listen(Address address,
listener.accept(connection);
};

Address bindAddress = address;
if (mProxy.hasProxyFor(address)) {
bindAddress = mProxy.getProxyFor(address);
LOG.debug("Found proxy: {} for address: {}", bindAddress, address);
}
LOG.debug("Binding messaging server to: {}", bindAddress);

// Create gRPC server.
mGrpcServer = GrpcServerBuilder
.forAddress(GrpcServerAddress.create(address.host(),
new InetSocketAddress(address.host(), address.port())), mConf, mUserState)
.forAddress(GrpcServerAddress.create(bindAddress.host(),
new InetSocketAddress(bindAddress.host(), bindAddress.port())), mConf, mUserState)
.maxInboundMessageSize((int) mConf
.getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_MAX_INBOUND_MESSAGE_SIZE))
.addService(new GrpcService(ServerInterceptors.intercept(
Expand All @@ -109,10 +122,10 @@ public synchronized CompletableFuture<Void> listen(Address address,
try {
mGrpcServer.start();

LOG.info("Successfully started messaging server at: {}", address);
LOG.info("Successfully started messaging server at: {}", bindAddress);
} catch (IOException e) {
mGrpcServer = null;
LOG.debug("Failed to create messaging server for at: {}.", address, e);
LOG.debug("Failed to create messaging server for: {}.", address, e);
throw new RuntimeException(e);
}
}, mExecutor);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.transport.Transport;
import io.atomix.catalyst.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -50,6 +51,9 @@ public class GrpcMessagingTransport implements Transport {
/** List of created servers. */
private final List<GrpcMessagingServer> mServers;

/** External proxy configuration for servers. */
private GrpcMessagingProxy mServerProxy = new GrpcMessagingProxy();

/** Executor that is used by clients/servers for building connections. */
private final ExecutorService mExecutor;

Expand Down Expand Up @@ -90,6 +94,18 @@ public GrpcMessagingTransport(AlluxioConfiguration clientConf, AlluxioConfigurat
.newCachedThreadPool(ThreadFactoryUtils.build("grpc-messaging-transport-worker-%d", true));
}

/**
* Sets external proxy configuration for servers.
*
* @param proxy external proxy configuration
* @return the updated transport instance
*/
public synchronized GrpcMessagingTransport withServerProxy(GrpcMessagingProxy proxy) {
Assert.notNull(proxy, "Server proxy reference cannot be null.");
mServerProxy = proxy;
return this;
}

@Override
public synchronized Client client() {
if (mClosed) {
Expand All @@ -106,7 +122,8 @@ public synchronized Server server() {
if (mClosed) {
throw new RuntimeException("Messaging transport closed");
}
GrpcMessagingServer server = new GrpcMessagingServer(mServerConf, mServerUser, mExecutor);
GrpcMessagingServer server =
new GrpcMessagingServer(mServerConf, mServerUser, mExecutor, mServerProxy);
mServers.add(server);
return server;
}
Expand Down

0 comments on commit 0d5f9f6

Please sign in to comment.