diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java index 867d50b43b..11e1f7aa8e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java @@ -200,6 +200,8 @@ public class Constants { public static final String GRPC = "GRPC"; + public static final String MQTT = "MQTT"; + public static final String OS_NAME_KEY = "os.name"; public static final String OS_WIN_PREFIX = "win"; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java index 8617888994..c9ed13ca81 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java @@ -19,6 +19,7 @@ import static org.apache.eventmesh.common.Constants.GRPC; import static org.apache.eventmesh.common.Constants.HTTP; +import static org.apache.eventmesh.common.Constants.MQTT; import static org.apache.eventmesh.common.Constants.TCP; import org.apache.eventmesh.common.config.CommonConfiguration; @@ -36,7 +37,7 @@ public class ConfigurationContextUtil { private static final ConcurrentHashMap CONFIGURATION_MAP = new ConcurrentHashMap<>(); - public static final List KEYS = Lists.newArrayList(HTTP, TCP, GRPC); + public static final List KEYS = Lists.newArrayList(HTTP, TCP, GRPC, MQTT); /** * Save http, tcp, grpc configuration at startup for global use. diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index cabe3f9bc5..2815c8124c 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -17,12 +17,13 @@ ########################## EventMesh Runtime Environment ########################## eventMesh.server.idc=DEFAULT eventMesh.server.env=PRD -eventMesh.server.provide.protocols=HTTP,TCP,GRPC +eventMesh.server.provide.protocols=HTTP,TCP,GRPC,MQTT eventMesh.server.cluster=COMMON eventMesh.server.name=EVENTMESH-runtime eventMesh.sysid=0000 eventMesh.server.http.port=10105 eventMesh.server.grpc.port=10205 +eventMesh.server.mqtt.port=10305 ########################## EventMesh TCP Configuration ########################## eventMesh.server.tcp.enabled=true eventMesh.server.tcp.port=10000 @@ -30,6 +31,10 @@ eventMesh.server.tcp.readerIdleSeconds=120 eventMesh.server.tcp.writerIdleSeconds=120 eventMesh.server.tcp.allIdleSeconds=120 eventMesh.server.tcp.clientMaxNum=10000 +########################## EventMesh Mtqq Configuration ########################## +eventMesh.server.mqtt.password=false + + # client isolation time if the message send failure eventMesh.server.tcp.pushFailIsolateTimeInMills=30000 # rebalance internal @@ -144,3 +149,4 @@ eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook eventMesh.webHook.nacosMode.serverAddr=127.0.0.1:8848 # Webhook CloudEvent sending mode. This property is the same as the eventMesh.storage.plugin.type configuration. eventMesh.webHook.producer.storage=standalone + diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java index 94061e5027..2c41436a27 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java @@ -236,4 +236,7 @@ private AclProperties buildTcpAclProperties(String remoteAddr, String token, Str return aclProperties; } + public boolean checkValid(String username, String password) { + return aclService.doAclCheckUserNameAndPassword(username, password); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java new file mode 100644 index 0000000000..cf603a1424 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + + +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.runtime.acl.Acl; +import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientConnectProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientDisConnectProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.HealthCheckProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.MqttProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.PublishProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.SubscrubeProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.UnSubscrubeProcessor; +import org.apache.eventmesh.runtime.meta.MetaStorage; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageFactory; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EventMeshMQTTServer extends AbstractRemotingServer { + + private final EventMeshMQTTConfiguration eventMeshMQTTConfiguration; + + private final EventMeshServer eventMeshServer; + + private final MetaStorage metaStorage; + + private final Acl acl; + + + protected final Map processorTable = + new ConcurrentHashMap<>(64); + + private final AtomicBoolean started = new AtomicBoolean(false); + + + public EventMeshMQTTServer(final EventMeshServer eventMeshServer, final EventMeshMQTTConfiguration eventMeshMQTTConfiguration) { + this.eventMeshServer = eventMeshServer; + this.eventMeshMQTTConfiguration = eventMeshMQTTConfiguration; + this.metaStorage = eventMeshServer.getMetaStorage(); + this.acl = eventMeshServer.getAcl(); + } + + @Override + public void init() throws Exception { + log.info("==================EventMeshMQTTServer Initialing=================="); + super.init("eventMesh-mqtt"); + registerMQTTProcessor(); + + } + + private void registerMQTTProcessor() { + processorTable.putIfAbsent(MqttMessageType.CONNECT, new ClientConnectProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.DISCONNECT, new ClientDisConnectProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.PINGREQ, new HealthCheckProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.SUBSCRIBE, new SubscrubeProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.UNSUBSCRIBE, new UnSubscrubeProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.PUBLISH, new PublishProcessor(this, getWorkerGroup())); + } + + + @Override + public void start() throws Exception { + Thread thread = new Thread(() -> { + final ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(this.getBossGroup(), this.getIoGroup()) + .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class); + bootstrap.option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_RCVBUF, 10485760); + + bootstrap.childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.childHandler(new MQTTServerInitializer()); + + try { + int port = eventMeshMQTTConfiguration.getEventMeshTcpServerPort(); + ChannelFuture f = bootstrap.bind(port).sync(); + log.info("EventMeshMQTTServer[port={}] started.....", port); + f.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("EventMeshMQTTServer RemotingServer Start Err!", e); + try { + shutdown(); + } catch (Exception ex) { + log.error("EventMeshMQTTServer RemotingServer shutdown Err!", ex); + } + System.exit(-1); + } + }, "eventMesh-mqtt-server"); + thread.start(); + + started.compareAndSet(false, true); + + } + + @Override + public CommonConfiguration getConfiguration() { + return eventMeshMQTTConfiguration; + } + + private class MQTTServerInitializer extends ChannelInitializer { + + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline channelPipeline = ch.pipeline(); + channelPipeline.addLast(getWorkerGroup(), MqttEncoder.INSTANCE); + channelPipeline.addLast(getWorkerGroup(), new MqttDecoder()); + channelPipeline.addLast(getWorkerGroup(), new EventMeshMqttChannelInboundHandler()); + } + } + + @Sharable + private class EventMeshMqttChannelInboundHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof MqttMessage) { + MqttMessage mqttMessage = (MqttMessage) msg; + if (mqttMessage.decoderResult().isFailure()) { + Throwable cause = mqttMessage.decoderResult().cause(); + if (cause instanceof MqttUnacceptableProtocolVersionException) { + ctx.writeAndFlush(MqttMessageFactory.newMessage( + new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), + new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), + null)); + } else if (cause instanceof MqttIdentifierRejectedException) { + ctx.writeAndFlush(MqttMessageFactory.newMessage( + new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), + new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), + null)); + } + ctx.close(); + return; + } + MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); + MqttMessageType mqttMessageType = mqttFixedHeader.messageType(); + MqttProcessor mqttProcessor = processorTable.get(mqttMessageType); + if (!Objects.isNull(mqttProcessor)) { + Executor executor = mqttProcessor.executor(); + if (Objects.isNull(executor)) { + mqttProcessor.process(ctx, mqttMessage); + } else { + executor.execute(() -> { + try { + mqttProcessor.process(ctx, mqttMessage); + } catch (MqttException e) { + log.error("[mqtt Processor error]", e); + ctx.close(); + } + }); + } + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (cause instanceof IOException) { + ctx.close(); + } else { + super.exceptionCaught(ctx, cause); + } + } + } + + public Acl getAcl() { + return acl; + } +} \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java new file mode 100644 index 0000000000..28a2169c76 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + +import static org.apache.eventmesh.common.Constants.MQTT; + +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.utils.ConfigurationContextUtil; +import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration; + +public class EventMeshMqttBootstrap implements EventMeshBootstrap { + + private EventMeshMQTTServer eventMeshMQTTServer; + + private EventMeshServer eventMeshServer; + + private EventMeshMQTTConfiguration eventMeshMQTTConfiguration; + + public EventMeshMqttBootstrap(EventMeshServer eventMeshServer) { + this.eventMeshServer = eventMeshServer; + ConfigService configService = ConfigService.getInstance(); + this.eventMeshMQTTConfiguration = configService.buildConfigInstance(EventMeshMQTTConfiguration.class); + ConfigurationContextUtil.putIfAbsent(MQTT, eventMeshMQTTConfiguration); + } + + @Override + public void init() throws Exception { + if (eventMeshMQTTConfiguration != null) { + eventMeshMQTTServer = new EventMeshMQTTServer(eventMeshServer, eventMeshMQTTConfiguration); + eventMeshMQTTServer.init(); + } + + } + + @Override + public void start() throws Exception { + if (eventMeshMQTTServer != null) { + eventMeshMQTTServer.start(); + } + } + + @Override + public void shutdown() throws Exception { + // server shutdown + if (eventMeshMQTTServer != null) { + eventMeshMQTTServer.shutdown(); + } + } + + public EventMeshMQTTServer getEventMeshMQTTServer() { + return eventMeshMQTTServer; + } + + public EventMeshServer getEventMeshServer() { + return eventMeshServer; + } + + public EventMeshMQTTConfiguration getEventMeshMQTTConfiguration() { + return eventMeshMQTTConfiguration; + } + +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java index 26e727406b..3cb5b29eb7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java @@ -19,6 +19,7 @@ import static org.apache.eventmesh.common.Constants.GRPC; import static org.apache.eventmesh.common.Constants.HTTP; +import static org.apache.eventmesh.common.Constants.MQTT; import static org.apache.eventmesh.common.Constants.TCP; import org.apache.eventmesh.common.config.CommonConfiguration; @@ -72,6 +73,8 @@ public class EventMeshServer { private EventMeshHTTPServer eventMeshHTTPServer = null; + private EventMeshMQTTServer eventMeshMQTTServer = null; + public EventMeshServer() { // Initialize configuration @@ -97,6 +100,9 @@ public EventMeshServer() { case GRPC: BOOTSTRAP_LIST.add(new EventMeshGrpcBootstrap(this)); break; + case MQTT: + BOOTSTRAP_LIST.add(new EventMeshMqttBootstrap(this)); + break; default: // nothing to do } @@ -132,6 +138,9 @@ public void init() throws Exception { if (eventMeshBootstrap instanceof EventMeshGrpcBootstrap) { eventMeshGrpcServer = ((EventMeshGrpcBootstrap) eventMeshBootstrap).getEventMeshGrpcServer(); } + if (eventMeshBootstrap instanceof EventMeshMQTTServer) { + eventMeshMQTTServer = ((EventMeshMqttBootstrap) eventMeshBootstrap).getEventMeshMQTTServer(); + } } if (Objects.nonNull(eventMeshTCPServer) && Objects.nonNull(eventMeshHTTPServer) && Objects.nonNull(eventMeshGrpcServer)) { @@ -240,4 +249,8 @@ public EventMeshGrpcServer getEventMeshGrpcServer() { public EventMeshHTTPServer getEventMeshHTTPServer() { return eventMeshHTTPServer; } + + public EventMeshMQTTServer getEventMeshMQTTServer() { + return eventMeshMQTTServer; + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshMQTTConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshMQTTConfiguration.java new file mode 100644 index 0000000000..6540b26433 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshMQTTConfiguration.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.configuration; + +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.common.config.Config; +import org.apache.eventmesh.common.config.ConfigFiled; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +@Data +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +@Config(prefix = "eventMesh.server") +public class EventMeshMQTTConfiguration extends CommonConfiguration { + + @ConfigFiled(field = "mqtt.port", notNull = true, beNumber = true) + private int mqttServerPort = 10305; + + @ConfigFiled(field = "mqtt.password") + private boolean mqttPasswordMust = Boolean.FALSE; + + public int getEventMeshTcpServerPort() { + return mqttServerPort; + } + + public boolean isMqttPasswordMust() { + return mqttPasswordMust; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/client/ClientManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/client/ClientManager.java new file mode 100644 index 0000000000..bf27c18b4c --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/client/ClientManager.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.client; + +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; +import org.apache.eventmesh.runtime.util.RemotingHelper; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessage; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ClientManager { + + private final Map clientTable = new ConcurrentHashMap<>(); + + private final DelayQueue delayQueue = new DelayQueue<>(); + + private Thread cleanThread; + + private ReentrantLock lock = new ReentrantLock(); + + AtomicBoolean started = new AtomicBoolean(false); + + private ClientManager() { + this.cleanThread = new Thread(() -> { + while (true && !Thread.currentThread().isInterrupted()) { + try { + ClientInfo clientInfo = delayQueue.take(); + if (!Objects.isNull(clientInfo)) { + clientTable.remove(clientInfo.getRemoteAddress()); + log.info("cleanup mqtt client {}", clientInfo.getRemoteAddress()); + } + } catch (InterruptedException e) { + log.error("cleanup timeout client InterruptedException", e); + } + } + }); + cleanThread.setName("Mtqq-Client-Manager-Thread"); + cleanThread.setDaemon(true); + } + + public Set search(String topic) { + Set clientInfoSet = new HashSet<>(); + clientTable.forEach((addr, clientInfo) -> { + Collection subscriptionItems = clientInfo.getSubscriptionItems(); + for (TopicAndQos subscriptionItem : subscriptionItems) { + String subTopic = subscriptionItem.getTopic(); + if (topic.equals(subTopic)) { + clientInfoSet.add(clientInfo); + break; + } + if (topic.split("/").length >= subTopic.split("/").length) { + List splitTopics = Arrays.stream(topic.split("/")).collect(Collectors.toList()); + List spliteTopicFilters = Arrays.stream(subTopic.split("/")).collect(Collectors.toList()); + String newTopicFilter = ""; + for (int i = 0; i < spliteTopicFilters.size(); i++) { + String value = spliteTopicFilters.get(i); + if (value.equals("+")) { + newTopicFilter = newTopicFilter + "+/"; + } else if (value.equals("#")) { + newTopicFilter = newTopicFilter + "#/"; + break; + } else { + newTopicFilter = newTopicFilter + splitTopics.get(i) + "/"; + } + } + if (newTopicFilter.endsWith("/")) { + newTopicFilter = newTopicFilter.substring(0, newTopicFilter.length() - 1); + } + + if (subTopic.equals(newTopicFilter)) { + clientInfoSet.add(clientInfo); + } + + } + + } + }); + + return clientInfoSet; + } + + public static ClientManager getInstance() { + return ClientManagerHolder.clientManagerProvider; + } + + public ClientInfo getOrRegisterClient(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + ClientInfo temp = ClientInfo.build(ctx, mqttMessage); + String remoteAddress = temp.getRemoteAddress(); + ClientInfo clientInfo = clientTable.computeIfAbsent(remoteAddress, (addressInner) -> ClientInfo.build(ctx, mqttMessage)); + clientInfo.refresh(); + lock.lock(); + try { + delayQueue.remove(clientInfo); + delayQueue.add(clientInfo); + } finally { + lock.unlock(); + } + if (started.compareAndSet(false, true)) { + cleanThread.start(); + } + + return clientInfo; + } + + + public void unregisterClient(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + ClientInfo temp = ClientInfo.build(ctx, mqttMessage); + String remoteAddress = temp.getRemoteAddress(); + clientTable.remove(remoteAddress); + delayQueue.remove(temp); + + } + + private static class ClientManagerHolder { + + static ClientManager clientManagerProvider = new ClientManager(); + } + + public static class TopicAndQos implements Serializable { + + + private String topic; + + private Integer qos; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopicAndQos that = (TopicAndQos) o; + return Objects.equals(topic, that.topic) && Objects.equals(qos, that.qos); + } + + @Override + public int hashCode() { + return Objects.hash(topic, qos); + } + + public TopicAndQos(String topic, Integer qos) { + this.topic = topic; + this.qos = qos; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getQos() { + return qos; + } + + public void setQos(Integer qos) { + this.qos = qos; + } + } + + + public static class ClientInfo implements Delayed { + + Channel channel; + + String remoteAddress; + + private Date lastUpTime = new Date(); + + Collection subscriptionItems = Collections.synchronizedCollection(new HashSet<>()); + + int keepLiveTime; + + private long activeTime; + + public void setLastUpTime(Date lastUpTime) { + this.lastUpTime = lastUpTime; + } + + public void subscribe(String topic, int qos) { + subscriptionItems.add(new TopicAndQos(topic, qos)); + } + + public void subscribes(Collection topics) { + subscriptionItems.addAll(topics); + } + + public void unsubscribes(Collection topics) { + subscriptionItems.removeAll(topics); + } + + public Collection getSubscriptionItems() { + return Collections.unmodifiableCollection(subscriptionItems); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientInfo that = (ClientInfo) o; + return Objects.equals(channel, that.channel) && Objects.equals(remoteAddress, that.remoteAddress); + } + + @Override + public int hashCode() { + return Objects.hash(channel, remoteAddress); + } + + public String getRemoteAddress() { + return remoteAddress; + } + + public void setRemoteAddress(String remoteAddress) { + this.remoteAddress = remoteAddress; + } + + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } + + public int getKeepLiveTime() { + return keepLiveTime; + } + + public void setKeepLiveTime(int keepLiveTime) { + this.keepLiveTime = keepLiveTime; + } + + public void setActiveTime(long activeTime) { + this.activeTime = activeTime; + } + + static ClientInfo build(ChannelHandlerContext ctx, MqttMessage mqttMessage) { + ClientInfo clientInfo = new ClientInfo(); + clientInfo.setChannel(ctx.channel()); + Object header = mqttMessage.variableHeader(); + if (header instanceof MqttConnectVariableHeader) { + int keepliveTime = ((MqttConnectVariableHeader) header).keepAliveTimeSeconds(); + clientInfo.setKeepLiveTime(keepliveTime); + clientInfo.setActiveTime(System.currentTimeMillis() + (1000 * keepliveTime * 2)); + } + clientInfo.setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + return clientInfo; + } + + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(activeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } + + public void refresh() { + setLastUpTime(new Date()); + setActiveTime(System.currentTimeMillis() + (1000 * getKeepLiveTime() * 2)); + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/exception/MqttException.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/exception/MqttException.java new file mode 100644 index 0000000000..8b87ad8013 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/exception/MqttException.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.exception; + +import org.apache.commons.lang3.StringUtils; + +public class MqttException extends Exception { + + /** + * serialVersionUID. + */ + private static final long serialVersionUID = -3813902031489276776L; + + private int errCode; + + private String errMsg; + + private Throwable causeThrowable; + + public MqttException() { + } + + public MqttException(final int errCode, final String errMsg) { + super(errMsg); + this.errCode = errCode; + this.errMsg = errMsg; + } + + public MqttException(final int errCode, final Throwable throwable) { + super(throwable); + this.errCode = errCode; + this.setCauseThrowable(throwable); + } + + public MqttException(final int errCode, final String errMsg, final Throwable throwable) { + super(errMsg, throwable); + this.errCode = errCode; + this.errMsg = errMsg; + this.setCauseThrowable(throwable); + } + + public int getErrCode() { + return this.errCode; + } + + public String getErrMsg() { + if (!StringUtils.isBlank(this.errMsg)) { + return this.errMsg; + } + if (this.causeThrowable != null) { + return this.causeThrowable.getMessage(); + } + return null; + } + + public void setErrCode(final int errCode) { + this.errCode = errCode; + } + + public void setErrMsg(final String errMsg) { + this.errMsg = errMsg; + } + + public void setCauseThrowable(final Throwable throwable) { + this.causeThrowable = this.getCauseThrowable(throwable); + } + + private Throwable getCauseThrowable(final Throwable t) { + if (t.getCause() == null) { + return t; + } + return this.getCauseThrowable(t.getCause()); + } + + @Override + public String toString() { + return "ErrCode:" + getErrCode() + ", ErrMsg:" + getErrMsg(); + } + +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/AbstractMqttProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/AbstractMqttProcessor.java new file mode 100644 index 0000000000..811cf224cd --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/AbstractMqttProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + +import org.apache.eventmesh.runtime.acl.Acl; +import org.apache.eventmesh.runtime.boot.EventMeshMQTTServer; +import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration; +import org.apache.eventmesh.runtime.core.protocol.mqtt.client.ClientManager; + +import java.util.concurrent.Executor; + +public abstract class AbstractMqttProcessor implements MqttProcessor { + + protected final EventMeshMQTTServer eventMeshMQTTServer; + + private Executor executor; + + protected EventMeshMQTTConfiguration configuration; + + protected Acl acl; + + ClientManager clientManager = ClientManager.getInstance(); + + public AbstractMqttProcessor(EventMeshMQTTServer eventMeshMQTTServer) { + this.eventMeshMQTTServer = eventMeshMQTTServer; + this.configuration = (EventMeshMQTTConfiguration) eventMeshMQTTServer.getConfiguration(); + this.acl = eventMeshMQTTServer.getAcl(); + } + + public AbstractMqttProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor executor) { + this(eventMeshMQTTServer); + this.executor = executor; + } + + public EventMeshMQTTServer getEventMeshMQTTServer() { + return eventMeshMQTTServer; + } + + @Override + public Executor executor() { + return this.executor; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/ClientConnectProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/ClientConnectProcessor.java new file mode 100644 index 0000000000..9ec7adcccf --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/ClientConnectProcessor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + +import org.apache.eventmesh.runtime.boot.EventMeshMQTTServer; +import org.apache.eventmesh.runtime.core.protocol.mqtt.client.ClientManager.ClientInfo; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; + +import java.util.concurrent.Executor; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageFactory; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.CharsetUtil; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ClientConnectProcessor extends AbstractMqttProcessor { + + private boolean passwordMust = false; + + public ClientConnectProcessor(EventMeshMQTTServer eventMeshMQTTServer) { + super(eventMeshMQTTServer); + } + + public ClientConnectProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor executor) { + super(eventMeshMQTTServer, executor); + this.passwordMust = configuration.isMqttPasswordMust(); + } + + + @Override + public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage; + if (passwordMust) { + String username = mqttConnectMessage.payload().userName(); + String password = mqttConnectMessage.payload().passwordInBytes() == null ? null + : new String(mqttConnectMessage.payload().passwordInBytes(), CharsetUtil.UTF_8); + if (!acl.checkValid(username, password)) { + MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( + new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), + new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null); + ctx.writeAndFlush(connAckMessage); + ctx.close(); + return; + } + } + ClientInfo clientInfo = clientManager.getOrRegisterClient(ctx, mqttMessage); + if (clientInfo.getKeepLiveTime() > 0) { + ctx.pipeline().addFirst("idle", new IdleStateHandler(0, 0, clientInfo.getKeepLiveTime() * 2)); + } + MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); + MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader(); + MqttConnAckVariableHeader + mqttConnAckVariableHeaderBack = + new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession()); + MqttFixedHeader mqttFixedHeaderBack = + new MqttFixedHeader(MqttMessageType.CONNACK, mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); + MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack); + log.info("client connected {}", connAck); + ctx.writeAndFlush(connAck); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/ClientDisConnectProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/ClientDisConnectProcessor.java new file mode 100644 index 0000000000..fdd018d600 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/ClientDisConnectProcessor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + +import org.apache.eventmesh.runtime.boot.EventMeshMQTTServer; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; + +import java.util.concurrent.Executor; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttMessage; + +public class ClientDisConnectProcessor extends AbstractMqttProcessor { + + public ClientDisConnectProcessor(EventMeshMQTTServer eventMeshMQTTServer) { + super(eventMeshMQTTServer); + } + + public ClientDisConnectProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor executor) { + super(eventMeshMQTTServer, executor); + } + + + @Override + public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + clientManager.unregisterClient(ctx, mqttMessage); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/HealthCheckProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/HealthCheckProcessor.java new file mode 100644 index 0000000000..347911ba5c --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/HealthCheckProcessor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + +import org.apache.eventmesh.runtime.boot.EventMeshMQTTServer; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; + +import java.util.concurrent.Executor; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HealthCheckProcessor extends AbstractMqttProcessor { + + public HealthCheckProcessor(EventMeshMQTTServer eventMeshMQTTServer) { + super(eventMeshMQTTServer); + } + + public HealthCheckProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor executor) { + super(eventMeshMQTTServer, executor); + } + + + @Override + public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + clientManager.getOrRegisterClient(ctx, mqttMessage); + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessage mqttMessageBack = new MqttMessage(fixedHeader); + log.info("health check send back {}", mqttMessageBack); + ctx.writeAndFlush(mqttMessageBack); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/MqttProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/MqttProcessor.java new file mode 100644 index 0000000000..9bff2e98e5 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/MqttProcessor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + + +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; + +import java.util.concurrent.Executor; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttMessage; + +/** + * MqttProcessor + */ +public interface MqttProcessor { + + void process(final ChannelHandlerContext ctx, final MqttMessage mqttMessage) throws MqttException; + + /** + * @return {@link Executor} + */ + default Executor executor() { + return null; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/PublishProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/PublishProcessor.java new file mode 100644 index 0000000000..8178cad8fd --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/PublishProcessor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + +import org.apache.eventmesh.runtime.boot.EventMeshMQTTServer; +import org.apache.eventmesh.runtime.core.protocol.mqtt.client.ClientManager.ClientInfo; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; + +import java.util.Set; +import java.util.concurrent.Executor; + +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageFactory; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class PublishProcessor extends AbstractMqttProcessor { + + public PublishProcessor(EventMeshMQTTServer eventMeshMQTTServer) { + super(eventMeshMQTTServer); + } + + public PublishProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor executor) { + super(eventMeshMQTTServer, executor); + } + + + @Override + public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + clientManager.getOrRegisterClient(ctx, mqttMessage); + MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; + MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader(); + MqttPublishVariableHeader mqttPublishVariableHeader = mqttPublishMessage.variableHeader(); + String topic = mqttPublishVariableHeader.topicName(); + MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel(); + byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()]; + mqttPublishMessage.payload().readBytes(headBytes); + switch (qos) { + case AT_MOST_ONCE: + this.sendPublishMessage(topic, qos, headBytes, false, false); + break; + case AT_LEAST_ONCE: + //to do + break; + case EXACTLY_ONCE: + //to do + break; + default: + break; + + } + } + + private void sendPublishMessage(String topic, MqttQoS respQoS, byte[] messageBytes, boolean retain, boolean dup) { + MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( + new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), + new MqttPublishVariableHeader(topic, 0), Unpooled.buffer().writeBytes(messageBytes)); + Set clientInfoSet = clientManager.search(topic); + for (ClientInfo clientInfo : clientInfoSet) { + Channel channel = clientInfo.getChannel(); + if (channel != null) { + channel.writeAndFlush(publishMessage); + } + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/SubscrubeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/SubscrubeProcessor.java new file mode 100644 index 0000000000..e1a62b41e3 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/SubscrubeProcessor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + +import org.apache.eventmesh.runtime.boot.EventMeshMQTTServer; +import org.apache.eventmesh.runtime.core.protocol.mqtt.client.ClientManager.ClientInfo; +import org.apache.eventmesh.runtime.core.protocol.mqtt.client.ClientManager.TopicAndQos; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; +import io.netty.handler.codec.mqtt.MqttSubAckPayload; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SubscrubeProcessor extends AbstractMqttProcessor { + + public SubscrubeProcessor(EventMeshMQTTServer eventMeshMQTTServer) { + super(eventMeshMQTTServer); + } + + public SubscrubeProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor executor) { + super(eventMeshMQTTServer, executor); + } + + + @Override + public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + ClientInfo clientInfo = clientManager.getOrRegisterClient(ctx, mqttMessage); + MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage; + MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); + List mqttTopicSubscriptions = mqttSubscribeMessage.payload().topicSubscriptions(); + if (validTopicFilter(mqttTopicSubscriptions)) { + Set + topics = + mqttTopicSubscriptions.stream().map( + mqttTopicSubscription -> new TopicAndQos(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService().value())) + .collect( + Collectors.toSet()); + log.info("client subscribe {}", topics.toString()); + List grantedQoSLevels = new ArrayList<>(topics.size()); + clientInfo.subscribes(topics); + for (int i = 0; i < topics.size(); i++) { + grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value()); + } + MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels); + MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2 + topics.size()); + MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); + MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack, variableHeaderBack, payloadBack); + log.info("subscribe send back {}", subAck); + ctx.writeAndFlush(subAck); + } else { + log.info("topic valid faild {}", mqttTopicSubscriptions); + ctx.close(); + } + + + } + + private boolean validTopicFilter(List topicSubscriptions) { + for (MqttTopicSubscription topicSubscription : topicSubscriptions) { + String topicName = topicSubscription.topicName(); + if (topicName.startsWith("+") || topicName.endsWith("/")) { + return false; + } + if (topicName.contains("#")) { + if (count(topicName, "#") > 1) { + return false; + } + } + if (topicName.contains("+")) { + if (count(topicName, "+") != count(topicName, "/+")) { + return false; + } + } + } + return true; + } + + private int count(String srcText, String findText) { + int count = 0; + Pattern p = Pattern.compile(findText); + Matcher m = p.matcher(srcText); + while (m.find()) { + count++; + } + return count; + } + + +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/UnSubscrubeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/UnSubscrubeProcessor.java new file mode 100644 index 0000000000..cef432f47d --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/UnSubscrubeProcessor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.mqtt.processor; + +import org.apache.eventmesh.runtime.boot.EventMeshMQTTServer; +import org.apache.eventmesh.runtime.core.protocol.mqtt.client.ClientManager.ClientInfo; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; + +import java.util.List; +import java.util.concurrent.Executor; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class UnSubscrubeProcessor extends AbstractMqttProcessor { + + public UnSubscrubeProcessor(EventMeshMQTTServer eventMeshMQTTServer) { + super(eventMeshMQTTServer); + } + + public UnSubscrubeProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor executor) { + super(eventMeshMQTTServer, executor); + } + + + @Override + public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { + MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) mqttMessage; + ClientInfo clientInfo = clientManager.getOrRegisterClient(ctx, mqttMessage); + List topics = mqttUnsubscribeMessage.payload().topics(); + clientInfo.unsubscribes(topics); + MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); + MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); + MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2); + MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack, variableHeaderBack); + log.info("unsubscribe send back {}", unSubAck); + ctx.writeAndFlush(unSubAck); + } +} diff --git a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java index 3217aaccb8..d4b1162542 100644 --- a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java +++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java @@ -41,4 +41,7 @@ public interface AclService { void doAclCheckInReceive(AclProperties aclProperties) throws AclException; + default boolean doAclCheckUserNameAndPassword(String username, String password) throws AclException { + return true; + } }