-
Notifications
You must be signed in to change notification settings - Fork 623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature #4793] Support MQTT protocol #4794
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.eventmesh.runtime.boot; | ||
|
||
|
||
import org.apache.eventmesh.common.config.CommonConfiguration; | ||
import org.apache.eventmesh.runtime.acl.Acl; | ||
import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientConnectProcessor; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientDisConnectProcessor; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.HealthCheckProcessor; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.MqttProcessor; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.PublishProcessor; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.SubscrubeProcessor; | ||
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.UnSubscrubeProcessor; | ||
import org.apache.eventmesh.runtime.meta.MetaStorage; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import io.netty.bootstrap.ServerBootstrap; | ||
import io.netty.buffer.PooledByteBufAllocator; | ||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelHandler.Sharable; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.ChannelInboundHandlerAdapter; | ||
import io.netty.channel.ChannelInitializer; | ||
import io.netty.channel.ChannelOption; | ||
import io.netty.channel.ChannelPipeline; | ||
import io.netty.channel.epoll.EpollServerSocketChannel; | ||
import io.netty.channel.socket.SocketChannel; | ||
import io.netty.channel.socket.nio.NioServerSocketChannel; | ||
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; | ||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; | ||
import io.netty.handler.codec.mqtt.MqttDecoder; | ||
import io.netty.handler.codec.mqtt.MqttEncoder; | ||
import io.netty.handler.codec.mqtt.MqttFixedHeader; | ||
import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException; | ||
import io.netty.handler.codec.mqtt.MqttMessage; | ||
import io.netty.handler.codec.mqtt.MqttMessageFactory; | ||
import io.netty.handler.codec.mqtt.MqttMessageType; | ||
import io.netty.handler.codec.mqtt.MqttQoS; | ||
import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
Check warning on line 67 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
public class EventMeshMQTTServer extends AbstractRemotingServer { | ||
|
||
private final EventMeshMQTTConfiguration eventMeshMQTTConfiguration; | ||
|
||
private final EventMeshServer eventMeshServer; | ||
|
||
private final MetaStorage metaStorage; | ||
|
||
private final Acl acl; | ||
|
||
|
||
protected final Map<MqttMessageType, MqttProcessor> processorTable = | ||
Check warning on line 79 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
new ConcurrentHashMap<>(64); | ||
|
||
private final AtomicBoolean started = new AtomicBoolean(false); | ||
Check warning on line 82 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
|
||
public EventMeshMQTTServer(final EventMeshServer eventMeshServer, final EventMeshMQTTConfiguration eventMeshMQTTConfiguration) { | ||
this.eventMeshServer = eventMeshServer; | ||
this.eventMeshMQTTConfiguration = eventMeshMQTTConfiguration; | ||
this.metaStorage = eventMeshServer.getMetaStorage(); | ||
this.acl = eventMeshServer.getAcl(); | ||
} | ||
Check warning on line 90 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
@Override | ||
public void init() throws Exception { | ||
log.info("==================EventMeshMQTTServer Initialing=================="); | ||
super.init("eventMesh-mqtt"); | ||
registerMQTTProcessor(); | ||
Check warning on line 96 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
} | ||
Check warning on line 98 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
private void registerMQTTProcessor() { | ||
processorTable.putIfAbsent(MqttMessageType.CONNECT, new ClientConnectProcessor(this, getWorkerGroup())); | ||
processorTable.putIfAbsent(MqttMessageType.DISCONNECT, new ClientDisConnectProcessor(this, getWorkerGroup())); | ||
processorTable.putIfAbsent(MqttMessageType.PINGREQ, new HealthCheckProcessor(this, getWorkerGroup())); | ||
processorTable.putIfAbsent(MqttMessageType.SUBSCRIBE, new SubscrubeProcessor(this, getWorkerGroup())); | ||
processorTable.putIfAbsent(MqttMessageType.UNSUBSCRIBE, new UnSubscrubeProcessor(this, getWorkerGroup())); | ||
processorTable.putIfAbsent(MqttMessageType.PUBLISH, new PublishProcessor(this, getWorkerGroup())); | ||
} | ||
Check warning on line 107 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
|
||
@Override | ||
public void start() throws Exception { | ||
Thread thread = new Thread(() -> { | ||
final ServerBootstrap bootstrap = new ServerBootstrap(); | ||
bootstrap.group(this.getBossGroup(), this.getIoGroup()) | ||
Check warning on line 114 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class); | ||
bootstrap.option(ChannelOption.SO_REUSEADDR, true) | ||
.option(ChannelOption.SO_BACKLOG, 1024) | ||
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) | ||
.option(ChannelOption.SO_RCVBUF, 10485760); | ||
Check warning on line 119 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
bootstrap.childOption(ChannelOption.TCP_NODELAY, true) | ||
.childOption(ChannelOption.SO_KEEPALIVE, true) | ||
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); | ||
bootstrap.childHandler(new MQTTServerInitializer()); | ||
Check warning on line 124 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
try { | ||
int port = eventMeshMQTTConfiguration.getEventMeshTcpServerPort(); | ||
ChannelFuture f = bootstrap.bind(port).sync(); | ||
log.info("EventMeshMQTTServer[port={}] started.....", port); | ||
f.channel().closeFuture().sync(); | ||
} catch (Exception e) { | ||
log.error("EventMeshMQTTServer RemotingServer Start Err!", e); | ||
Check warning on line 132 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
try { | ||
shutdown(); | ||
} catch (Exception ex) { | ||
log.error("EventMeshMQTTServer RemotingServer shutdown Err!", ex); | ||
} | ||
System.exit(-1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it appropriate to exit the process when the MQTT server fails to start? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a reference to other EM protocols.If you want to modify it after the discussion, I will ignore this startup failure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently tcp protocol, http protocol server must be started successfully, grpc protocol did not so. Now added MQTT protocol server must be started successfullyt, please community to give advice, I can not decide. 目前tcp协议、http协议的server必须成功启动,grpc协议没有如此,现在新增MQTT协议的server是否必须启动成功,请社区给出意见,我权衡不好。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 我理解这里的必须启动成功实际上是受是否开启MQTT协议的配置控制的吧?这里的退出我认为没有问题,如果MQTT协议加载有问题退出了,那其实可以在配置中移除MQTT协议,保证TCP、HTTP等协议正常启动服务就好。 |
||
} | ||
}, "eventMesh-mqtt-server"); | ||
thread.start(); | ||
Check warning on line 141 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
started.compareAndSet(false, true); | ||
Check warning on line 143 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
} | ||
Check warning on line 145 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
@Override | ||
public CommonConfiguration getConfiguration() { | ||
return eventMeshMQTTConfiguration; | ||
Check warning on line 149 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} | ||
|
||
private class MQTTServerInitializer extends ChannelInitializer<SocketChannel> { | ||
Check warning on line 152 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
|
||
@Override | ||
protected void initChannel(SocketChannel ch) throws Exception { | ||
ChannelPipeline channelPipeline = ch.pipeline(); | ||
channelPipeline.addLast(getWorkerGroup(), MqttEncoder.INSTANCE); | ||
channelPipeline.addLast(getWorkerGroup(), new MqttDecoder()); | ||
channelPipeline.addLast(getWorkerGroup(), new EventMeshMqttChannelInboundHandler()); | ||
} | ||
Check warning on line 161 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} | ||
|
||
@Sharable | ||
private class EventMeshMqttChannelInboundHandler extends ChannelInboundHandlerAdapter { | ||
Check warning on line 165 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
@Override | ||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | ||
if (msg instanceof MqttMessage) { | ||
MqttMessage mqttMessage = (MqttMessage) msg; | ||
Check warning on line 170 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
if (mqttMessage.decoderResult().isFailure()) { | ||
Throwable cause = mqttMessage.decoderResult().cause(); | ||
Check warning on line 172 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
if (cause instanceof MqttUnacceptableProtocolVersionException) { | ||
ctx.writeAndFlush(MqttMessageFactory.newMessage( | ||
Check warning on line 174 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), | ||
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), | ||
null)); | ||
} else if (cause instanceof MqttIdentifierRejectedException) { | ||
ctx.writeAndFlush(MqttMessageFactory.newMessage( | ||
Check warning on line 179 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), | ||
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), | ||
null)); | ||
} | ||
ctx.close(); | ||
return; | ||
Check warning on line 185 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} | ||
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); | ||
MqttMessageType mqttMessageType = mqttFixedHeader.messageType(); | ||
MqttProcessor mqttProcessor = processorTable.get(mqttMessageType); | ||
Check warning on line 189 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
if (!Objects.isNull(mqttProcessor)) { | ||
Executor executor = mqttProcessor.executor(); | ||
Check warning on line 191 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
if (Objects.isNull(executor)) { | ||
mqttProcessor.process(ctx, mqttMessage); | ||
Check warning on line 193 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} else { | ||
executor.execute(() -> { | ||
Check warning on line 195 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
try { | ||
mqttProcessor.process(ctx, mqttMessage); | ||
} catch (MqttException e) { | ||
log.error("[mqtt Processor error]", e); | ||
ctx.close(); | ||
} | ||
}); | ||
Check warning on line 202 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} | ||
} | ||
} | ||
} | ||
Check warning on line 206 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | ||
if (cause instanceof IOException) { | ||
ctx.close(); | ||
Check warning on line 211 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} else { | ||
super.exceptionCaught(ctx, cause); | ||
Check warning on line 213 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} | ||
} | ||
Check warning on line 215 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} | ||
|
||
public Acl getAcl() { | ||
return acl; | ||
Check warning on line 219 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.eventmesh.runtime.boot; | ||
|
||
import static org.apache.eventmesh.common.Constants.MQTT; | ||
|
||
import org.apache.eventmesh.common.config.ConfigService; | ||
import org.apache.eventmesh.common.utils.ConfigurationContextUtil; | ||
import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration; | ||
|
||
public class EventMeshMqttBootstrap implements EventMeshBootstrap { | ||
|
||
private EventMeshMQTTServer eventMeshMQTTServer; | ||
|
||
private EventMeshServer eventMeshServer; | ||
|
||
private EventMeshMQTTConfiguration eventMeshMQTTConfiguration; | ||
|
||
public EventMeshMqttBootstrap(EventMeshServer eventMeshServer) { | ||
this.eventMeshServer = eventMeshServer; | ||
ConfigService configService = ConfigService.getInstance(); | ||
this.eventMeshMQTTConfiguration = configService.buildConfigInstance(EventMeshMQTTConfiguration.class); | ||
ConfigurationContextUtil.putIfAbsent(MQTT, eventMeshMQTTConfiguration); | ||
} | ||
Check warning on line 39 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
|
||
@Override | ||
public void init() throws Exception { | ||
if (eventMeshMQTTConfiguration != null) { | ||
eventMeshMQTTServer = new EventMeshMQTTServer(eventMeshServer, eventMeshMQTTConfiguration); | ||
eventMeshMQTTServer.init(); | ||
Check warning on line 45 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
} | ||
|
||
} | ||
Check warning on line 48 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
|
||
@Override | ||
public void start() throws Exception { | ||
if (eventMeshMQTTServer != null) { | ||
eventMeshMQTTServer.start(); | ||
Check warning on line 53 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
} | ||
} | ||
Check warning on line 55 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
|
||
@Override | ||
public void shutdown() throws Exception { | ||
// server shutdown | ||
if (eventMeshMQTTServer != null) { | ||
eventMeshMQTTServer.shutdown(); | ||
Check warning on line 61 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
} | ||
} | ||
Check warning on line 63 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
|
||
public EventMeshMQTTServer getEventMeshMQTTServer() { | ||
return eventMeshMQTTServer; | ||
Check warning on line 66 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
} | ||
|
||
public EventMeshServer getEventMeshServer() { | ||
return eventMeshServer; | ||
Check warning on line 70 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
} | ||
|
||
public EventMeshMQTTConfiguration getEventMeshMQTTConfiguration() { | ||
return eventMeshMQTTConfiguration; | ||
Check warning on line 74 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java
|
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class is not serializable, is the
transient
keyword redundant?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted,please review again.