Skip to content

Commit

Permalink
Fix gRPC exception handling and improve traces
Browse files Browse the repository at this point in the history
Client level retries were sometimes ineffective due to narrow handling
of exceptions.

pr-link: #10828
change-id: cid-e6e653da7c90f614a03285f8f71884ecfc11b3e3
  • Loading branch information
Göktürk Gezer committed Feb 4, 2020
1 parent 0d1813b commit 0077608
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 41 deletions.
6 changes: 5 additions & 1 deletion core/common/src/main/java/alluxio/grpc/GrpcChannel.java
Expand Up @@ -94,8 +94,12 @@ public void intercept(ClientInterceptor interceptor) {

/**
* Shuts down the channel.
*
* Shutdown should be synchronized as it could be called concurrently due to:
* - Authentication long polling
* - gRPC messaging stream.
*/
public void shutdown() {
public synchronized void shutdown() {
if (mAuthDriver != null) {
// Close authenticated session with server.
mAuthDriver.close();
Expand Down
13 changes: 1 addition & 12 deletions core/common/src/main/java/alluxio/grpc/GrpcChannelKey.java
Expand Up @@ -305,7 +305,7 @@ public String toString() {
* @return short representation of this channel key
*/
public String toStringShort() {
return MoreObjects.toStringHelper("Channel")
return MoreObjects.toStringHelper(this)
.add("ClientType", getStringFromOptional(mClientType))
.add("ClientHostname", mLocalHostName)
.add("ServerAddress", mServerAddress)
Expand All @@ -314,17 +314,6 @@ public String toStringShort() {
.toString();
}

/**
* @return server-side representation of this channel key
*/
public String toOwnerString() {
return MoreObjects.toStringHelper("")
.add("ClientType", getStringFromOptional(mClientType))
.add("ClientHostname", mLocalHostName)
.omitNullValues()
.toString();
}

/**
* Used to get underlying string representation from {@link Optional} fields.
*
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/main/java/alluxio/grpc/GrpcServer.java
Expand Up @@ -136,7 +136,7 @@ public boolean isServing() {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("Server", mServer)
.add("InternalServer", mServer)
.add("AuthServerType", mAuthServer.getClass().getSimpleName())
.toString();
}
Expand Down
Expand Up @@ -46,8 +46,14 @@ public AbstractSaslClientHandler(ChannelAuthenticationScheme authScheme) {
mAuthScheme = authScheme;
}

/**
* This is synchronized in order to protect {@link #mSaslClient}.
*/
@Override
public SaslMessage handleMessage(SaslMessage message) throws SaslException {
public synchronized SaslMessage handleMessage(SaslMessage message) throws SaslException {
if (mSaslClient == null) {
throw new SaslException("SaslClient handler is closed");
}
// Generate initial message.
if (message == null) {
SaslMessage.Builder initialResponse = SaslMessage.newBuilder()
Expand Down Expand Up @@ -84,13 +90,18 @@ public SaslMessage handleMessage(SaslMessage message) throws SaslException {
}
}

/**
* This is synchronized in order to protect {@link #mSaslClient}.
*/
@Override
public void close() {
public synchronized void close() {
if (mSaslClient != null) {
try {
mSaslClient.dispose();
} catch (SaslException exc) {
LOG.debug("Failed to close SaslClient.", exc);
} finally {
mSaslClient = null;
}
}
}
Expand Down
Expand Up @@ -20,10 +20,12 @@

import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.sasl.SaslException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -60,6 +62,8 @@ public class AuthenticatedChannelClientDriver implements StreamObserver<SaslMess
private volatile boolean mChannelAuthenticated;
/** Used to wait during authentication handshake. */
private SettableFuture<Void> mChannelAuthenticatedFuture;
/** Initiating message for authentication. */
private SaslMessage mInitiateMessage;

/**
* Creates client driver with given handshake handler.
Expand All @@ -68,11 +72,13 @@ public class AuthenticatedChannelClientDriver implements StreamObserver<SaslMess
* @param channelKey channel key
*/
public AuthenticatedChannelClientDriver(SaslClientHandler saslClientHandler,
GrpcChannelKey channelKey) {
GrpcChannelKey channelKey) throws SaslException {
mSaslClientHandler = saslClientHandler;
mChannelKey = channelKey;
mChannelAuthenticated = false;
mChannelAuthenticatedFuture = SettableFuture.create();
// Generate the initiating message while sasl handler is valid.
mInitiateMessage = generateInitialMessage();
}

/**
Expand All @@ -87,7 +93,8 @@ public void setServerObserver(StreamObserver<SaslMessage> requestObserver) {
@Override
public void onNext(SaslMessage saslMessage) {
try {
LOG.debug("Received message for {}. Message: {}", mChannelKey.toStringShort(), saslMessage);
LOG.debug("Received message for channel: {}. Message: {}",
mChannelKey.toStringShort(), saslMessage);
SaslMessage response = mSaslClientHandler.handleMessage(saslMessage);
if (response != null) {
mRequestObserver.onNext(response);
Expand All @@ -108,7 +115,7 @@ public void onNext(SaslMessage saslMessage) {

@Override
public void onError(Throwable throwable) {
LOG.debug("Received error for {}. Error: {}", mChannelKey.toStringShort(), throwable);
LOG.debug("Received error for channel: {}. Error: {}", mChannelKey.toStringShort(), throwable);
closeAuthenticatedChannel(false);

// Fail blocked waiters.
Expand All @@ -117,15 +124,16 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
LOG.debug("Authenticated channel revoked by server for {}", mChannelKey.toStringShort());
LOG.debug("Authenticated channel revoked by server for channel: {}",
mChannelKey.toStringShort());
closeAuthenticatedChannel(false);
}

/**
* Stops authenticated session with the server by releasing the long poll.
*/
public void close() {
LOG.debug("Closing authentication for {}", mChannelKey.toStringShort());
LOG.debug("Closing authentication for channel: {}", mChannelKey.toStringShort());
closeAuthenticatedChannel(true);
}

Expand All @@ -144,12 +152,13 @@ public boolean isAuthenticated() {
*/
public void startAuthenticatedChannel(long timeoutMs) throws AlluxioStatusException {
try {
LOG.debug("Initiating authentication for {}", mChannelKey.toStringShort());
LOG.debug("Initiating authentication for channel: {}", mChannelKey.toStringShort());
// Send the server initial message.
SaslMessage.Builder initialMsg = mSaslClientHandler.handleMessage(null).toBuilder();
initialMsg.setClientId(mChannelKey.getChannelId().toString());
initialMsg.setChannelRef(mChannelKey.toStringShort());
mRequestObserver.onNext(initialMsg.build());
try {
mRequestObserver.onNext(mInitiateMessage);
} catch (StatusRuntimeException e) {
// Ignore. waitUntilChannelAuthenticated() will throw stored cause.
}

// Utility to return from start when channel is secured.
waitUntilChannelAuthenticated(timeoutMs);
Expand All @@ -159,6 +168,13 @@ public void startAuthenticatedChannel(long timeoutMs) throws AlluxioStatusExcept
}
}

private SaslMessage generateInitialMessage() throws SaslException {
SaslMessage.Builder initialMsg = mSaslClientHandler.handleMessage(null).toBuilder();
initialMsg.setClientId(mChannelKey.getChannelId().toString());
initialMsg.setChannelRef(mChannelKey.toStringShort());
return initialMsg.build();
}

private void waitUntilChannelAuthenticated(long timeoutMs) throws AlluxioStatusException {
try {
// Wait until authentication status changes.
Expand Down
Expand Up @@ -67,7 +67,7 @@ public void setClientObserver(StreamObserver<SaslMessage> requestObserver) {

private void initAuthenticatedChannel(ChannelAuthenticationScheme authScheme, UUID channelId,
String channelRef) throws SaslException {
LOG.debug("Initializing authentication for {}. AuthType: {}", mChannelRef, authScheme);
LOG.debug("Initializing authentication for channel: {}. AuthType: {}", mChannelRef, authScheme);
// Create sasl handler for the requested scheme.
mSaslServerHandler = mAuthenticationServer.createSaslHandler(authScheme);
// Unregister from registry if in case it was authenticated before.
Expand Down Expand Up @@ -99,7 +99,7 @@ private void closeAuthenticatedChannel(boolean signalOwner) {
// Complete stream.
mRequestObserver.onCompleted();
} catch (Exception exc) {
LOG.debug("Failed to close gRPC stream of {}. Error: {}", mChannelRef, exc);
LOG.debug("Failed to close gRPC stream of channel: {}. Error: {}", mChannelRef, exc);
}
}
}
Expand All @@ -114,7 +114,7 @@ public void onNext(SaslMessage saslMessage) {
saslMessage.getChannelRef());
}

LOG.debug("Responding to a message of {}. Message: {}", mChannelRef, saslMessage);
LOG.debug("Responding to a message of channel: {}. Message: {}", mChannelRef, saslMessage);
// Consult sasl server for handling the message.
SaslMessage response = mSaslServerHandler.handleMessage(saslMessage);

Expand All @@ -125,7 +125,7 @@ public void onNext(SaslMessage saslMessage) {
// Push response to stream.
mRequestObserver.onNext(response);
} catch (Throwable t) {
LOG.debug("Exception while handling message of {}. Message: {}. Error: {}",
LOG.debug("Exception while handling message of channel: {}. Message: {}. Error: {}",
mChannelRef, saslMessage, t);
// Invalidate stream.
mRequestObserver.onError(AlluxioStatusException.fromThrowable(t).toGrpcStatusException());
Expand All @@ -147,7 +147,7 @@ public void onCompleted() {
* Completes authenticated channel.
*/
public void close() {
LOG.debug("Closing authentication for {}", mChannelRef);
LOG.debug("Closing authentication for channel: {}", mChannelRef);
closeAuthenticatedChannel(true);
}
}
Expand Up @@ -82,17 +82,16 @@ public ChannelAuthenticator(GrpcChannelKey channelKey, ManagedChannel managedCha
* @throws AlluxioStatusException
*/
public void authenticate() throws AlluxioStatusException {
LOG.debug("Authenticating channel: {}. AuthType: {}", mChannelKey, mAuthType);
LOG.debug("Authenticating channel: {}. AuthType: {}", mChannelKey.toStringShort(), mAuthType);

ChannelAuthenticationScheme authScheme = getChannelAuthScheme(mAuthType, mParentSubject,
mChannelKey.getServerAddress().getSocketAddress());

try {
// Create SaslHandler for handling sasl handshake.
SaslClientHandler saslClientHandler =
createSaslClientHandler(mChannelKey.getServerAddress(), authScheme, mParentSubject);
// Create client-side driver for establishing authenticated channel with the target.
mAuthDriver = new AuthenticatedChannelClientDriver(saslClientHandler, mChannelKey);
mAuthDriver = new AuthenticatedChannelClientDriver(
createSaslClientHandler(mChannelKey.getServerAddress(), authScheme, mParentSubject),
mChannelKey);

// Initialize client-server authentication drivers.
SaslAuthenticationServiceGrpc.SaslAuthenticationServiceStub serverStub =
Expand All @@ -108,13 +107,14 @@ public void authenticate() throws AlluxioStatusException {
// Intercept authenticated channel with channel-id injector.
mAuthenticatedChannel = ClientInterceptors.intercept(mManagedChannel,
new ChannelIdInjector(mChannelKey.getChannelId()));
} catch (AlluxioStatusException e) {
} catch (Throwable t) {
AlluxioStatusException e = AlluxioStatusException.fromThrowable(t);
// Build a pretty message for authentication failure.
String message = String.format(
"Channel authentication failed with code:%s. ChannelKey: %s, AuthType: %s, Error: %s",
"Channel authentication failed with code:%s. Channel: %s, AuthType: %s, Error: %s",
e.getStatusCode().name(), mChannelKey.toStringShort(), mAuthType, e.toString());
throw AlluxioStatusException
.from(Status.fromCode(e.getStatusCode()).withDescription(message).withCause(e));
.from(Status.fromCode(e.getStatusCode()).withDescription(message).withCause(t));
}
}

Expand Down
Expand Up @@ -51,8 +51,8 @@ public CompletableFuture<Void> close() {
super.close().whenComplete((result, error) -> {
try {
mChannel.shutdown();
} catch (Exception exc) {
LOG.warn("Failed to close underlying gRPC channel.{}", mChannel);
} catch (Exception e) {
LOG.warn("Failed to close channel: {}. Error: {}", mChannel.toStringShort(), e);
} finally {
resultFuture.complete(null);
}
Expand Down

0 comments on commit 0077608

Please sign in to comment.