Skip to content

Commit

Permalink
ARTEMIS-4540 validate MQTT session state data
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Jan 23, 2024
1 parent 019fc86 commit 44ceeff
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public String toString() {
StringBuilder str = new StringBuilder(TransportConfiguration.class.getSimpleName());
str.append("(name=" + name + ", ");
str.append("factory=" + (factoryClassName == null ? "null" : replaceWildcardChars(factoryClassName)));
str.append(") ");
str.append(")");
str.append(toStringParameters(params, extraProps));
return str.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,13 @@ public interface MQTTLogger {

@LogMessage(id = 834010, value = "Unable to scan MQTT sessions", level = LogMessage.Level.ERROR)
void unableToScanSessions(Exception e);

@LogMessage(id = 834011, value = "Invalid MQTT session state message. Type is incorrect: {}. Will not load this state into memory.", level = LogMessage.Level.WARN)
void sessionStateMessageIncorrectType(String type);

@LogMessage(id = 834012, value = "Invalid MQTT session state message. Client ID is empty or null. Will not load this state into memory.", level = LogMessage.Level.WARN)
void sessionStateMessageBadClientId();

@LogMessage(id = 834013, value = "Invalid MQTT session state message. Will not load this state into memory.", level = LogMessage.Level.WARN)
void errorDeserializingStateMessage(Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,23 @@ private MQTTStateManager(ActiveMQServer server) throws Exception {
// load session data from queue
try (LinkedListIterator<MessageReference> iterator = sessionStore.browserIterator()) {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage());
Message message = iterator.next().getMessage();
if (!(message instanceof CoreMessage)) {
MQTTLogger.LOGGER.sessionStateMessageIncorrectType(message.getClass().getName().toString());
continue;
}
String clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
if (clientId == null || clientId.length() == 0) {
MQTTLogger.LOGGER.sessionStateMessageBadClientId();
continue;
}
MQTTSessionState sessionState;
try {
sessionState = new MQTTSessionState((CoreMessage) message);
} catch (Exception e) {
MQTTLogger.LOGGER.errorDeserializingStateMessage(e);
continue;
}
sessionStates.put(clientId, sessionState);
}
} catch (NoSuchElementException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public Acceptor createAcceptor(TransportConfiguration info) {
managementService.registerAcceptor(acceptor, info);
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(info.getFactoryClassName(), e);
ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(info.getName(), e);
}

return acceptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ void slowConsumerDetected(String sessionID,
@LogMessage(id = 222079, value = "The following keys are invalid for configuring the acceptor: {} the acceptor will not be started.", level = LogMessage.Level.WARN)
void invalidAcceptorKeys(String s);

@LogMessage(id = 222080, value = "Error instantiating remoting acceptor {}", level = LogMessage.Level.WARN)
void errorCreatingAcceptor(String factoryClassName, Exception e);
@LogMessage(id = 222080, value = "Error creating acceptor: {}", level = LogMessage.Level.WARN)
void errorCreatingAcceptor(String name, Exception e);

@LogMessage(id = 222081, value = "Timed out waiting for remoting thread pool to terminate", level = LogMessage.Level.WARN)
void timeoutRemotingThreadPool();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt5;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTStateManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Test;

public class MQTTStateManagerTest extends MQTT5TestSupport {

@Test(timeout = DEFAULT_TIMEOUT)
public void testNullClientID() throws Exception {
testBadStateMessage(null);
}

@Test(timeout = DEFAULT_TIMEOUT)
public void testEmptyClientID() throws Exception {
testBadStateMessage("");
}

@Test(timeout = DEFAULT_TIMEOUT)
public void testEmptyStateMessage() throws Exception {
testBadStateMessage(RandomUtil.randomString());
}

private void testBadStateMessage(String clientId) throws Exception {
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocator("vm://0"));
ClientSessionFactory csf = locator.createSessionFactory();
ClientSession s = addClientSession(csf.createSession());
ClientProducer p = addClientProducer(s.createProducer(MQTTUtil.MQTT_SESSION_STORE));
ClientMessage m = s.createMessage(true);
if (clientId != null) {
m.putStringProperty(Message.HDR_LAST_VALUE_NAME, clientId);
}
p.send(m);
s.close();
server.stop();
server.start();
assertTrue(server.waitForActivation(3, TimeUnit.SECONDS));
assertNotNull(MQTTStateManager.getInstance(server));
}

@Test
public void testWrongStateMessageType() throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
Connection c = factory.createConnection();
Session s = c.createSession();
MessageProducer p = s.createProducer(s.createQueue(MQTTUtil.MQTT_SESSION_STORE));
javax.jms.Message m = s.createMessage();
m.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), RandomUtil.randomString());
p.send(m);
c.close();
server.stop();
server.start();
assertTrue(server.waitForActivation(3, TimeUnit.SECONDS));
assertNotNull(MQTTStateManager.getInstance(server));
}
}

0 comments on commit 44ceeff

Please sign in to comment.