Skip to content

Commit

Permalink
Add listening address for embedded journal
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #10706
orig-commit: 0d5f9f6
orig-commit-author: Göktürk Gezer <gokturk@alluxio.com>

pr-link: #10773
change-id: cid-b47ba2b295b64f17b60dbf862974dba5f48ff274
  • Loading branch information
Göktürk Gezer authored and alluxio-bot committed Jan 21, 2020
1 parent 95a3414 commit 5952a18
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 9 deletions.
11 changes: 11 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Expand Up @@ -1246,6 +1246,15 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_EMBEDDED_JOURNAL_PROXY_HOST =
new Builder(Name.MASTER_EMBEDDED_JOURNAL_PROXY_HOST)
.setDescription(String.format(
"Used to bind embedded journal servers to a proxied host."
+ "Proxy hostname will still make use of %s for bind port.",
Name.MASTER_EMBEDDED_JOURNAL_PORT))
// No default value for proxy-host. Server will bind to "alluxio.master.hostname"
// as default.
.build();
public static final PropertyKey MASTER_EMBEDDED_JOURNAL_ADDRESSES =
new Builder(Name.MASTER_EMBEDDED_JOURNAL_ADDRESSES)
.setDescription(String.format("A comma-separated list of journal addresses for all "
Expand Down Expand Up @@ -4122,6 +4131,8 @@ public static final class Name {
public static final String MASTER_JOURNAL_TAILER_SLEEP_TIME_MS =
"alluxio.master.journal.tailer.sleep.time";
public static final String MASTER_RPC_ADDRESSES = "alluxio.master.rpc.addresses";
public static final String MASTER_EMBEDDED_JOURNAL_PROXY_HOST =
"alluxio.master.embedded.journal.bind.host";
public static final String MASTER_EMBEDDED_JOURNAL_ADDRESSES =
"alluxio.master.embedded.journal.addresses";
public static final String MASTER_EMBEDDED_JOURNAL_ELECTION_TIMEOUT =
Expand Down
Expand Up @@ -107,6 +107,18 @@ public InetSocketAddress getLocalAddress() {
return mLocalAddress;
}

/**
* @return proxy address of this Raft cluster node
*/
public InetSocketAddress getProxyAddress() {
if (ServerConfiguration.isSet(PropertyKey.MASTER_EMBEDDED_JOURNAL_PROXY_HOST)) {
return InetSocketAddress.createUnresolved(
ServerConfiguration.get(PropertyKey.MASTER_EMBEDDED_JOURNAL_PROXY_HOST),
getLocalAddress().getPort());
}
return null;
}

/**
* @return max log file size
*/
Expand Down
Expand Up @@ -25,6 +25,7 @@
import alluxio.master.journal.AbstractJournalSystem;
import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.Journal;
import alluxio.master.journal.raft.transport.CopycatGrpcProxy;
import alluxio.master.journal.raft.transport.CopycatGrpcTransport;
import alluxio.proto.journal.Journal.JournalEntry;
import alluxio.security.user.ServerUserState;
Expand Down Expand Up @@ -231,14 +232,20 @@ private synchronized void initServer() {
mStateMachine.close();
}
mStateMachine = new JournalStateMachine(mJournals, () -> this.getJournalSinks(null));
// Read external proxy configuration.
CopycatGrpcProxy serverProxy = new CopycatGrpcProxy();
if (mConf.getProxyAddress() != null) {
serverProxy.addProxy(getLocalAddress(mConf), new Address(mConf.getProxyAddress()));
}
mServer = CopycatServer.builder(getLocalAddress(mConf))
.withStorage(storage)
.withElectionTimeout(Duration.ofMillis(mConf.getElectionTimeoutMs()))
.withHeartbeatInterval(Duration.ofMillis(mConf.getHeartbeatIntervalMs()))
.withSnapshotAllowed(mSnapshotAllowed)
.withSerializer(createSerializer())
.withTransport(
new CopycatGrpcTransport(ServerConfiguration.global(), ServerUserState.global()))
.withTransport(new CopycatGrpcTransport(
ServerConfiguration.global(), ServerUserState.global())
.withServerProxy(serverProxy))
// Copycat wants a supplier that will generate *new* state machines. We can't handle
// generating a new state machine here, so we will throw an exception if copycat tries to
// call the supplier more than once.
Expand Down
@@ -0,0 +1,59 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.journal.raft.transport;

import io.atomix.catalyst.transport.Address;

import java.util.HashMap;
import java.util.Map;

/**
* Used to provide external proxy configuration to messaging servers/clients.
*/
public class CopycatGrpcProxy {
private Map<Address, Address> mProxyConf;

/**
* Creates new proxy configuration.
*/
public CopycatGrpcProxy() {
mProxyConf = new HashMap<>();
}

/**
* Adds a new proxy mapping.
*
* @param address source address that has been proxied
* @param proxy proxy address for source address
* @return the updated proxy configuration
*/
public CopycatGrpcProxy addProxy(Address address, Address proxy) {
mProxyConf.put(address, proxy);
return this;
}

/**
* @param address address to check for proxy configuration
* @return {@code true} if given address has proxy configuration
*/
public boolean hasProxyFor(Address address) {
return mProxyConf.containsKey(address);
}

/**
* @param address source address
* @return proxy address for given address
*/
public Address getProxyFor(Address address) {
return mProxyConf.get(address);
}
}
Expand Up @@ -63,19 +63,24 @@ public class CopycatGrpcServer implements Server {
/** Executor for building server listener. */
private final ExecutorService mExecutor;

/** Proxy configuration for server connections. */
private final CopycatGrpcProxy mProxy;

/**
* Creates copycat transport server that can be used to accept connections from remote copycat
* clients.
*
* @param conf Alluxio configuration
* @param userState authentication user
* @param executor transport executor
* @param proxy external proxy configuration
*/
public CopycatGrpcServer(AlluxioConfiguration conf, UserState userState,
ExecutorService executor) {
ExecutorService executor, CopycatGrpcProxy proxy) {
mConf = conf;
mUserState = userState;
mExecutor = executor;
mProxy = proxy;
mConnections = Collections.synchronizedList(new LinkedList<>());
}

Expand All @@ -87,7 +92,8 @@ public synchronized CompletableFuture<Void> listen(Address address,
return mListenFuture;
}

LOG.debug("Copycat transport server binding to: {}", address);
LOG.debug("Opening copycat transport server for: {}", address);

final ThreadContext threadContext = ThreadContext.currentContextOrThrow();
mListenFuture = CompletableFuture.runAsync(() -> {
// Listener that notifies both this server instance and given listener.
Expand All @@ -96,10 +102,17 @@ public synchronized CompletableFuture<Void> listen(Address address,
listener.accept(connection);
};

Address bindAddress = address;
if (mProxy.hasProxyFor(address)) {
bindAddress = mProxy.getProxyFor(address);
LOG.debug("Found proxy: {} for address: {}", bindAddress, address);
}
LOG.debug("Binding copycat transport server to: {}", bindAddress);

// Create gRPC server.
mGrpcServer = GrpcServerBuilder
.forAddress(GrpcServerAddress.create(address.host(),
new InetSocketAddress(address.host(), address.port())), mConf, mUserState)
.forAddress(GrpcServerAddress.create(bindAddress.host(),
new InetSocketAddress(bindAddress.host(), bindAddress.port())), mConf, mUserState)
.maxInboundMessageSize((int) mConf
.getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_MAX_INBOUND_MESSAGE_SIZE))
.addService(new GrpcService(ServerInterceptors.intercept(
Expand All @@ -111,10 +124,10 @@ public synchronized CompletableFuture<Void> listen(Address address,
try {
mGrpcServer.start();

LOG.info("Successfully started gRPC server for copycat transport at: {}", address);
LOG.info("Successfully started copycat transport server at: {}", bindAddress);
} catch (IOException e) {
mGrpcServer = null;
LOG.debug("Failed to create gRPC server for copycat transport at: {}.", address, e);
LOG.debug("Failed to create copycat transport server for: {}.", address, e);
throw new RuntimeException(e);
}
}, mExecutor);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.transport.Transport;
import io.atomix.catalyst.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,6 +49,9 @@ public class CopycatGrpcTransport implements Transport {
/** List of created servers. */
private final List<CopycatGrpcServer> mServers;

/** External proxy configuration for servers. */
private CopycatGrpcProxy mServerProxy = new CopycatGrpcProxy();

/** Executor that is used by clients/servers for building connections. */
private final ExecutorService mExecutor;

Expand Down Expand Up @@ -85,6 +89,18 @@ public CopycatGrpcTransport(AlluxioConfiguration clientConf, AlluxioConfiguratio
.newCachedThreadPool(ThreadFactoryUtils.build("copycat-transport-worker-%d", true));
}

/**
* Sets external proxy configuration for servers.
*
* @param proxy external proxy configuration
* @return the updated transport instance
*/
public synchronized CopycatGrpcTransport withServerProxy(CopycatGrpcProxy proxy) {
Assert.notNull(proxy, "Server proxy reference cannot be null.");
mServerProxy = proxy;
return this;
}

@Override
public synchronized Client client() {
if (mClosed) {
Expand All @@ -100,7 +116,8 @@ public synchronized Server server() {
if (mClosed) {
throw new RuntimeException("Transport closed");
}
CopycatGrpcServer server = new CopycatGrpcServer(mServerConf, mServerUser, mExecutor);
CopycatGrpcServer server =
new CopycatGrpcServer(mServerConf, mServerUser, mExecutor, mServerProxy);
mServers.add(server);
return server;
}
Expand Down

0 comments on commit 5952a18

Please sign in to comment.