Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the catalyst dependencies in backup transport #12056

Merged
merged 7 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ log4j.appender.Console.Target=System.out
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n

# The netty transport has info-level logging on every connection, even successful
# connections. This can result in hundreds of log messages per second.
log4j.category.io.atomix.catalyst.transport.netty=WARN

# The ParquetWriter logs for every row group which is not noisy for large row group size,
# but very noisy for small row group size.
log4j.logger.org.apache.parquet.hadoop.InternalParquetRecordWriter=WARN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,20 @@
import alluxio.grpc.MessagingServiceGrpc;
import alluxio.security.user.UserState;

import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/**
* {@link Client} implementation based on Alluxio gRPC messaging.
* Client implementation based on Alluxio gRPC messaging.
*
* Listen should be called once for each distinct address.
* Pending futures should all be closed prior to calling {@link #close()}.
*/
public class GrpcMessagingClient implements Client {
public class GrpcMessagingClient {
private static final Logger LOG = LoggerFactory.getLogger(GrpcMessagingClient.class);

/** Alluxio configuration. */
Expand Down Expand Up @@ -65,37 +62,43 @@ public GrpcMessagingClient(AlluxioConfiguration conf, UserState userState,
mClientType = clientType;
}

@Override
public CompletableFuture<Connection> connect(Address address) {
/**
* Creates a client and connects to the given address.
*
* @param address the server address
* @return future of connection result
*/
public CompletableFuture<GrpcMessagingConnection> connect(InetSocketAddress address) {
LOG.debug("Creating a messaging client connection to: {}", address);
final ThreadContext threadContext = ThreadContext.currentContextOrThrow();
final GrpcMessagingContext threadContext = GrpcMessagingContext.currentContextOrThrow();
// Future for this connection.
final CompletableFuture<Connection> connectionFuture = new CompletableFuture<>();
final CompletableFuture<GrpcMessagingConnection> connectionFuture = new CompletableFuture<>();
// Spawn gRPC connection building on a common pool.
final CompletableFuture<Connection> buildFuture = CompletableFuture.supplyAsync(() -> {
try {
// Create a new gRPC channel for requested connection.
GrpcChannel channel = GrpcChannelBuilder
.newBuilder(GrpcServerAddress.create(address.host(), address.socketAddress()), mConf)
.setClientType(mClientType).setSubject(mUserState.getSubject())
.build();
final CompletableFuture<GrpcMessagingConnection> buildFuture = CompletableFuture
.supplyAsync(() -> {
try {
// Create a new gRPC channel for requested connection.
GrpcChannel channel = GrpcChannelBuilder
.newBuilder(GrpcServerAddress.create(address.getHostString(), address), mConf)
.setClientType(mClientType).setSubject(mUserState.getSubject())
.build();

// Create stub for receiving stream from server.
MessagingServiceGrpc.MessagingServiceStub messageClientStub =
MessagingServiceGrpc.newStub(channel);
// Create stub for receiving stream from server.
MessagingServiceGrpc.MessagingServiceStub messageClientStub =
MessagingServiceGrpc.newStub(channel);

// Create client connection that is bound to remote server stream.
GrpcMessagingConnection clientConnection =
new GrpcMessagingClientConnection(threadContext, mExecutor, channel,
mConf.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS));
clientConnection.setTargetObserver(messageClientStub.connect(clientConnection));
// Create client connection that is bound to remote server stream.
GrpcMessagingConnection clientConnection =
new GrpcMessagingClientConnection(threadContext, mExecutor, channel,
mConf.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS));
clientConnection.setTargetObserver(messageClientStub.connect(clientConnection));

LOG.debug("Created a messaging client connection: {}", clientConnection);
return clientConnection;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}, mExecutor);
LOG.debug("Created a messaging client connection: {}", clientConnection);
return clientConnection;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}, mExecutor);
// When connection is build, complete the connection future with it on a catalyst thread context
// for setting up the connection.
buildFuture.whenComplete((result, error) -> {
Expand All @@ -110,7 +113,11 @@ public CompletableFuture<Connection> connect(Address address) {
return connectionFuture;
}

@Override
/**
* Closes the client.
*
* @return future of result
*/
public CompletableFuture<Void> close() {
LOG.debug("Closing messaging client; {}", this);
// Nothing to clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import alluxio.grpc.GrpcChannel;

import io.atomix.catalyst.concurrent.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,7 +38,7 @@ public class GrpcMessagingClientConnection extends GrpcMessagingConnection {
* @param channel underlying gRPC channel
* @param requestTimeoutMs timeout in milliseconds for requests
*/
public GrpcMessagingClientConnection(ThreadContext context, ExecutorService executor,
public GrpcMessagingClientConnection(GrpcMessagingContext context, ExecutorService executor,
GrpcChannel channel, long requestTimeoutMs) {
super(ConnectionOwner.CLIENT, channel.toStringShort(), context, executor, requestTimeoutMs);
mChannel = channel;
Expand Down
Loading