Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -807,17 +807,18 @@ public Object run() throws IOException, InterruptedException {
*/
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
if (socket != null || shouldCloseConnection.get()) {
return;
}
UserGroupInformation ticket = remoteId.getTicket();
if (ticket != null) {
final UserGroupInformation realUser = ticket.getRealUser();
if (realUser != null) {
ticket = realUser;
}
}
try {
if (socket != null || shouldCloseConnection.get()) {
setFallBackToSimpleAuth(fallbackToSimpleAuth);
return;
}
UserGroupInformation ticket = remoteId.getTicket();
if (ticket != null) {
final UserGroupInformation realUser = ticket.getRealUser();
if (realUser != null) {
ticket = realUser;
}
}
connectingThread.set(Thread.currentThread());
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
Expand Down Expand Up @@ -863,20 +864,8 @@ public AuthMethod run()
remoteId.saslQop =
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
if (fallbackToSimpleAuth != null) {
fallbackToSimpleAuth.set(false);
}
} else if (UserGroupInformation.isSecurityEnabled()) {
if (!fallbackAllowed) {
throw new AccessControlException(
"Server asks us to fall back to SIMPLE " +
"auth, but this client is configured to only allow secure " +
"connections.");
}
if (fallbackToSimpleAuth != null) {
fallbackToSimpleAuth.set(true);
}
}
setFallBackToSimpleAuth(fallbackToSimpleAuth);
}

if (doPing) {
Expand Down Expand Up @@ -909,7 +898,41 @@ public AuthMethod run()
connectingThread.set(null);
}
}


private void setFallBackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth)
throws AccessControlException {
if (authMethod == null || authProtocol != AuthProtocol.SASL) {
if (authProtocol == AuthProtocol.SASL) {
LOG.trace("Auth method is not set, yield from setting auth fallback.");
}
return;
}
if (fallbackToSimpleAuth == null) {
// this should happen only during testing.
LOG.trace("Connection {} will skip to set fallbackToSimpleAuth as it is null.", remoteId);
} else {
if (fallbackToSimpleAuth.get()) {
// we already set the value to true, we do not need to examine again.
return;
}
}
if (authMethod != AuthMethod.SIMPLE) {
if (fallbackToSimpleAuth != null) {
LOG.trace("Disabling fallbackToSimpleAuth, target does not use SIMPLE authentication.");
fallbackToSimpleAuth.set(false);
}
} else if (UserGroupInformation.isSecurityEnabled()) {
if (!fallbackAllowed) {
throw new AccessControlException("Server asks us to fall back to SIMPLE auth, but this "
+ "client is configured to only allow secure connections.");
}
if (fallbackToSimpleAuth != null) {
LOG.trace("Enabling fallbackToSimpleAuth for target, as we are allowed to fall back.");
fallbackToSimpleAuth.set(true);
}
}
}

private void closeConnection() {
if (socket == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -124,18 +125,19 @@ protected static RPC.Server setupTestServer(
return server;
}

protected static TestRpcService getClient(InetSocketAddress serverAddr,
Configuration clientConf)
protected static TestRpcService getClient(InetSocketAddress serverAddr, Configuration clientConf)
throws ServiceException {
try {
return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf);
} catch (IOException e) {
throw new ServiceException(e);
}
return getClient(serverAddr, clientConf, null);
}

protected static TestRpcService getClient(InetSocketAddress serverAddr,
Configuration clientConf, RetryPolicy connectionRetryPolicy) throws ServiceException {
return getClient(serverAddr, clientConf, connectionRetryPolicy, null);
}

protected static TestRpcService getClient(InetSocketAddress serverAddr,
Configuration clientConf, final RetryPolicy connectionRetryPolicy)
Configuration clientConf, final RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws ServiceException {
try {
return RPC.getProtocolProxy(
Expand All @@ -146,7 +148,7 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr,
clientConf,
NetUtils.getDefaultSocketFactory(clientConf),
RPC.getRpcTimeout(clientConf),
connectionRetryPolicy, null).getProxy();
connectionRetryPolicy, fallbackToSimpleAuth).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}
Expand Down
Loading