Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-607 Added interceptor support for MQTT protocol. #617

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.BaseInterceptor;

public interface MQTTInterceptor extends BaseInterceptor<MqttMessage> {
}
Expand Up @@ -53,6 +53,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
private MQTTSession session;

private ActiveMQServer server;
private MQTTProtocolManager protocolManager;

// This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx.
private ChannelHandlerContext ctx;
Expand All @@ -61,8 +62,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {

private boolean stopped = false;

public MQTTProtocolHandler(ActiveMQServer server) {
public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
this.server = server;
this.protocolManager = protocolManager;
}

void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
Expand Down Expand Up @@ -188,6 +190,7 @@ void handleConnack(MqttConnAckMessage message) {
}

void handlePublish(MqttPublishMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain());
}

Expand Down Expand Up @@ -281,6 +284,7 @@ protected int send(int messageId, String topicName, int qosLevel, ByteBuf payloa
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
this.protocolManager.invokeOutgoing(publish, connection);

ctx.write(publish);
ctx.flush();
Expand Down
Expand Up @@ -16,35 +16,42 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;

import java.util.List;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;

import java.util.ArrayList;
import java.util.List;

/**
* MQTTProtocolManager
*/
class MQTTProtocolManager implements ProtocolManager, NotificationListener {
class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
implements NotificationListener {

private ActiveMQServer server;

private MQTTLogger log = MQTTLogger.LOGGER;
private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();

MQTTProtocolManager(ActiveMQServer server) {
MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
}

@Override
Expand All @@ -58,8 +65,12 @@ public ProtocolManagerFactory getFactory() {
}

@Override
public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors) {
// TODO handle interceptors
public void updateInterceptors(List incoming, List outgoing) {
this.incomingInterceptors.clear();
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));

this.outgoingInterceptors.clear();
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}

@Override
Expand Down Expand Up @@ -100,7 +111,7 @@ public void addChannelHandlers(ChannelPipeline pipeline) {
pipeline.addLast(new MqttEncoder());
pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));

pipeline.addLast(new MQTTProtocolHandler(server));
pipeline.addLast(new MQTTProtocolHandler(server, this));
}

@Override
Expand All @@ -126,4 +137,12 @@ public MessageConverter getConverter() {
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}

public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
}

public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}
}
Expand Up @@ -22,12 +22,13 @@

import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;

