Skip to content

Commit

Permalink
Remove the catalyst dependencies in backup transport
Browse files Browse the repository at this point in the history
pr-link: #12056
change-id: cid-45b0ca0655735d011ca3babafc83fc817b67860f
  • Loading branch information
LuQQiu committed Sep 2, 2020
1 parent 2c7bfbe commit 1468422
Show file tree
Hide file tree
Showing 19 changed files with 739 additions and 202 deletions.
4 changes: 0 additions & 4 deletions conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ log4j.appender.Console.Target=System.out
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n

# The netty transport has info-level logging on every connection, even successful
# connections. This can result in hundreds of log messages per second.
log4j.category.io.atomix.catalyst.transport.netty=WARN

# The ParquetWriter logs for every row group which is not noisy for large row group size,
# but very noisy for small row group size.
log4j.logger.org.apache.parquet.hadoop.InternalParquetRecordWriter=WARN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,20 @@
import alluxio.grpc.MessagingServiceGrpc;
import alluxio.security.user.UserState;

import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/**
* {@link Client} implementation based on Alluxio gRPC messaging.
* Client implementation based on Alluxio gRPC messaging.
*
* Listen should be called once for each distinct address.
* Pending futures should all be closed prior to calling {@link #close()}.
*/
public class GrpcMessagingClient implements Client {
public class GrpcMessagingClient {
private static final Logger LOG = LoggerFactory.getLogger(GrpcMessagingClient.class);

/** Alluxio configuration. */
Expand Down Expand Up @@ -65,37 +62,43 @@ public GrpcMessagingClient(AlluxioConfiguration conf, UserState userState,
mClientType = clientType;
}

@Override
public CompletableFuture<Connection> connect(Address address) {
/**
* Creates a client and connects to the given address.
*
* @param address the server address
* @return future of connection result
*/
public CompletableFuture<GrpcMessagingConnection> connect(InetSocketAddress address) {
LOG.debug("Creating a messaging client connection to: {}", address);
final ThreadContext threadContext = ThreadContext.currentContextOrThrow();
final GrpcMessagingContext threadContext = GrpcMessagingContext.currentContextOrThrow();
// Future for this connection.
final CompletableFuture<Connection> connectionFuture = new CompletableFuture<>();
final CompletableFuture<GrpcMessagingConnection> connectionFuture = new CompletableFuture<>();
// Spawn gRPC connection building on a common pool.
final CompletableFuture<Connection> buildFuture = CompletableFuture.supplyAsync(() -> {
try {
// Create a new gRPC channel for requested connection.
GrpcChannel channel = GrpcChannelBuilder
.newBuilder(GrpcServerAddress.create(address.host(), address.socketAddress()), mConf)
.setClientType(mClientType).setSubject(mUserState.getSubject())
.build();
final CompletableFuture<GrpcMessagingConnection> buildFuture = CompletableFuture
.supplyAsync(() -> {
try {
// Create a new gRPC channel for requested connection.
GrpcChannel channel = GrpcChannelBuilder
.newBuilder(GrpcServerAddress.create(address.getHostString(), address), mConf)
.setClientType(mClientType).setSubject(mUserState.getSubject())
.build();

// Create stub for receiving stream from server.
MessagingServiceGrpc.MessagingServiceStub messageClientStub =
MessagingServiceGrpc.newStub(channel);
// Create stub for receiving stream from server.
MessagingServiceGrpc.MessagingServiceStub messageClientStub =
MessagingServiceGrpc.newStub(channel);

// Create client connection that is bound to remote server stream.
GrpcMessagingConnection clientConnection =
new GrpcMessagingClientConnection(threadContext, mExecutor, channel,
mConf.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS));
clientConnection.setTargetObserver(messageClientStub.connect(clientConnection));
// Create client connection that is bound to remote server stream.
GrpcMessagingConnection clientConnection =
new GrpcMessagingClientConnection(threadContext, mExecutor, channel,
mConf.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS));
clientConnection.setTargetObserver(messageClientStub.connect(clientConnection));

LOG.debug("Created a messaging client connection: {}", clientConnection);
return clientConnection;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}, mExecutor);
LOG.debug("Created a messaging client connection: {}", clientConnection);
return clientConnection;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}, mExecutor);
// When connection is build, complete the connection future with it on a catalyst thread context
// for setting up the connection.
buildFuture.whenComplete((result, error) -> {
Expand All @@ -110,7 +113,11 @@ public CompletableFuture<Connection> connect(Address address) {
return connectionFuture;
}

@Override
/**
* Closes the client.
*
* @return future of result
*/
public CompletableFuture<Void> close() {
LOG.debug("Closing messaging client; {}", this);
// Nothing to clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import alluxio.grpc.GrpcChannel;

import io.atomix.catalyst.concurrent.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,7 +38,7 @@ public class GrpcMessagingClientConnection extends GrpcMessagingConnection {
* @param channel underlying gRPC channel
* @param requestTimeoutMs timeout in milliseconds for requests
*/
public GrpcMessagingClientConnection(ThreadContext context, ExecutorService executor,
public GrpcMessagingClientConnection(GrpcMessagingContext context, ExecutorService executor,
GrpcChannel channel, long requestTimeoutMs) {
super(ConnectionOwner.CLIENT, channel.toStringShort(), context, executor, requestTimeoutMs);
mChannel = channel;
Expand Down
Loading

0 comments on commit 1468422

Please sign in to comment.