Skip to content

Commit

Permalink
Fix some small issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Nov 28, 2021
1 parent f7d0fbb commit e9e3c66
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.eventmesh.common.protocol.ProtocolTransportObject;

import lombok.Data;

@Data
public class Package implements ProtocolTransportObject {

private Header header;
Expand All @@ -35,28 +38,4 @@ public Package(Header header, Object body) {
this.header = header;
this.body = body;
}

public Header getHeader() {
return header;
}

public void setHeader(Header header) {
this.header = header;
}

public Object getBody() {
return body;
}

public void setBody(Object body) {
this.body = body;
}

@Override
public String toString() {
return "Package{" +
"header=" + header +
", body=" + body +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.protocol.meshmessage;

import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;

import org.junit.Assert;
import org.junit.Test;

public class MeshMessageProtocolAdaptorTest {

@Test
public void loadPlugin() {
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor =
ProtocolPluginFactory.getProtocolAdaptor(MeshMessageProtocolConstant.PROTOCOL_NAME);
Assert.assertNotNull(protocolAdaptor);

Assert.assertEquals(
MeshMessageProtocolConstant.PROTOCOL_NAME, protocolAdaptor.getProtocolType()
);
}

@Test
public void getProtocolType() {
}
}
4 changes: 4 additions & 0 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ dependencies {
implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv")

implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
// for debug only, can be removed
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-cloudevents")
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-meshmessage")
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-openmessage")

testImplementation "org.mockito:mockito-core"
testImplementation "org.powermock:powermock-module-junit4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@

package org.apache.eventmesh.runtime.admin.handler;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -35,6 +31,12 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

public class RejectAllClientHandler implements HttpHandler {

private static final Logger logger = LoggerFactory.getLogger(RejectAllClientHandler.class);
Expand Down Expand Up @@ -63,7 +65,8 @@ public void handle(HttpExchange httpExchange) throws IOException {
logger.info("rejectAllClient in admin====================");
if (!sessionMap.isEmpty()) {
for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(
eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
if (addr != null) {
successRemoteAddrs.add(addr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@
import static org.apache.eventmesh.common.protocol.tcp.Command.REDIRECT_TO_CLIENT;
import static org.apache.eventmesh.common.protocol.tcp.Command.SERVER_GOODBYE_REQUEST;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;

import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand All @@ -38,19 +31,29 @@
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;

public class EventMeshTcp2Client {

private static final Logger logger = LoggerFactory.getLogger(EventMeshTcp2Client.class);

public static InetSocketAddress serverGoodby2Client(EventMeshTCPServer eventMeshTCPServer,Session session, ClientSessionGroupMapping mapping) {
public static InetSocketAddress serverGoodby2Client(EventMeshTCPServer eventMeshTCPServer, Session session,
ClientSessionGroupMapping mapping) {
logger.info("serverGoodby2Client client[{}]", session.getClient());
try {
long startTime = System.currentTimeMillis();
Package msg = new Package();
msg.setHeader(new Header(SERVER_GOODBYE_REQUEST, OPStatus.SUCCESS.getCode(), "graceful normal quit from eventmesh",
msg.setHeader(
new Header(SERVER_GOODBYE_REQUEST, OPStatus.SUCCESS.getCode(), "graceful normal quit from eventmesh",
null));

eventMeshTCPServer.getScheduler().submit(new Runnable() {
Expand All @@ -62,7 +65,7 @@ public void run() {
});
InetSocketAddress address = (InetSocketAddress) session.getContext().channel().remoteAddress();

closeSessionIfTimeout(eventMeshTCPServer,session, mapping);
closeSessionIfTimeout(eventMeshTCPServer, session, mapping);
return address;
} catch (Exception e) {
logger.error("exception occur while serverGoodby2Client", e);
Expand All @@ -87,7 +90,7 @@ public void run() {
}
}, 1 * 1000, TimeUnit.MILLISECONDS);

closeSessionIfTimeout(eventMeshTCPServer,session, mapping);
closeSessionIfTimeout(eventMeshTCPServer, session, mapping);

return session.getRemoteAddress();
} catch (Exception e) {
Expand All @@ -101,33 +104,33 @@ public static void goodBye2Client(ChannelHandlerContext ctx,
ClientSessionGroupMapping mapping,
EventMeshTcpMonitor eventMeshTcpMonitor) {
long startTime = System.currentTimeMillis();
Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, OPStatus.FAIL.getCode(),
errMsg, null));
Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, OPStatus.FAIL.getCode(), errMsg, null));
eventMeshTcpMonitor.getEventMesh2clientMsgNum().incrementAndGet();
logger.info("goodBye2Client client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
ctx.writeAndFlush(pkg).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
try {
mapping.closeSession(ctx);
} catch (Exception e) {
logger.warn("close session failed!", e);
}
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
try {
mapping.closeSession(ctx);
} catch (Exception e) {
logger.warn("close session failed!", e);
}
}
}
);
}

public static String redirectClient2NewEventMesh(EventMeshTCPServer eventMeshTCPServer,String newEventMeshIp, int port, Session session, ClientSessionGroupMapping mapping) {
logger.info("begin to gracefully redirect Client {}, newIPPort[{}]", session.getClient(), newEventMeshIp + ":" + port);
public static String redirectClient2NewEventMesh(EventMeshTCPServer eventMeshTCPServer, String newEventMeshIp,
int port, Session session, ClientSessionGroupMapping mapping) {
logger.info("begin to gracefully redirect Client {}, newIPPort[{}]", session.getClient(),
newEventMeshIp + ":" + port);
try {
long startTime = System.currentTimeMillis();

Package pkg = new Package();
pkg.setHeader(new Header(REDIRECT_TO_CLIENT, OPStatus.SUCCESS.getCode(), null,
null));
pkg.setHeader(new Header(REDIRECT_TO_CLIENT, OPStatus.SUCCESS.getCode(), null, null));
pkg.setBody(new RedirectInfo(newEventMeshIp, port));
eventMeshTCPServer.getScheduler().schedule(new Runnable() {
@Override
Expand All @@ -136,15 +139,16 @@ public void run() {
Utils.writeAndFlush(pkg, startTime, taskExecuteTime, session.getContext(), session);
}
}, 5 * 1000, TimeUnit.MILLISECONDS);
closeSessionIfTimeout(eventMeshTCPServer,session, mapping);
closeSessionIfTimeout(eventMeshTCPServer, session, mapping);
return session.getRemoteAddress() + "--->" + newEventMeshIp + ":" + port;
} catch (Exception e) {
logger.error("exception occur while redirectClient2NewEventMesh", e);
return null;
}
}

public static void closeSessionIfTimeout(EventMeshTCPServer eventMeshTCPServer,Session session, ClientSessionGroupMapping mapping) {
public static void closeSessionIfTimeout(EventMeshTCPServer eventMeshTCPServer, Session session,
ClientSessionGroupMapping mapping) {
eventMeshTCPServer.getScheduler().schedule(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public class EventMeshCommon {

public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_";

// protocol type
public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";

public static String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage";
public static String OPEN_MESSAGE_PROTOCOL_NAME = "openmessage";
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.openmessaging.api.Message;

public class MessageUtils {
private static final int seqLength = 10;
Expand Down Expand Up @@ -93,9 +94,13 @@ public static Package buildPackage(Object message, Command command) {
} else if (message instanceof EventMeshMessage) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, SpecVersion.V1.toString());
} else if (message instanceof Message) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.OPEN_MESSAGE_PROTOCOL_NAME);
// todo: this version need to be confirmed.
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, SpecVersion.V1.toString());
} else {
// unsupported protocol for server
return msg;
throw new IllegalArgumentException("Unsupported message protocol");
}
msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
msg.setBody(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.PropertyConst;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
Expand Down Expand Up @@ -125,7 +124,6 @@ public void asyncRR(CloudEvent event, AsyncRRCallback callback, long timeout) th
@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException {
try {
// todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(cloudEvent, Command.ASYNC_MESSAGE_TO_SERVER);
log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
Expand All @@ -139,7 +137,6 @@ public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshExce
@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException {
try {
// todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(cloudEvent, Command.BROADCAST_MESSAGE_TO_SERVER);
log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg);
Expand All @@ -156,7 +153,15 @@ public void registerBusiHandler(ReceiveMsgHook<CloudEvent> handler) throws Event

@Override
public void close() {

try {
if (task != null) {
task.cancel(false);
}
goodbye();
super.close();
} catch (Exception ex) {
log.error("Close CloudEvent TCP publish client error", ex);
}
}

// todo: move to abstract class
Expand Down
Loading

0 comments on commit e9e3c66

Please sign in to comment.