@Component(service = ProtocolManagerFactory.class)
public class MQTTProtocolManagerFactory implements ProtocolManagerFactory<BaseInterceptor> {
public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<MQTTInterceptor> {

public static final String MQTT_PROTOCOL_NAME = "MQTT";

Expand All @@ -40,13 +41,12 @@ public ProtocolManager createProtocolManager(ActiveMQServer server,
final Map<String, Object> parameters,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
return new MQTTProtocolManager(server);
return new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors);
}

@Override
public List filterInterceptors(List list) {
// TODO Add support for interceptors.
return null;
public List<MQTTInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
return internalFilterInterceptors(MQTTInterceptor.class, interceptors);
}

@Override
Expand Down
Expand Up @@ -37,9 +37,9 @@
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
Expand All @@ -52,7 +52,7 @@
/**
* StompProtocolManager
*/
class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
// Constants -----------------------------------------------------

// Attributes ----------------------------------------------------
Expand Down Expand Up @@ -410,21 +410,4 @@ public boolean destinationExists(String destination) {
public ActiveMQServer getServer() {
return server;
}

private void invokeInterceptors(List<StompFrameInterceptor> interceptors,
final StompFrame frame,
final StompConnection connection) {
if (interceptors != null && !interceptors.isEmpty()) {
for (StompFrameInterceptor interceptor : interceptors) {
try {
if (!interceptor.intercept(frame, connection)) {
break;
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.error(e);
}
}
}
}
}
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.activemq.artemis.spi.core.protocol;

import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;

import java.util.List;

public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C extends RemotingConnection>
implements ProtocolManager<I> {

protected void invokeInterceptors(final List<I> interceptors,
final P message,
final C connection) {
if (interceptors != null && !interceptors.isEmpty()) {
for (I interceptor : interceptors) {
try {
if (!interceptor.intercept(message, connection)) {
break;
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.error(e);
}
}
}
}
}
2 changes: 1 addition & 1 deletion docs/hacking-guide/en/maintainers.md
Expand Up @@ -11,7 +11,7 @@ What does it mean to be reasonably confident? If the developer has run the same
builds are running they can be reasonably confident. Currently the [PR build](https://builds.apache.org/job/ActiveMQ-Artemis-PR-Build/)
runs this command:

mvn compile test-compile javadoc:javadoc -Pfast-tests -Pextra-tests test
mvn -Pfast-tests -Pextra-tests install

However, if the changes are significant, touches a wide area of code, or even if the developer just wants a second
opinion they are encouraged to engage other members of the community to obtain an additional review prior to pushing.
Expand Down
21 changes: 17 additions & 4 deletions docs/user-manual/en/intercepting-operations.md
Expand Up @@ -9,7 +9,9 @@ makes interceptors powerful, but also potentially dangerous.

## Implementing The Interceptors

An interceptor must implement the `Interceptor interface`:
All interceptors are protocol specific.

An interceptor for the core protocol must implement the interface `Interceptor`:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny thing. I'd myself change that to

implement the `Interceptor` interface

sounds more like standard Java-speak to me that way.


``` java
package org.apache.artemis.activemq.api.core.interceptor;
Expand All @@ -20,14 +22,25 @@ public interface Interceptor
}
```

For stomp protocol an interceptor must implement the `StompFrameInterceptor class`:
For stomp protocol an interceptor must implement the interface `StompFrameInterceptor`:

``` java
package org.apache.activemq.artemis.core.protocol.stomp;

public interface StompFrameInterceptor
public interface StompFrameInterceptor extends BaseInterceptor<StompFrame>
{
boolean intercept(StompFrame stompFrame, RemotingConnection connection);
}
```

Likewise for MQTT protocol, an interceptor must implement the interface `MQTTInterceptor`:

``` java
package org.apache.activemq.artemis.core.protocol.mqtt;

public interface MQTTInterceptor extends BaseInterceptor<MqttMessage>
{
public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
boolean intercept(MqttMessage mqttMessage, RemotingConnection connection);
}
```

Expand Down
Expand Up @@ -246,7 +246,9 @@ public void testSendAndReceiveAtLeastOnce() throws Exception {
}

@Test(timeout = 60 * 1000)
public void testSendAndReceiveExactlyOnce() throws Exception {
public void testSendAndReceiveExactlyOnceWithInterceptors() throws Exception {
MQTTIncomingInterceptor.clear();
MQTTOutoingInterceptor.clear();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);

Expand All @@ -263,6 +265,8 @@ public void testSendAndReceiveExactlyOnce() throws Exception {
}
subscriber.disconnect();
publisher.disconnect();
assertEquals(NUM_MESSAGES, MQTTIncomingInterceptor.getMessageCount());
assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
}

@Test(timeout = 60 * 1000)
Expand Down Expand Up @@ -380,7 +384,8 @@ public void testMQTTPathPatterns() throws Exception {
Message msg = connection.receive(5, TimeUnit.SECONDS);
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
String msgPayload = new String(msg.getPayload());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One worrisome thing. I think there's some interdependence in these tests. I originally added my interceptor checks in a new test method. When I did that, this test started failing. It was receiving the messages created previously. I suspect the server isn't getting cleaned up 100% correctly, or maybe not at all.

assertTrue("RETAINED prefix " + wildcard + " msg " + msgPayload, msgPayload.startsWith(RETAINED));
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
msg.ack();
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
Expand Down