diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index f4de08bc261..7941cbbf60a 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch. ### optimize: - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version +- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index f9a0eab0c7c..25ff7a0587c 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -12,7 +12,7 @@ ### optimize: - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化 - +- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖 ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应 diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 2b7b35aca00..2901eb8d3f5 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -409,10 +409,13 @@ class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - if (!(msg instanceof RpcMessage)) { - return; + RpcMessage rpcMessage = null; + if (msg instanceof ProtocolRpcMessage) { + rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + processMessage(ctx, rpcMessage); + } else { + LOGGER.error("rpcMessage type error"); } - processMessage(ctx, (RpcMessage) msg); } @Override diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 09696286a69..72324c26893 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -163,10 +163,13 @@ class ServerHandler extends ChannelDuplexHandler { */ @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - if (!(msg instanceof RpcMessage)) { - return; + RpcMessage rpcMessage = null; + if (msg instanceof ProtocolRpcMessage) { + rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + processMessage(ctx, rpcMessage); + } else { + LOGGER.error("rpcMessage type error"); } - processMessage(ctx, (RpcMessage) msg); } @Override diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java new file mode 100644 index 00000000000..ba89c508a8c --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import org.apache.seata.core.protocol.RpcMessage; + +/** + * The protocol RPC message. + */ +public interface ProtocolRpcMessage { + + /** + * The protocol message to rpc message. + * @return + */ + RpcMessage protocolMsg2RpcMsg(); + + /** + * The rpc message to protocol message. + * @param rpcMessage + */ + void rpcMsg2ProtocolMsg(RpcMessage rpcMessage); +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java new file mode 100644 index 00000000000..cea2d7e6f71 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.v0; + +import org.apache.seata.core.compressor.CompressorType; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; +import org.apache.seata.core.serializer.SerializerType; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * the protocol v0 rpc message + **/ +public class ProtocolRpcMessageV0 implements ProtocolRpcMessage { + + private static AtomicLong NEXT_ID = new AtomicLong(0); + + /** + * Gets next message id. + * + * @return the next message id + */ + public static long getNextMessageId() { + return NEXT_ID.incrementAndGet(); + } + + private long id; + private boolean isAsync; + private boolean isRequest; + private boolean isHeartbeat; + private Object body; + private byte messageType; + private boolean isSeataCodec; + + /** + * Gets id. + * + * @return the id + */ + public long getId() { + return id; + } + + /** + * Sets id. + * + * @param id the id + */ + public void setId(long id) { + this.id = id; + } + + /** + * Is async boolean. + * + * @return the boolean + */ + public boolean isAsync() { + return isAsync; + } + + /** + * Sets async. + * + * @param async the async + */ + public void setAsync(boolean async) { + isAsync = async; + } + + /** + * Is request boolean. + * + * @return the boolean + */ + public boolean isRequest() { + return isRequest; + } + + /** + * Sets request. + * + * @param request the request + */ + public void setRequest(boolean request) { + isRequest = request; + } + + /** + * Is heartbeat boolean. + * + * @return the boolean + */ + public boolean isHeartbeat() { + return isHeartbeat; + } + + /** + * Sets heartbeat. + * + * @param heartbeat the heartbeat + */ + public void setHeartbeat(boolean heartbeat) { + isHeartbeat = heartbeat; + } + + /** + * Gets body. + * + * @return the body + */ + public Object getBody() { + return body; + } + + /** + * Sets body. + * + * @param body the body + */ + public void setBody(Object body) { + this.body = body; + } + + public boolean isSeataCodec() { + return isSeataCodec; + } + + public void setSeataCodec(boolean seataCodec) { + isSeataCodec = seataCodec; + } + + public byte getMessageType() { + return messageType; + } + + public void setMessageType(byte messageType) { + this.messageType = messageType; + } + + @Override + public RpcMessage protocolMsg2RpcMsg() { + RpcMessage rpcMessage = new RpcMessage(); + rpcMessage.setMessageType(this.messageType); + rpcMessage.setCompressor(CompressorType.NONE.getCode()); + + byte codecType = this.isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode(); + rpcMessage.setCodec(codecType); + + if (this.isHeartbeat) { + if (this.isRequest) { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST); + } else { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE); + } + } else { + if (this.isRequest) { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); + } else { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESPONSE); + } + } + rpcMessage.setBody(this.body); + rpcMessage.setId((int) this.id); + return rpcMessage; + } + + @Override + public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { + this.body = rpcMessage.getBody(); + this.id = rpcMessage.getId(); + this.isRequest = isRequest(rpcMessage.getMessageType()); + this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType()); + this.isSeataCodec = rpcMessage.getCodec() == SerializerType.SEATA.getCode(); + this.messageType = rpcMessage.getMessageType(); + } + + private boolean isHeartbeat(byte msgType) { + return msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST + || msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE; + } + + private boolean isRequest(byte msgType) { + return msgType == ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY + || msgType == ProtocolConstants.MSGTYPE_RESQUEST_SYNC; + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java new file mode 100644 index 00000000000..10668cbdc65 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.v1; + +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; + +import java.util.HashMap; +import java.util.Map; + +/** + * protocol v1 rpc message + **/ +public class ProtocolRpcMessageV1 implements ProtocolRpcMessage { + private int id; + private byte messageType; + private byte codec; + private byte compressor; + private Map headMap = new HashMap<>(); + private Object body; + + /** + * Gets id. + * + * @return the id + */ + public int getId() { + return id; + } + + /** + * Sets id. + * + * @param id the id + */ + public void setId(int id) { + this.id = id; + } + + /** + * Gets body. + * + * @return the body + */ + public Object getBody() { + return body; + } + + /** + * Sets body. + * + * @param body the body + */ + public void setBody(Object body) { + this.body = body; + } + + /** + * Gets codec. + * + * @return the codec + */ + public byte getCodec() { + return codec; + } + + /** + * Sets codec. + * + * @param codec the codec + * @return the codec + */ + public void setCodec(byte codec) { + this.codec = codec; + } + + /** + * Gets compressor. + * + * @return the compressor + */ + public byte getCompressor() { + return compressor; + } + + /** + * Sets compressor. + * + * @param compressor the compressor + * @return the compressor + */ + public void setCompressor(byte compressor) { + this.compressor = compressor; + } + + /** + * Gets head map. + * + * @return the head map + */ + public Map getHeadMap() { + return headMap; + } + + /** + * Sets head map. + * + * @param headMap the head map + * @return the head map + */ + public void setHeadMap(Map headMap) { + this.headMap = headMap; + } + + /** + * Gets head. + * + * @param headKey the head key + * @return the head + */ + public String getHead(String headKey) { + return headMap.get(headKey); + } + + /** + * Put head. + * + * @param headKey the head key + * @param headValue the head value + */ + public void putHead(String headKey, String headValue) { + headMap.put(headKey, headValue); + } + + /** + * Gets message type. + * + * @return the message type + */ + public byte getMessageType() { + return messageType; + } + + /** + * Sets message type. + * + * @param messageType the message type + */ + public void setMessageType(byte messageType) { + this.messageType = messageType; + } + + @Override + public String toString() { + return StringUtils.toString(this); + } + + @Override + public RpcMessage protocolMsg2RpcMsg() { + RpcMessage rpcMessage = new RpcMessage(); + rpcMessage.setId(this.id); + rpcMessage.setMessageType(this.messageType); + rpcMessage.setCodec(this.codec); + rpcMessage.setCompressor(this.compressor); + rpcMessage.setHeadMap(this.headMap); + rpcMessage.setBody(this.body); + return rpcMessage; + } + + + @Override + public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { + this.body = rpcMessage.getBody(); + this.headMap = rpcMessage.getHeadMap(); + this.id = rpcMessage.getId(); + this.messageType = rpcMessage.getMessageType(); + this.codec = rpcMessage.getCodec(); + this.compressor = rpcMessage.getCompressor(); + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index eb65fd20cf9..26ef52ffdca 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -27,7 +27,6 @@ import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; -import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; @@ -125,7 +124,7 @@ public Object decodeFrame(ByteBuf frame) { byte compressorType = frame.readByte(); int requestId = frame.readInt(); - RpcMessage rpcMessage = new RpcMessage(); + ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); rpcMessage.setCodec(codecType); rpcMessage.setId(requestId); rpcMessage.setCompressor(compressorType); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java index 7c32e98f2d3..575992fd876 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java @@ -66,8 +66,10 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { try { if (msg instanceof RpcMessage) { - RpcMessage rpcMessage = (RpcMessage) msg; + RpcMessage rpcMsg = (RpcMessage) msg; + ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); + rpcMessage.rpcMsg2ProtocolMsg(rpcMsg); int fullLength = ProtocolConstants.V1_HEAD_LENGTH; int headLength = ProtocolConstants.V1_HEAD_LENGTH; diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java index 98b9bd7964f..e35c124e306 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.concurrent.DefaultPromise; import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +50,8 @@ public void channelInactive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof RpcMessage) { - RpcMessage rpcMessage = (RpcMessage) msg; + if (msg instanceof ProtocolRpcMessage) { + RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); int msgId = rpcMessage.getId(); DefaultPromise future = (DefaultPromise) client.futureMap.remove(msgId); if (future != null) { @@ -58,7 +59,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } else { LOGGER.warn("miss msg id:{}", msgId); } + }else { + LOGGER.warn("msg is not ProtocolRpcMessage"); } + } @Override diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java index 31025304a12..5ee0df7dee2 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java @@ -27,8 +27,8 @@ import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.core.model.BranchType; -import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.transaction.BranchCommitRequest; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -80,7 +80,7 @@ public void testAll() { while (tag.getAndIncrement() < runTimes) { try { Future future = client.sendRpc(head, body); - RpcMessage resp = (RpcMessage) future.get(10, TimeUnit.SECONDS); + ProtocolRpcMessage resp = (ProtocolRpcMessage) future.get(10, TimeUnit.SECONDS); if (resp != null) { success.incrementAndGet(); } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java index 58745a41ada..8b468d0e8ff 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +40,12 @@ public class ServerChannelHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); - if (msg instanceof RpcMessage) { - ((RpcMessage) msg).setMessageType(ProtocolConstants.MSGTYPE_RESPONSE); + if (msg instanceof ProtocolRpcMessage) { + RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + channel.writeAndFlush(rpcMessage); + } else { + LOGGER.error("rpcMessage type error"); } - - channel.writeAndFlush(msg); } @Override