Skip to content

Commit

Permalink
Merge branch 'STORM-3027-1.x' of https://github.com/revans2/incubator…
Browse files Browse the repository at this point in the history
…-storm into 1.x-branch

STORM-3027: Make impersonation optional
  • Loading branch information
Robert Evans committed Apr 11, 2018
1 parent 2f15048 commit 58f7aef
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public abstract class AbstractSaslServerCallbackHandler implements CallbackHandl
private static final Logger LOG = LoggerFactory.getLogger(AbstractSaslServerCallbackHandler.class);
protected final Map<String,String> credentials = new HashMap<>();
protected String userName;
protected final boolean impersonationAllowed;

protected AbstractSaslServerCallbackHandler(boolean impersonationAllowed) {
this.impersonationAllowed = impersonationAllowed;
}

public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
Expand Down Expand Up @@ -82,6 +87,9 @@ private void handleAuthorizeCallback(AuthorizeCallback ac) {
//When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
//add the authNid as the real user in reqContext's subject which will be used during authorization.
if(!authenticationID.equals(ac.getAuthorizationID())) {
if (!impersonationAllowed) {
throw new IllegalArgumentException("Impersonation is not allowed for this server");
}
LOG.info("Impersonation attempt authenticationID = {} authorizationID = {}",
ac.getAuthenticationID(), ac.getAuthorizationID());
ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void prepare(ThriftConnectionType type, Map storm_conf, Configuration log
public TServer getServer(TProcessor processor) throws IOException, TTransportException {
int port = type.getPort(storm_conf);
Integer socketTimeout = type.getSocketTimeOut(storm_conf);
TTransportFactory serverTransportFactory = getServerTransportFactory();
TTransportFactory serverTransportFactory = getServerTransportFactory(type.isImpersonationAllowed());
TServerSocket serverTransport = null;
if (socketTimeout != null) {
serverTransport = new TServerSocket(port, socketTimeout);
Expand Down Expand Up @@ -96,10 +96,11 @@ public TServer getServer(TProcessor processor) throws IOException, TTransportExc

/**
* All subclass must implement this method
* @param impersonationAllowed true if SASL impersonation should be allowed, else false.
* @return server transport factory
* @throws IOException
*/
protected abstract TTransportFactory getServerTransportFactory() throws IOException;
protected abstract TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,30 @@
*/
public enum ThriftConnectionType {
NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, Config.NIMBUS_QUEUE_SIZE,
Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Config.STORM_THRIFT_SOCKET_TIMEOUT_MS),
Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Config.STORM_THRIFT_SOCKET_TIMEOUT_MS, true),
DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, false),
DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null,
Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null);
Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, false);

private final String _transConf;
private final String _portConf;
private final String _qConf;
private final String _threadsConf;
private final String _buffConf;
private final String _socketTimeoutConf;
private final boolean impersonationAllowed;

ThriftConnectionType(String transConf, String portConf, String qConf,
String threadsConf, String buffConf, String socketTimeoutConf) {
String threadsConf, String buffConf, String socketTimeoutConf,
boolean impersonationAllowed) {
_transConf = transConf;
_portConf = portConf;
_qConf = qConf;
_threadsConf = threadsConf;
_buffConf = buffConf;
_socketTimeoutConf = socketTimeoutConf;
this.impersonationAllowed = impersonationAllowed;
}

public String getTransportPlugin(Map conf) {
Expand Down Expand Up @@ -89,4 +92,12 @@ public Integer getSocketTimeOut(Map conf) {
}
return Utils.getInt(conf.get(_socketTimeoutConf));
}

/**
* Check if SASL impersonation is allowed for this transport type.
* @return true if it is else false.
*/
public boolean isImpersonationAllowed() {
return impersonationAllowed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
public static final String DIGEST = "DIGEST-MD5";
private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);

protected TTransportFactory getServerTransportFactory() throws IOException {
protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
//create an authentication callback handler
CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf);
CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf, impersonationAllowed);

//create a transport factory that will invoke our auth callback for digest
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.security.auth.digest;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.SaslTransportPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;

import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler;
import org.apache.storm.security.auth.AuthUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SASL server side callback handler
Expand All @@ -47,7 +36,8 @@ public class ServerCallbackHandler extends AbstractSaslServerCallbackHandler {
private static final String USER_PREFIX = "user_";
public static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";

public ServerCallbackHandler(Configuration configuration) throws IOException {
public ServerCallbackHandler(Configuration configuration, boolean impersonationAllowed) throws IOException {
super(impersonationAllowed);
if (configuration==null) return;

AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
import javax.security.sasl.Sasl;
Expand All @@ -51,9 +50,10 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
public static final String KERBEROS = "GSSAPI";
private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);

public TTransportFactory getServerTransportFactory() throws IOException {
@Override
public TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
//create an authentication callback handler
CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, storm_conf);
CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, storm_conf, impersonationAllowed);

//login our principal
Subject subject = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.Subject;
import javax.security.auth.callback.*;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
Expand All @@ -39,8 +38,10 @@ public class ServerCallbackHandler implements CallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);

private String userName;
private final boolean impersonationAllowed;

public ServerCallbackHandler(Configuration configuration, Map stormConf) throws IOException {
public ServerCallbackHandler(Configuration configuration, Map stormConf, boolean impersonationAllowed) throws IOException {
this.impersonationAllowed = impersonationAllowed;
if (configuration==null) return;

AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
Expand All @@ -52,7 +53,7 @@ public ServerCallbackHandler(Configuration configuration, Map stormConf) throws

}

public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
public void handle(Callback[] callbacks) {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
handleNameCallback((NameCallback) callback);
Expand Down Expand Up @@ -86,6 +87,10 @@ private void handleAuthorizeCallback(AuthorizeCallback ac) {
//When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
//add the authNid as the real user in reqContext's subject which will be used during authorization.
if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
if (!impersonationAllowed) {
throw new IllegalArgumentException(ac.getAuthenticationID() + " attempting to impersonate " + ac.getAuthorizationID()
+ ". This is not allowed by this server");
}
ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
} else {
ReqContext.context().setRealPrincipal(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
private static final Logger LOG = LoggerFactory.getLogger(PlainSaslTransportPlugin.class);

@Override
protected TTransportFactory getServerTransportFactory() throws IOException {
protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
//create an authentication callback handler
CallbackHandler serverCallbackHandler = new PlainServerCallbackHandler();
CallbackHandler serverCallbackHandler = new PlainServerCallbackHandler(impersonationAllowed);
if (Security.getProvider(SaslPlainServer.SecurityProvider.SASL_PLAIN_SERVER) == null) {
Security.addProvider(new SaslPlainServer.SecurityProvider());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.security.auth.plain;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
package org.apache.storm.security.auth.plain;

import javax.security.auth.callback.PasswordCallback;
import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.SaslTransportPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;

/**
* SASL server side callback handler
*/
public class PlainServerCallbackHandler extends AbstractSaslServerCallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(PlainServerCallbackHandler.class);
public static final String PASSWORD = "password";

public PlainServerCallbackHandler() throws IOException {
public PlainServerCallbackHandler(boolean impersonationAllowed) {
super(impersonationAllowed);
userName=null;
}

Expand Down

0 comments on commit 58f7aef

Please sign in to comment.