From e9e3c665879b04be49d75622b704269b4fcb4ac2 Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Sun, 28 Nov 2021 13:03:05 +0800 Subject: [PATCH] Fix some small issue --- .../common/protocol/tcp/Package.java | 27 +-------- .../MeshMessageProtocolAdaptor.java | 1 - .../MeshMessageProtocolAdaptorTest.java | 43 +++++++++++++ eventmesh-runtime/build.gradle | 4 ++ .../admin/handler/RejectAllClientHandler.java | 13 ++-- .../tcp/client/EventMeshTcp2Client.java | 60 ++++++++++--------- .../client/tcp/common/EventMeshCommon.java | 3 +- .../client/tcp/common/MessageUtils.java | 7 ++- .../cloudevent/CloudEventTCPPubClient.java | 13 ++-- .../EventMeshMessageTCPPubClient.java | 32 +++++----- 10 files changed, 124 insertions(+), 79 deletions(-) create mode 100644 eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/test/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptorTest.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java index 82557a5710..f277d06adf 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java @@ -19,6 +19,9 @@ import org.apache.eventmesh.common.protocol.ProtocolTransportObject; +import lombok.Data; + +@Data public class Package implements ProtocolTransportObject { private Header header; @@ -35,28 +38,4 @@ public Package(Header header, Object body) { this.header = header; this.body = body; } - - public Header getHeader() { - return header; - } - - public void setHeader(Header header) { - this.header = header; - } - - public Object getBody() { - return body; - } - - public void setBody(Object body) { - this.body = body; - } - - @Override - public String toString() { - return "Package{" + - "header=" + header + - ", body=" + body + - '}'; - } } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java index 4ef4f90b48..9dac2952c6 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java @@ -24,7 +24,6 @@ import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/test/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptorTest.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/test/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptorTest.java new file mode 100644 index 0000000000..54108bf79f --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/test/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptorTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.meshmessage; + +import org.apache.eventmesh.common.protocol.ProtocolTransportObject; +import org.apache.eventmesh.protocol.api.ProtocolAdaptor; +import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; + +import org.junit.Assert; +import org.junit.Test; + +public class MeshMessageProtocolAdaptorTest { + + @Test + public void loadPlugin() { + ProtocolAdaptor protocolAdaptor = + ProtocolPluginFactory.getProtocolAdaptor(MeshMessageProtocolConstant.PROTOCOL_NAME); + Assert.assertNotNull(protocolAdaptor); + + Assert.assertEquals( + MeshMessageProtocolConstant.PROTOCOL_NAME, protocolAdaptor.getProtocolType() + ); + } + + @Test + public void getProtocolType() { + } +} \ No newline at end of file diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 2678f7ccfb..5bf0df28cb 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -37,6 +37,10 @@ dependencies { implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") + // for debug only, can be removed + implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-cloudevents") + implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-meshmessage") + implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-openmessage") testImplementation "org.mockito:mockito-core" testImplementation "org.powermock:powermock-module-junit4" diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java index 6314c489ac..7a6f598576 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java @@ -17,15 +17,11 @@ package org.apache.eventmesh.runtime.admin.handler; -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.util.NetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; @@ -35,6 +31,12 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + public class RejectAllClientHandler implements HttpHandler { private static final Logger logger = LoggerFactory.getLogger(RejectAllClientHandler.class); @@ -63,7 +65,8 @@ public void handle(HttpExchange httpExchange) throws IOException { logger.info("rejectAllClient in admin===================="); if (!sessionMap.isEmpty()) { for (Map.Entry entry : sessionMap.entrySet()) { - InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping); + InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client( + eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping); if (addr != null) { successRemoteAddrs.add(addr); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java index 17868c6e97..8c71b587d2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java @@ -20,13 +20,6 @@ import static org.apache.eventmesh.common.protocol.tcp.Command.REDIRECT_TO_CLIENT; import static org.apache.eventmesh.common.protocol.tcp.Command.SERVER_GOODBYE_REQUEST; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; - -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; - import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.OPStatus; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -38,19 +31,29 @@ import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.runtime.util.Utils; + +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; + public class EventMeshTcp2Client { private static final Logger logger = LoggerFactory.getLogger(EventMeshTcp2Client.class); - public static InetSocketAddress serverGoodby2Client(EventMeshTCPServer eventMeshTCPServer,Session session, ClientSessionGroupMapping mapping) { + public static InetSocketAddress serverGoodby2Client(EventMeshTCPServer eventMeshTCPServer, Session session, + ClientSessionGroupMapping mapping) { logger.info("serverGoodby2Client client[{}]", session.getClient()); try { long startTime = System.currentTimeMillis(); Package msg = new Package(); - msg.setHeader(new Header(SERVER_GOODBYE_REQUEST, OPStatus.SUCCESS.getCode(), "graceful normal quit from eventmesh", + msg.setHeader( + new Header(SERVER_GOODBYE_REQUEST, OPStatus.SUCCESS.getCode(), "graceful normal quit from eventmesh", null)); eventMeshTCPServer.getScheduler().submit(new Runnable() { @@ -62,7 +65,7 @@ public void run() { }); InetSocketAddress address = (InetSocketAddress) session.getContext().channel().remoteAddress(); - closeSessionIfTimeout(eventMeshTCPServer,session, mapping); + closeSessionIfTimeout(eventMeshTCPServer, session, mapping); return address; } catch (Exception e) { logger.error("exception occur while serverGoodby2Client", e); @@ -87,7 +90,7 @@ public void run() { } }, 1 * 1000, TimeUnit.MILLISECONDS); - closeSessionIfTimeout(eventMeshTCPServer,session, mapping); + closeSessionIfTimeout(eventMeshTCPServer, session, mapping); return session.getRemoteAddress(); } catch (Exception e) { @@ -101,33 +104,33 @@ public static void goodBye2Client(ChannelHandlerContext ctx, ClientSessionGroupMapping mapping, EventMeshTcpMonitor eventMeshTcpMonitor) { long startTime = System.currentTimeMillis(); - Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, OPStatus.FAIL.getCode(), - errMsg, null)); + Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, OPStatus.FAIL.getCode(), errMsg, null)); eventMeshTcpMonitor.getEventMesh2clientMsgNum().incrementAndGet(); logger.info("goodBye2Client client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); ctx.writeAndFlush(pkg).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Utils.logSucceedMessageFlow(pkg, null, startTime, startTime); - try { - mapping.closeSession(ctx); - } catch (Exception e) { - logger.warn("close session failed!", e); - } + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + Utils.logSucceedMessageFlow(pkg, null, startTime, startTime); + try { + mapping.closeSession(ctx); + } catch (Exception e) { + logger.warn("close session failed!", e); } } + } ); } - public static String redirectClient2NewEventMesh(EventMeshTCPServer eventMeshTCPServer,String newEventMeshIp, int port, Session session, ClientSessionGroupMapping mapping) { - logger.info("begin to gracefully redirect Client {}, newIPPort[{}]", session.getClient(), newEventMeshIp + ":" + port); + public static String redirectClient2NewEventMesh(EventMeshTCPServer eventMeshTCPServer, String newEventMeshIp, + int port, Session session, ClientSessionGroupMapping mapping) { + logger.info("begin to gracefully redirect Client {}, newIPPort[{}]", session.getClient(), + newEventMeshIp + ":" + port); try { long startTime = System.currentTimeMillis(); Package pkg = new Package(); - pkg.setHeader(new Header(REDIRECT_TO_CLIENT, OPStatus.SUCCESS.getCode(), null, - null)); + pkg.setHeader(new Header(REDIRECT_TO_CLIENT, OPStatus.SUCCESS.getCode(), null, null)); pkg.setBody(new RedirectInfo(newEventMeshIp, port)); eventMeshTCPServer.getScheduler().schedule(new Runnable() { @Override @@ -136,7 +139,7 @@ public void run() { Utils.writeAndFlush(pkg, startTime, taskExecuteTime, session.getContext(), session); } }, 5 * 1000, TimeUnit.MILLISECONDS); - closeSessionIfTimeout(eventMeshTCPServer,session, mapping); + closeSessionIfTimeout(eventMeshTCPServer, session, mapping); return session.getRemoteAddress() + "--->" + newEventMeshIp + ":" + port; } catch (Exception e) { logger.error("exception occur while redirectClient2NewEventMesh", e); @@ -144,7 +147,8 @@ public void run() { } } - public static void closeSessionIfTimeout(EventMeshTCPServer eventMeshTCPServer,Session session, ClientSessionGroupMapping mapping) { + public static void closeSessionIfTimeout(EventMeshTCPServer eventMeshTCPServer, Session session, + ClientSessionGroupMapping mapping) { eventMeshTCPServer.getScheduler().schedule(new Runnable() { @Override public void run() { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java index ca56422ff1..67cb316fb5 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java @@ -121,7 +121,8 @@ public class EventMeshCommon { public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_"; + // protocol type public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents"; - public static String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage"; + public static String OPEN_MESSAGE_PROTOCOL_NAME = "openmessage"; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index 0b96cab2b8..16cbeef753 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -34,6 +34,7 @@ import io.cloudevents.CloudEvent; import io.cloudevents.SpecVersion; +import io.openmessaging.api.Message; public class MessageUtils { private static final int seqLength = 10; @@ -93,9 +94,13 @@ public static Package buildPackage(Object message, Command command) { } else if (message instanceof EventMeshMessage) { msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME); msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, SpecVersion.V1.toString()); + } else if (message instanceof Message) { + msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.OPEN_MESSAGE_PROTOCOL_NAME); + // todo: this version need to be confirmed. + msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, SpecVersion.V1.toString()); } else { // unsupported protocol for server - return msg; + throw new IllegalArgumentException("Unsupported message protocol"); } msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp"); msg.setBody(message); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java index ca5901a7ea..ca713ad3ce 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java @@ -21,7 +21,6 @@ import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.client.tcp.common.MessageUtils; -import org.apache.eventmesh.client.tcp.common.PropertyConst; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; @@ -125,7 +124,6 @@ public void asyncRR(CloudEvent event, AsyncRRCallback callback, long timeout) th @Override public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException { try { - // todo: transform EventMeshMessage to Package Package msg = MessageUtils.buildPackage(cloudEvent, Command.ASYNC_MESSAGE_TO_SERVER); log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), @@ -139,7 +137,6 @@ public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshExce @Override public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException { try { - // todo: transform EventMeshMessage to Package Package msg = MessageUtils.buildPackage(cloudEvent, Command.BROADCAST_MESSAGE_TO_SERVER); log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg); @@ -156,7 +153,15 @@ public void registerBusiHandler(ReceiveMsgHook handler) throws Event @Override public void close() { - + try { + if (task != null) { + task.cancel(false); + } + goodbye(); + super.close(); + } catch (Exception ex) { + log.error("Close CloudEvent TCP publish client error", ex); + } } // todo: move to abstract class diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index 18f555fcaa..37c0f159a7 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -21,7 +21,6 @@ import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.client.tcp.common.MessageUtils; -import org.apache.eventmesh.client.tcp.common.PropertyConst; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; @@ -75,18 +74,18 @@ public void init() throws EventMeshException { public void heartbeat() throws EventMeshException { // if (task != null) { // synchronized (EventMeshMessageTCPPubClient.class) { - task = scheduler.scheduleAtFixedRate(() -> { - try { - if (!isActive()) { - reconnect(); - } - Package msg = MessageUtils.heartBeat(); - io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ignore) { - // ignore - } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + task = scheduler.scheduleAtFixedRate(() -> { + try { + if (!isActive()) { + reconnect(); + } + Package msg = MessageUtils.heartBeat(); + io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ignore) { + // ignore } + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + } // } // } @@ -113,7 +112,8 @@ public Package rr(EventMeshMessage eventMeshMessage, long timeout) throws EventM } @Override - public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, long timeout) throws EventMeshException { + public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, long timeout) + throws EventMeshException { try { Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.REQUEST_TO_SERVER); super.send(msg); @@ -159,11 +159,13 @@ public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) @Override public void close() { try { - task.cancel(false); + if (task != null) { + task.cancel(false); + } goodbye(); super.close(); } catch (Exception e) { - e.printStackTrace(); + log.error("Close EventMeshMessage TCP publish client error", e); } }