Skip to content

Commit

Permalink
ARTEMIS-1897 use core session for STOMP authn
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Jun 4, 2018
1 parent 45ebe4e commit c1b0f1e
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 131 deletions.
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;

import javax.security.auth.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
Expand All @@ -31,6 +32,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand All @@ -53,8 +55,6 @@
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;

import javax.security.auth.Subject;

import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;

public final class StompConnection implements RemotingConnection {
Expand Down Expand Up @@ -269,9 +269,8 @@ public void checkDestination(String destination) throws ActiveMQStompException {
}

public void autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException {
ServerSession session = getSession().getCoreSession();

try {
ServerSession session = getSession().getCoreSession();
SimpleString simpleQueue = SimpleString.toSimpleString(queue);
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
Expand Down Expand Up @@ -437,10 +436,18 @@ public String getLogin() {
return login;
}

public void setLogin(String login) {
this.login = login;
}

public String getPasscode() {
return passcode;
}

public void setPasscode(String passcode) {
this.passcode = passcode;
}

@Override
public void setClientID(String clientID) {
this.clientID = clientID;
Expand Down Expand Up @@ -584,31 +591,24 @@ public void sendFrame(StompFrame frame, StompPostReceiptFunction function) {
manager.sendReply(this, frame, function);
}

public boolean validateUser(final String login, final String pass, final RemotingConnection connection) {
this.valid = manager.validateUser(login, pass, connection);
if (valid) {
this.login = login;
this.passcode = pass;
}
return valid;
}

public CoreMessage createServerMessage() {
return manager.createServerMessage();
}

public StompSession getSession() throws ActiveMQStompException {
public StompSession getSession() throws ActiveMQStompException, ActiveMQSecurityException {
return getSession(null);
}

public StompSession getSession(String txID) throws ActiveMQStompException {
public StompSession getSession(String txID) throws ActiveMQStompException, ActiveMQSecurityException {
StompSession session = null;
try {
if (txID == null) {
session = manager.getSession(this);
} else {
session = manager.getTransactedSession(this, txID);
}
} catch (ActiveMQSecurityException e) {
throw e;
} catch (Exception e) {
throw BUNDLE.errorGetSession(e).setHandler(frameHandler);
}
Expand All @@ -623,15 +623,15 @@ protected void validate() throws ActiveMQStompException {
}

protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException {
StompSession stompSession = getSession(txID);

if (stompSession.isNoLocal()) {
message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
}
if (isEnableMessageID()) {
message.putStringProperty("amqMessageId", "STOMP" + message.getMessageID());
}
try {
StompSession stompSession = getSession(txID);

if (stompSession.isNoLocal()) {
message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
}
if (isEnableMessageID()) {
message.putStringProperty("amqMessageId", "STOMP" + message.getMessageID());
}
if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) {
stompSession.sendInternal(message, false);
} else {
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
Expand All @@ -45,9 +44,6 @@
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager2;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3;
import org.apache.activemq.artemis.utils.UUIDGenerator;

import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
Expand All @@ -65,7 +61,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St

private final Executor executor;

private final Map<String, StompSession> transactedSessions = new HashMap<>();
private final Map<Object, StompSession> transactedSessions = new HashMap<>();

// key => connection ID, value => Stomp session
private final Map<Object, StompSession> sessions = new HashMap<>();
Expand Down Expand Up @@ -212,33 +208,22 @@ public boolean send(final StompConnection connection, final StompFrame frame) {
}
}

// Package protected ---------------------------------------------

// Protected -----------------------------------------------------

// Private -------------------------------------------------------

public StompSession getSession(StompConnection connection) throws Exception {
StompSession stompSession = sessions.get(connection.getID());
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
stompSession.setServerSession(session);
sessions.put(connection.getID(), stompSession);
}
server.getStorageManager().setContext(stompSession.getContext());
return stompSession;
return internalGetSession(connection, sessions, connection.getID(), false);
}

public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception {
StompSession stompSession = transactedSessions.get(txID);
return internalGetSession(connection, transactedSessions, txID, true);
}

private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception {
StompSession stompSession = sessions.get(id);
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
stompSession.setServerSession(session);
transactedSessions.put(txID, stompSession);
sessions.put(id, stompSession);
}
server.getStorageManager().setContext(stompSession.getContext());
return stompSession;
Expand All @@ -263,9 +248,9 @@ public void run() {
}

// removed the transacted session belonging to the connection
Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
Iterator<Entry<Object, StompSession>> iterator = transactedSessions.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, StompSession> entry = iterator.next();
Map.Entry<Object, StompSession> entry = iterator.next();
if (entry.getValue().getConnection() == connection) {
ServerSession serverSession = entry.getValue().getCoreSession();
try {
Expand Down Expand Up @@ -326,24 +311,6 @@ public String getVirtualHostName() {
return "activemq";
}

public boolean validateUser(String login, String passcode, RemotingConnection remotingConnection) {
boolean validated = true;

ActiveMQSecurityManager sm = server.getSecurityManager();

if (sm != null && server.getConfiguration().isSecurityEnabled()) {
if (sm instanceof ActiveMQSecurityManager3) {
validated = ((ActiveMQSecurityManager3) sm).validateUser(login, passcode, remotingConnection) != null;
} else if (sm instanceof ActiveMQSecurityManager2) {
validated = ((ActiveMQSecurityManager2) sm).validateUser(login, passcode, CertificateUtil.getCertsFromConnection(remotingConnection));
} else {
validated = sm.validateUser(login, passcode);
}
}

return validated;
}

public CoreMessage createServerMessage() {
return new CoreMessage(server.getStorageManager().generateID(), 512);
}
Expand Down
Expand Up @@ -133,6 +133,8 @@ private StompFrame handleSubscribe(StompFrame request) {
return null;
} catch (ActiveMQStompException e) {
return e.getFrame();
} catch (Exception e) {
return new ActiveMQStompException(e.getMessage(), e).setHandler(this).getFrame();
}

}
Expand Down Expand Up @@ -256,7 +258,7 @@ public StompFrame onAbort(StompFrame request) {
return response;
}

public StompPostReceiptFunction onSubscribe(StompFrame frame) throws ActiveMQStompException {
public StompPostReceiptFunction onSubscribe(StompFrame frame) throws Exception {
String destination = getDestination(frame);

String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
Expand All @@ -279,19 +281,19 @@ public StompPostReceiptFunction onSubscribe(StompFrame frame) throws ActiveMQSto
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
}

public String getDestination(StompFrame request) throws ActiveMQStompException {
public String getDestination(StompFrame request) throws Exception {
return getDestination(request, Headers.Subscribe.DESTINATION);
}

public String getDestination(StompFrame request, String header) throws ActiveMQStompException {
public String getDestination(StompFrame request, String header) throws Exception {
String destination = request.getHeader(header);
if (destination == null) {
return null;
}
return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
}

public String getPrefix(StompFrame request) throws ActiveMQStompException {
public String getPrefix(StompFrame request) throws Exception {
String destination = request.getHeader(Headers.Send.DESTINATION);
if (destination == null) {
return null;
Expand Down Expand Up @@ -367,7 +369,7 @@ public void onError(ActiveMQStompException e) {
connection.destroy();
}

private RoutingType getRoutingType(String typeHeader, String destination) throws ActiveMQStompException {
private RoutingType getRoutingType(String typeHeader, String destination) throws Exception {
// null is valid to return here so we know when the user didn't provide any routing info
RoutingType routingType;
if (typeHeader != null) {
Expand All @@ -378,4 +380,14 @@ private RoutingType getRoutingType(String typeHeader, String destination) throws
return routingType;
}

protected StompFrame getFailedAuthenticationResponse(String login) {
StompFrame response;
response = createStompFrame(Stomp.Responses.ERROR);
response.setNeedsDisconnect(true);
String responseText = "Security Error occurred: User name [" + login + "] or password is invalid";
response.setBody(responseText);
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
return response;
}

}
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
Expand Down Expand Up @@ -54,32 +55,25 @@ public StompFrame onConnect(StompFrame frame) {

try {
connection.setClientID(clientID);
if (connection.validateUser(login, passcode, connection)) {
connection.setValid(true);
connection.setLogin(login);
connection.setPasscode(passcode);
// Create session which will validate user - this will cache the session in the protocol manager
connection.getSession();
connection.setValid(true);

// Create session after validating user - this will cache the session in the
// protocol manager
connection.getSession();
response = new StompFrameV10(Stomp.Responses.CONNECTED);

response = new StompFrameV10(Stomp.Responses.CONNECTED);

if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString());
}
if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString());
}

response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());

if (requestID != null) {
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
}
} else {
// not valid
response = new StompFrameV10(Stomp.Responses.ERROR);
String responseText = "Security Error occurred: User name [" + login + "] or password is invalid";
response.setBody(responseText);
response.setNeedsDisconnect(true);
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
if (requestID != null) {
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
}
} catch (ActiveMQSecurityException e) {
response = getFailedAuthenticationResponse(login);
} catch (ActiveMQStompException e) {
response = e.getFrame();
}
Expand Down

0 comments on commit c1b0f1e

Please sign in to comment.