From f1bacdd20c98c7a2aabcdb9a21f465cc4795a8c8 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Mon, 25 Dec 2023 18:58:19 +0800 Subject: [PATCH 01/20] Unwinding RpcMessage and Encoder/Decoder dependencies --- .../io/seata/core/protocol/RpcMessage.java | 10 + .../netty/AbstractNettyRemotingClient.java | 9 +- .../netty/AbstractNettyRemotingServer.java | 9 +- .../core/rpc/netty/ProtocolRpcMessage.java | 43 ++++ .../rpc/netty/v1/ProtocolRpcMessageV1.java | 196 ++++++++++++++++++ .../core/rpc/netty/v1/ProtocolV1Decoder.java | 4 +- .../core/rpc/netty/v1/ProtocolV1Encoder.java | 4 +- 7 files changed, 266 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/io/seata/core/rpc/netty/ProtocolRpcMessage.java create mode 100644 core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java diff --git a/core/src/main/java/io/seata/core/protocol/RpcMessage.java b/core/src/main/java/io/seata/core/protocol/RpcMessage.java index 844ea87fc4c..f96ead91ad5 100644 --- a/core/src/main/java/io/seata/core/protocol/RpcMessage.java +++ b/core/src/main/java/io/seata/core/protocol/RpcMessage.java @@ -35,6 +35,8 @@ public class RpcMessage implements Serializable { private Map headMap = new HashMap<>(); private Object body; + private String version; + /** * Gets id. * @@ -169,6 +171,14 @@ public void setMessageType(byte messageType) { this.messageType = messageType; } + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + @Override public String toString() { return StringUtils.toString(this); diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 2a9248828ca..31594159f67 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -403,10 +403,13 @@ class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - if (!(msg instanceof RpcMessage)) { - return; + RpcMessage rpcMessage = null; + if (msg instanceof ProtocolRpcMessage) { + rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + processMessage(ctx, rpcMessage); + } else { + LOGGER.error("rpcMessage type error"); } - processMessage(ctx, (RpcMessage) msg); } @Override diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java index eb9b4adcde4..97e43923bf5 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -163,10 +163,13 @@ class ServerHandler extends ChannelDuplexHandler { */ @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - if (!(msg instanceof RpcMessage)) { - return; + RpcMessage rpcMessage = null; + if (msg instanceof ProtocolRpcMessage) { + rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + processMessage(ctx, rpcMessage); + } else { + LOGGER.error("rpcMessage type error"); } - processMessage(ctx, (RpcMessage) msg); } @Override diff --git a/core/src/main/java/io/seata/core/rpc/netty/ProtocolRpcMessage.java b/core/src/main/java/io/seata/core/rpc/netty/ProtocolRpcMessage.java new file mode 100644 index 00000000000..b538e04e604 --- /dev/null +++ b/core/src/main/java/io/seata/core/rpc/netty/ProtocolRpcMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty; + +import io.seata.core.protocol.AbstractIdentifyRequest; +import io.seata.core.protocol.RpcMessage; + +/** + * The protocol RPC message. + */ +public interface ProtocolRpcMessage { + RpcMessage protocolMsg2RpcMsg(); + + void rpcMsg2ProtocolMsg(RpcMessage rpcMessage); + + static String getVersion(Object body) { + if (body instanceof AbstractIdentifyRequest) { + return ((AbstractIdentifyRequest) body).getVersion(); + } else { + return null; + } + } + + static void setVersion(Object body, String version) { + if (body instanceof AbstractIdentifyRequest) { + ((AbstractIdentifyRequest) body).setVersion(version); + } + } +} diff --git a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java new file mode 100644 index 00000000000..5f2639a2fe1 --- /dev/null +++ b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java @@ -0,0 +1,196 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.v1; + +import io.seata.common.util.StringUtils; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.netty.ProtocolRpcMessage; + +import java.util.HashMap; +import java.util.Map; + +/** + * protocol v1 rpc message + **/ +public class ProtocolRpcMessageV1 implements ProtocolRpcMessage { + private int id; + private byte messageType; + private byte codec; + private byte compressor; + private Map headMap = new HashMap<>(); + private Object body; + + /** + * Gets id. + * + * @return the id + */ + public int getId() { + return id; + } + + /** + * Sets id. + * + * @param id the id + */ + public void setId(int id) { + this.id = id; + } + + /** + * Gets body. + * + * @return the body + */ + public Object getBody() { + return body; + } + + /** + * Sets body. + * + * @param body the body + */ + public void setBody(Object body) { + this.body = body; + } + + /** + * Gets codec. + * + * @return the codec + */ + public byte getCodec() { + return codec; + } + + /** + * Sets codec. + * + * @param codec the codec + * @return the codec + */ + public void setCodec(byte codec) { + this.codec = codec; + } + + /** + * Gets compressor. + * + * @return the compressor + */ + public byte getCompressor() { + return compressor; + } + + /** + * Sets compressor. + * + * @param compressor the compressor + * @return the compressor + */ + public void setCompressor(byte compressor) { + this.compressor = compressor; + } + + /** + * Gets head map. + * + * @return the head map + */ + public Map getHeadMap() { + return headMap; + } + + /** + * Sets head map. + * + * @param headMap the head map + * @return the head map + */ + public void setHeadMap(Map headMap) { + this.headMap = headMap; + } + + /** + * Gets head. + * + * @param headKey the head key + * @return the head + */ + public String getHead(String headKey) { + return headMap.get(headKey); + } + + /** + * Put head. + * + * @param headKey the head key + * @param headValue the head value + */ + public void putHead(String headKey, String headValue) { + headMap.put(headKey, headValue); + } + + /** + * Gets message type. + * + * @return the message type + */ + public byte getMessageType() { + return messageType; + } + + /** + * Sets message type. + * + * @param messageType the message type + */ + public void setMessageType(byte messageType) { + this.messageType = messageType; + } + + @Override + public String toString() { + return StringUtils.toString(this); + } + + @Override + public RpcMessage protocolMsg2RpcMsg() { + RpcMessage rpcMessage = new RpcMessage(); + rpcMessage.setId(this.id); + rpcMessage.setMessageType(this.messageType); + rpcMessage.setCodec(this.codec); + rpcMessage.setCompressor(this.compressor); + rpcMessage.setHeadMap(this.headMap); + rpcMessage.setBody(this.body); + rpcMessage.setVersion(ProtocolRpcMessage.getVersion(this.body)); + return rpcMessage; + } + + + @Override + public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { + this.body = rpcMessage.getBody(); + ProtocolRpcMessage.setVersion(this.body, rpcMessage.getVersion()); + this.headMap = rpcMessage.getHeadMap(); + this.id = rpcMessage.getId(); + this.messageType = rpcMessage.getMessageType(); + this.codec = rpcMessage.getCodec(); + this.compressor = rpcMessage.getCompressor(); + } +} diff --git a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index 867e27d9457..1f4a13a75da 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -70,7 +70,7 @@ public ProtocolV1Decoder() { public ProtocolV1Decoder(int maxFrameLength) { /* - int maxFrameLength, + int maxFrameLength, int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 int lengthFieldLength, FullLength is int(4B). so values is 4 int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 @@ -117,7 +117,7 @@ public Object decodeFrame(ByteBuf frame) { byte compressorType = frame.readByte(); int requestId = frame.readInt(); - RpcMessage rpcMessage = new RpcMessage(); + ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); rpcMessage.setCodec(codecType); rpcMessage.setId(requestId); rpcMessage.setCompressor(compressorType); diff --git a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Encoder.java b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Encoder.java index 0f35eff8492..1af01f711c8 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Encoder.java +++ b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Encoder.java @@ -65,8 +65,10 @@ public class ProtocolV1Encoder extends MessageToByteEncoder { public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { try { if (msg instanceof RpcMessage) { - RpcMessage rpcMessage = (RpcMessage) msg; + RpcMessage rpcMsg = (RpcMessage) msg; + ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); + rpcMessage.rpcMsg2ProtocolMsg(rpcMsg); int fullLength = ProtocolConstants.V1_HEAD_LENGTH; int headLength = ProtocolConstants.V1_HEAD_LENGTH; From ee3c3aa344be5ea72c064869e230fb6e7aae3207 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 26 Dec 2023 14:51:39 +0800 Subject: [PATCH 02/20] license --- .../rpc/netty/v1/ProtocolRpcMessageV1.java | 23 ++++++++++--------- .../core/rpc/netty/v1/ProtocolV1Decoder.java | 1 - 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java index 5f2639a2fe1..1a3a76e1f27 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java +++ b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.core.rpc.netty.v1; diff --git a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index 1f4a13a75da..2e5453d9da9 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -25,7 +25,6 @@ import io.seata.core.compressor.CompressorFactory; import io.seata.core.protocol.HeartbeatMessage; import io.seata.core.protocol.ProtocolConstants; -import io.seata.core.protocol.RpcMessage; import io.seata.core.serializer.SerializerServiceLoader; import io.seata.core.serializer.SerializerType; import org.slf4j.Logger; From 94cb0cfd712bed15ddf87bb0687385dfb1b61d2b Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Thu, 28 Dec 2023 19:49:12 +0800 Subject: [PATCH 03/20] v0 --- .../rpc/netty/v0/ProtocolRpcMessageV0.java | 205 ++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 core/src/main/java/io/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java diff --git a/core/src/main/java/io/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java b/core/src/main/java/io/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java new file mode 100644 index 00000000000..d6a4bfa54f4 --- /dev/null +++ b/core/src/main/java/io/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.v0; + +import io.seata.core.compressor.CompressorType; +import io.seata.core.protocol.ProtocolConstants; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.netty.ProtocolRpcMessage; +import io.seata.core.serializer.SerializerType; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * the protocol v0 rpc message + **/ +public class ProtocolRpcMessageV0 implements ProtocolRpcMessage { + + private static AtomicLong NEXT_ID = new AtomicLong(0); + + /** + * Gets next message id. + * + * @return the next message id + */ + public static long getNextMessageId() { + return NEXT_ID.incrementAndGet(); + } + + private long id; + private boolean isAsync; + private boolean isRequest; + private boolean isHeartbeat; + private Object body; + private byte messageType; + private boolean isSeataCodec; + + /** + * Gets id. + * + * @return the id + */ + public long getId() { + return id; + } + + /** + * Sets id. + * + * @param id the id + */ + public void setId(long id) { + this.id = id; + } + + /** + * Is async boolean. + * + * @return the boolean + */ + public boolean isAsync() { + return isAsync; + } + + /** + * Sets async. + * + * @param async the async + */ + public void setAsync(boolean async) { + isAsync = async; + } + + /** + * Is request boolean. + * + * @return the boolean + */ + public boolean isRequest() { + return isRequest; + } + + /** + * Sets request. + * + * @param request the request + */ + public void setRequest(boolean request) { + isRequest = request; + } + + /** + * Is heartbeat boolean. + * + * @return the boolean + */ + public boolean isHeartbeat() { + return isHeartbeat; + } + + /** + * Sets heartbeat. + * + * @param heartbeat the heartbeat + */ + public void setHeartbeat(boolean heartbeat) { + isHeartbeat = heartbeat; + } + + /** + * Gets body. + * + * @return the body + */ + public Object getBody() { + return body; + } + + /** + * Sets body. + * + * @param body the body + */ + public void setBody(Object body) { + this.body = body; + } + + public boolean isSeataCodec() { + return isSeataCodec; + } + + public void setSeataCodec(boolean seataCodec) { + isSeataCodec = seataCodec; + } + + public byte getMessageType() { + return messageType; + } + + public void setMessageType(byte messageType) { + this.messageType = messageType; + } + + @Override + public RpcMessage protocolMsg2RpcMsg() { + RpcMessage rpcMessage = new RpcMessage(); + rpcMessage.setMessageType(this.messageType); + rpcMessage.setCompressor(CompressorType.NONE.getCode()); + + byte codecType = this.isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode(); + rpcMessage.setCodec(codecType); + + if (this.isHeartbeat) { + if (this.isRequest) { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST); + } else { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE); + } + } else { + if (this.isRequest) { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); + } else { + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESPONSE); + } + } + rpcMessage.setBody(this.body); + rpcMessage.setId((int) this.id); + rpcMessage.setVersion(ProtocolRpcMessage.getVersion(this.body)); + return rpcMessage; + } + + @Override + public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { + this.body = rpcMessage.getBody(); + ProtocolRpcMessage.setVersion(this.body, rpcMessage.getVersion()); + this.id = rpcMessage.getId(); + this.isRequest = isRequest(rpcMessage.getMessageType()); + this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType()); + this.isSeataCodec = rpcMessage.getCodec() == SerializerType.SEATA.getCode(); + this.messageType = rpcMessage.getMessageType(); + } + + private boolean isHeartbeat(byte msgType) { + return msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST + || msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE; + } + + private boolean isRequest(byte msgType) { + return msgType == ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY + || msgType == ProtocolConstants.MSGTYPE_RESQUEST_SYNC; + } +} From 8c197af15dbf1faee4f6ef09f794339878d0f02b Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 29 Dec 2023 10:52:13 +0800 Subject: [PATCH 04/20] v0 --- .../core/rpc/netty/AbstractNettyRemoting.java | 19 +++++++++++++++---- .../netty/AbstractNettyRemotingServer.java | 14 ++++++++++---- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java index cad610c6ed2..22ca93122cc 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -43,6 +43,7 @@ import io.seata.core.protocol.MessageTypeAware; import io.seata.core.protocol.ProtocolConstants; import io.seata.core.protocol.RpcMessage; +import io.seata.core.protocol.Version; import io.seata.core.rpc.Disposable; import io.seata.core.rpc.hook.RpcHook; import io.seata.core.rpc.processor.Pair; @@ -62,7 +63,7 @@ public abstract class AbstractNettyRemoting implements Disposable { * The Timer executor. */ protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, - new NamedThreadFactory("timeoutChecker", 1, true)); + new NamedThreadFactory("timeoutChecker", 1, true)); /** * The Message executor. */ @@ -112,7 +113,7 @@ public void run() { futures.remove(entry.getKey()); RpcMessage rpcMessage = future.getRequestMessage(); future.setResultMessage(new TimeoutException(String - .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString()))); + .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString()))); if (LOGGER.isDebugEnabled()) { LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody()); } @@ -199,7 +200,7 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi return result; } catch (Exception exx) { LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), - rpcMessage.getBody()); + rpcMessage.getBody()); if (exx instanceof TimeoutException) { throw (TimeoutException) exx; } else { @@ -218,7 +219,7 @@ protected void sendAsync(Channel channel, RpcMessage rpcMessage) { channelWritableCheck(channel, rpcMessage.getBody()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" - + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen()); + + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen()); } doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage); @@ -231,22 +232,32 @@ protected void sendAsync(Channel channel, RpcMessage rpcMessage) { } protected RpcMessage buildRequestMessage(Object msg, byte messageType) { + return buildRequestMessage(msg, messageType, Version.getCurrent()); + } + + protected RpcMessage buildRequestMessage(Object msg, byte messageType, String version) { RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setId(getNextMessageId()); rpcMessage.setMessageType(messageType); rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC); rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR); rpcMessage.setBody(msg); + rpcMessage.setVersion(version); return rpcMessage; } protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) { + return buildResponseMessage(rpcMessage, msg, messageType, Version.getCurrent()); + } + + protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType, String version) { RpcMessage rpcMsg = new RpcMessage(); rpcMsg.setMessageType(messageType); rpcMsg.setCodec(rpcMessage.getCodec()); // same with request rpcMsg.setCompressor(rpcMessage.getCompressor()); rpcMsg.setBody(msg); rpcMsg.setId(rpcMessage.getId()); + rpcMsg.setVersion(version); return rpcMsg; } diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 97e43923bf5..a6f947a34ed 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -69,16 +69,20 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo if (channel == null) { throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } - RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC, rpcContext.getVersion()); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } + + @Override public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException { if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC, rpcContext.getVersion()); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @@ -87,7 +91,8 @@ public void sendAsyncRequest(Channel channel, Object msg) { if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); + RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY, rpcContext.getVersion()); super.sendAsync(channel, rpcMessage); } @@ -98,9 +103,10 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg clientChannel = ChannelManager.getSameClientChannel(channel); } if (clientChannel != null) { + RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE - : ProtocolConstants.MSGTYPE_RESPONSE); + : ProtocolConstants.MSGTYPE_RESPONSE, rpcContext.getVersion()); super.sendAsync(clientChannel, rpcMsg); } else { throw new RuntimeException("channel is error."); From 7d1a6b3c85dfc3d40ee8cdb146a7a15a7821cca3 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 2 Jan 2024 11:20:29 +0800 Subject: [PATCH 05/20] test --- .../seata/core/rpc/netty/v1/ServerChannelHandler.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java index 81d9464a392..6e1f9e3861e 100644 --- a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java +++ b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.seata.core.protocol.ProtocolConstants; import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.netty.ProtocolRpcMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +40,15 @@ public class ServerChannelHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); - if (msg instanceof RpcMessage) { - ((RpcMessage) msg).setMessageType(ProtocolConstants.MSGTYPE_RESPONSE); + RpcMessage rpcMessage = null; + if (msg instanceof ProtocolRpcMessage) { + rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + } else { + LOGGER.error("rpcMessage type error"); + return; } - channel.writeAndFlush(msg); + channel.writeAndFlush(rpcMessage); } @Override From 33e6e4e31255f2f8f1629557e02f9224ddb325af Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 2 Jan 2024 11:20:49 +0800 Subject: [PATCH 06/20] test --- .../java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java index 6e1f9e3861e..692c700858c 100644 --- a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java +++ b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.seata.core.protocol.ProtocolConstants; import io.seata.core.protocol.RpcMessage; import io.seata.core.rpc.netty.ProtocolRpcMessage; import org.slf4j.Logger; From 379ba281056a9cacfddbf0bcb6ea3191af1fd88e Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 2 Jan 2024 14:48:57 +0800 Subject: [PATCH 07/20] test --- .../java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java index 692c700858c..4f2ac538e69 100644 --- a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java +++ b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java @@ -46,7 +46,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { LOGGER.error("rpcMessage type error"); return; } - channel.writeAndFlush(rpcMessage); } From a9b4d5debad0649d40468416f2ab299dd32ab3f7 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 3 Jan 2024 15:05:50 +0800 Subject: [PATCH 08/20] test --- .../io/seata/core/rpc/netty/v1/ServerChannelHandler.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java index 4f2ac538e69..173a6c68dda 100644 --- a/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java +++ b/test/src/test/java/io/seata/core/rpc/netty/v1/ServerChannelHandler.java @@ -39,14 +39,12 @@ public class ServerChannelHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); - RpcMessage rpcMessage = null; if (msg instanceof ProtocolRpcMessage) { - rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + channel.writeAndFlush(rpcMessage); } else { LOGGER.error("rpcMessage type error"); - return; } - channel.writeAndFlush(rpcMessage); } @Override From 95573d35c2cc5e8bbc90652a2ed8b7a527e30d22 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Thu, 4 Jan 2024 14:21:33 +0800 Subject: [PATCH 09/20] fix test --- .../io/seata/core/rpc/netty/v1/ClientChannelHandler.java | 8 ++++++-- .../seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/test/src/test/java/io/seata/core/rpc/netty/v1/ClientChannelHandler.java b/test/src/test/java/io/seata/core/rpc/netty/v1/ClientChannelHandler.java index 0a6a59669f4..51c2254d87a 100644 --- a/test/src/test/java/io/seata/core/rpc/netty/v1/ClientChannelHandler.java +++ b/test/src/test/java/io/seata/core/rpc/netty/v1/ClientChannelHandler.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.concurrent.DefaultPromise; import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.netty.ProtocolRpcMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +50,8 @@ public void channelInactive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof RpcMessage) { - RpcMessage rpcMessage = (RpcMessage) msg; + if (msg instanceof ProtocolRpcMessage) { + RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); int msgId = rpcMessage.getId(); DefaultPromise future = (DefaultPromise) client.futureMap.remove(msgId); if (future != null) { @@ -58,7 +59,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } else { LOGGER.warn("miss msg id:{}", msgId); } + }else { + LOGGER.warn("msg is not ProtocolRpcMessage"); } + } @Override diff --git a/test/src/test/java/io/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java b/test/src/test/java/io/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java index 1c8a4add3fe..8310219c85a 100644 --- a/test/src/test/java/io/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java +++ b/test/src/test/java/io/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java @@ -20,6 +20,7 @@ import io.seata.core.model.BranchType; import io.seata.core.protocol.RpcMessage; import io.seata.core.protocol.transaction.BranchCommitRequest; +import io.seata.core.rpc.netty.ProtocolRpcMessage; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -80,7 +81,7 @@ public void testAll() { while (tag.getAndIncrement() < runTimes) { try { Future future = client.sendRpc(head, body); - RpcMessage resp = (RpcMessage) future.get(10, TimeUnit.SECONDS); + ProtocolRpcMessage resp = (ProtocolRpcMessage) future.get(10, TimeUnit.SECONDS); if (resp != null) { success.incrementAndGet(); } From 91c171ddc6b7286c94d111846a2913fce2aa0e0d Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Mon, 22 Jan 2024 17:54:48 +0800 Subject: [PATCH 10/20] style --- .../main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index 37f0b1cb73d..bea58ac5d8e 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -29,7 +29,6 @@ import io.seata.core.exception.DecodeException; import io.seata.core.protocol.HeartbeatMessage; import io.seata.core.protocol.ProtocolConstants; -import io.seata.core.protocol.RpcMessage; import io.seata.core.serializer.Serializer; import io.seata.core.serializer.SerializerServiceLoader; import io.seata.core.serializer.SerializerType; From cf4fe416a52ffb98f4c6d6277aa2f46f4584b088 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Sun, 4 Feb 2024 18:05:55 +0800 Subject: [PATCH 11/20] resolve conflict --- .../seata/core/rpc/netty/ProtocolRpcMessage.java | 6 +++--- .../core/rpc/netty/v0/ProtocolRpcMessageV0.java | 12 ++++++------ .../core/rpc/netty/v1/ProtocolRpcMessageV1.java | 8 ++++---- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java index b538e04e604..944d87abc46 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java @@ -14,10 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.seata.core.rpc.netty; +package org.apache.seata.core.rpc.netty; -import io.seata.core.protocol.AbstractIdentifyRequest; -import io.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.AbstractIdentifyRequest; +import org.apache.seata.core.protocol.RpcMessage; /** * The protocol RPC message. diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java index d6a4bfa54f4..087f0e099aa 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.seata.core.rpc.netty.v0; +package org.apache.seata.core.rpc.netty.v0; -import io.seata.core.compressor.CompressorType; -import io.seata.core.protocol.ProtocolConstants; -import io.seata.core.protocol.RpcMessage; -import io.seata.core.rpc.netty.ProtocolRpcMessage; -import io.seata.core.serializer.SerializerType; +import org.apache.seata.core.compressor.CompressorType; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; +import org.apache.seata.core.serializer.SerializerType; import java.util.concurrent.atomic.AtomicLong; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java index 1a3a76e1f27..04621cfcda9 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.seata.core.rpc.netty.v1; +package org.apache.seata.core.rpc.netty.v1; -import io.seata.common.util.StringUtils; -import io.seata.core.protocol.RpcMessage; -import io.seata.core.rpc.netty.ProtocolRpcMessage; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import java.util.HashMap; import java.util.Map; From 9607bcd87aef9fd2c7124b7678fdde43096d5f7e Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Sun, 4 Feb 2024 18:09:54 +0800 Subject: [PATCH 12/20] resolve conflict --- .../seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java index bbc606b35b0..5ee0df7dee2 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java @@ -27,8 +27,8 @@ import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.core.model.BranchType; -import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.transaction.BranchCommitRequest; +import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; From 5b433cd67a3f5ea0507057ddc6e1643088502a42 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 7 Feb 2024 15:35:56 +0800 Subject: [PATCH 13/20] import --- .../org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index 282b6fa7b19..1d603dd4610 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -29,7 +29,6 @@ import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; -import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; From 00a4e8e9371a3606f53fd075413421297dd6168e Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Mon, 4 Mar 2024 18:57:31 +0800 Subject: [PATCH 14/20] style --- .../apache/seata/core/rpc/netty/ProtocolRpcMessage.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java index 944d87abc46..277b727c8c4 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java @@ -23,8 +23,17 @@ * The protocol RPC message. */ public interface ProtocolRpcMessage { + + /** + * The protocol message to rpc message. + * @return + */ RpcMessage protocolMsg2RpcMsg(); + /** + * The rpc message to protocol message. + * @param rpcMessage + */ void rpcMsg2ProtocolMsg(RpcMessage rpcMessage); static String getVersion(Object body) { From df9ea50e274d9572ffd164d7c42868d898914b58 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Thu, 7 Mar 2024 11:34:54 +0800 Subject: [PATCH 15/20] optimize version --- .../org/apache/seata/core/protocol/RpcMessage.java | 10 +++++----- .../seata/core/rpc/netty/AbstractNettyRemoting.java | 4 ++-- .../seata/core/rpc/netty/ProtocolRpcMessage.java | 4 ++-- .../seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java | 4 ++-- .../seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java | 4 ++-- .../rpc/processor/server/ServerOnRequestProcessor.java | 8 +++++--- 6 files changed, 18 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java index 0424f7ac00c..fa80f814f5d 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java @@ -35,7 +35,7 @@ public class RpcMessage implements Serializable { private Map headMap = new HashMap<>(); private Object body; - private String version; + private String sdkVersion; /** * Gets id. @@ -171,12 +171,12 @@ public void setMessageType(byte messageType) { this.messageType = messageType; } - public String getVersion() { - return version; + public String getSdkVersion() { + return sdkVersion; } - public void setVersion(String version) { - this.version = version; + public void setSdkVersion(String sdkVersion) { + this.sdkVersion = sdkVersion; } @Override diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java index 332f04b3195..4e96ce4d5ae 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -243,7 +243,7 @@ protected RpcMessage buildRequestMessage(Object msg, byte messageType, String ve rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC); rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR); rpcMessage.setBody(msg); - rpcMessage.setVersion(version); + rpcMessage.setSdkVersion(version); return rpcMessage; } @@ -258,7 +258,7 @@ protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byt rpcMsg.setCompressor(rpcMessage.getCompressor()); rpcMsg.setBody(msg); rpcMsg.setId(rpcMessage.getId()); - rpcMsg.setVersion(version); + rpcMsg.setSdkVersion(version); return rpcMsg; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java index 277b727c8c4..1211ad56f10 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java @@ -36,7 +36,7 @@ public interface ProtocolRpcMessage { */ void rpcMsg2ProtocolMsg(RpcMessage rpcMessage); - static String getVersion(Object body) { + static String getSdkVersion(Object body) { if (body instanceof AbstractIdentifyRequest) { return ((AbstractIdentifyRequest) body).getVersion(); } else { @@ -44,7 +44,7 @@ static String getVersion(Object body) { } } - static void setVersion(Object body, String version) { + static void setSdkVersion(Object body, String version) { if (body instanceof AbstractIdentifyRequest) { ((AbstractIdentifyRequest) body).setVersion(version); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java index 087f0e099aa..1ada24def10 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java @@ -178,14 +178,14 @@ public RpcMessage protocolMsg2RpcMsg() { } rpcMessage.setBody(this.body); rpcMessage.setId((int) this.id); - rpcMessage.setVersion(ProtocolRpcMessage.getVersion(this.body)); + rpcMessage.setSdkVersion(ProtocolRpcMessage.getSdkVersion(this.body)); return rpcMessage; } @Override public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { this.body = rpcMessage.getBody(); - ProtocolRpcMessage.setVersion(this.body, rpcMessage.getVersion()); + ProtocolRpcMessage.setSdkVersion(this.body, rpcMessage.getSdkVersion()); this.id = rpcMessage.getId(); this.isRequest = isRequest(rpcMessage.getMessageType()); this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType()); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java index 04621cfcda9..7e6668f4a01 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java @@ -179,7 +179,7 @@ public RpcMessage protocolMsg2RpcMsg() { rpcMessage.setCompressor(this.compressor); rpcMessage.setHeadMap(this.headMap); rpcMessage.setBody(this.body); - rpcMessage.setVersion(ProtocolRpcMessage.getVersion(this.body)); + rpcMessage.setSdkVersion(ProtocolRpcMessage.getSdkVersion(this.body)); return rpcMessage; } @@ -187,7 +187,7 @@ public RpcMessage protocolMsg2RpcMsg() { @Override public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { this.body = rpcMessage.getBody(); - ProtocolRpcMessage.setVersion(this.body, rpcMessage.getVersion()); + ProtocolRpcMessage.setSdkVersion(this.body, rpcMessage.getSdkVersion()); this.headMap = rpcMessage.getHeadMap(); this.id = rpcMessage.getId(); this.messageType = rpcMessage.getMessageType(); diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java index 5431c19e630..985d8fd598b 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java @@ -266,9 +266,10 @@ public void run() { batchResultMessage.getResultMessages().add(item.getResultMessage()); batchResultMessage.getMsgIds().add(item.getMsgId()); } + RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) -> - remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo), - channel, batchResultMessage)); + remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo, rpcContext.getVersion()), + channel, batchResultMessage)); }); isResponding = false; } @@ -326,12 +327,13 @@ private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msg * @param clientRequestRpcInfo For saving client request rpc info * @return rpcMessage */ - private RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo) { + private RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo, String version) { RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setId(clientRequestRpcInfo.getRpcMessageId()); rpcMessage.setCodec(clientRequestRpcInfo.getCodec()); rpcMessage.setCompressor(clientRequestRpcInfo.getCompressor()); rpcMessage.setHeadMap(clientRequestRpcInfo.getHeadMap()); + rpcMessage.setSdkVersion(version); return rpcMessage; } From 3841379e2da6ad6dbdb16c30d775a4a6d3e8bf1e Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 8 Mar 2024 19:20:04 +0800 Subject: [PATCH 16/20] optimize version --- .../apache/seata/core/protocol/RpcMessage.java | 10 ++++------ .../core/rpc/netty/AbstractNettyRemoting.java | 11 ----------- .../rpc/netty/AbstractNettyRemotingServer.java | 14 ++++---------- .../seata/core/rpc/netty/ProtocolRpcMessage.java | 15 --------------- .../core/rpc/netty/v0/ProtocolRpcMessageV0.java | 2 -- .../core/rpc/netty/v1/ProtocolRpcMessageV1.java | 2 -- .../core/rpc/netty/v1/ProtocolV1Decoder.java | 1 + .../server/ServerOnRequestProcessor.java | 8 +++----- 8 files changed, 12 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java index fa80f814f5d..d96822c4bb9 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java @@ -35,7 +35,6 @@ public class RpcMessage implements Serializable { private Map headMap = new HashMap<>(); private Object body; - private String sdkVersion; /** * Gets id. @@ -172,11 +171,10 @@ public void setMessageType(byte messageType) { } public String getSdkVersion() { - return sdkVersion; - } - - public void setSdkVersion(String sdkVersion) { - this.sdkVersion = sdkVersion; + if (body instanceof AbstractIdentifyRequest) { + return ((AbstractIdentifyRequest) body).getVersion(); + } + return ""; } @Override diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java index 4e96ce4d5ae..3d09108ac43 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -47,7 +47,6 @@ import org.apache.seata.core.rpc.hook.RpcHook; import org.apache.seata.core.rpc.processor.Pair; import org.apache.seata.core.rpc.processor.RemotingProcessor; -import org.apache.seata.core.protocol.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -233,32 +232,22 @@ protected void sendAsync(Channel channel, RpcMessage rpcMessage) { } protected RpcMessage buildRequestMessage(Object msg, byte messageType) { - return buildRequestMessage(msg, messageType, Version.getCurrent()); - } - - protected RpcMessage buildRequestMessage(Object msg, byte messageType, String version) { RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setId(getNextMessageId()); rpcMessage.setMessageType(messageType); rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC); rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR); rpcMessage.setBody(msg); - rpcMessage.setSdkVersion(version); return rpcMessage; } protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) { - return buildResponseMessage(rpcMessage, msg, messageType, Version.getCurrent()); - } - - protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType, String version) { RpcMessage rpcMsg = new RpcMessage(); rpcMsg.setMessageType(messageType); rpcMsg.setCodec(rpcMessage.getCodec()); // same with request rpcMsg.setCompressor(rpcMessage.getCompressor()); rpcMsg.setBody(msg); rpcMsg.setId(rpcMessage.getId()); - rpcMsg.setSdkVersion(version); return rpcMsg; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 528397e9406..0a0cd34562a 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -69,20 +69,16 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo if (channel == null) { throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); - RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC, rpcContext.getVersion()); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } - - @Override public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException { if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); - RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC, rpcContext.getVersion()); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @@ -91,8 +87,7 @@ public void sendAsyncRequest(Channel channel, Object msg) { if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); - RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY, rpcContext.getVersion()); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); super.sendAsync(channel, rpcMessage); } @@ -103,10 +98,9 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg clientChannel = ChannelManager.getSameClientChannel(channel); } if (clientChannel != null) { - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE - : ProtocolConstants.MSGTYPE_RESPONSE, rpcContext.getVersion()); + : ProtocolConstants.MSGTYPE_RESPONSE); super.sendAsync(clientChannel, rpcMsg); } else { throw new RuntimeException("channel is error."); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java index 1211ad56f10..ba89c508a8c 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java @@ -16,7 +16,6 @@ */ package org.apache.seata.core.rpc.netty; -import org.apache.seata.core.protocol.AbstractIdentifyRequest; import org.apache.seata.core.protocol.RpcMessage; /** @@ -35,18 +34,4 @@ public interface ProtocolRpcMessage { * @param rpcMessage */ void rpcMsg2ProtocolMsg(RpcMessage rpcMessage); - - static String getSdkVersion(Object body) { - if (body instanceof AbstractIdentifyRequest) { - return ((AbstractIdentifyRequest) body).getVersion(); - } else { - return null; - } - } - - static void setSdkVersion(Object body, String version) { - if (body instanceof AbstractIdentifyRequest) { - ((AbstractIdentifyRequest) body).setVersion(version); - } - } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java index 1ada24def10..cea2d7e6f71 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java @@ -178,14 +178,12 @@ public RpcMessage protocolMsg2RpcMsg() { } rpcMessage.setBody(this.body); rpcMessage.setId((int) this.id); - rpcMessage.setSdkVersion(ProtocolRpcMessage.getSdkVersion(this.body)); return rpcMessage; } @Override public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { this.body = rpcMessage.getBody(); - ProtocolRpcMessage.setSdkVersion(this.body, rpcMessage.getSdkVersion()); this.id = rpcMessage.getId(); this.isRequest = isRequest(rpcMessage.getMessageType()); this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType()); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java index 7e6668f4a01..10668cbdc65 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java @@ -179,7 +179,6 @@ public RpcMessage protocolMsg2RpcMsg() { rpcMessage.setCompressor(this.compressor); rpcMessage.setHeadMap(this.headMap); rpcMessage.setBody(this.body); - rpcMessage.setSdkVersion(ProtocolRpcMessage.getSdkVersion(this.body)); return rpcMessage; } @@ -187,7 +186,6 @@ public RpcMessage protocolMsg2RpcMsg() { @Override public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { this.body = rpcMessage.getBody(); - ProtocolRpcMessage.setSdkVersion(this.body, rpcMessage.getSdkVersion()); this.headMap = rpcMessage.getHeadMap(); this.id = rpcMessage.getId(); this.messageType = rpcMessage.getMessageType(); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index 1d603dd4610..429e0576e3c 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -27,6 +27,7 @@ import org.apache.seata.core.compressor.CompressorFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.exception.DecodeException; +import org.apache.seata.core.protocol.AbstractIdentifyRequest; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.serializer.Serializer; diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java index 985d8fd598b..5431c19e630 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java @@ -266,10 +266,9 @@ public void run() { batchResultMessage.getResultMessages().add(item.getResultMessage()); batchResultMessage.getMsgIds().add(item.getMsgId()); } - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) -> - remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo, rpcContext.getVersion()), - channel, batchResultMessage)); + remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo), + channel, batchResultMessage)); }); isResponding = false; } @@ -327,13 +326,12 @@ private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msg * @param clientRequestRpcInfo For saving client request rpc info * @return rpcMessage */ - private RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo, String version) { + private RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo) { RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setId(clientRequestRpcInfo.getRpcMessageId()); rpcMessage.setCodec(clientRequestRpcInfo.getCodec()); rpcMessage.setCompressor(clientRequestRpcInfo.getCompressor()); rpcMessage.setHeadMap(clientRequestRpcInfo.getHeadMap()); - rpcMessage.setSdkVersion(version); return rpcMessage; } From d876d1037a2952b7f82a4cb5c4f986605cfcefeb Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 8 Mar 2024 19:22:07 +0800 Subject: [PATCH 17/20] optimize version --- .../src/main/java/org/apache/seata/core/protocol/RpcMessage.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java index d96822c4bb9..4ceabf80c2b 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java @@ -35,7 +35,6 @@ public class RpcMessage implements Serializable { private Map headMap = new HashMap<>(); private Object body; - /** * Gets id. * From c0dfb5172bc9e9e43e0257ca833cd538b192aa40 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 8 Mar 2024 19:26:59 +0800 Subject: [PATCH 18/20] optimize version --- .../java/org/apache/seata/core/protocol/RpcMessage.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java index 4ceabf80c2b..4f0963b20f7 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java @@ -169,13 +169,6 @@ public void setMessageType(byte messageType) { this.messageType = messageType; } - public String getSdkVersion() { - if (body instanceof AbstractIdentifyRequest) { - return ((AbstractIdentifyRequest) body).getVersion(); - } - return ""; - } - @Override public String toString() { return StringUtils.toString(this); From 8e706902740724e2bd24da80ec59b908ef402f8b Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 8 Mar 2024 19:53:18 +0800 Subject: [PATCH 19/20] optimize version --- .../org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index 429e0576e3c..1d603dd4610 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -27,7 +27,6 @@ import org.apache.seata.core.compressor.CompressorFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.exception.DecodeException; -import org.apache.seata.core.protocol.AbstractIdentifyRequest; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.serializer.Serializer; From ade6ff7949140fcf44e2df1b18d0667bd46e5522 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Mon, 24 Jun 2024 09:35:30 +0800 Subject: [PATCH 20/20] style --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index f4de08bc261..7941cbbf60a 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch. ### optimize: - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version +- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index f9a0eab0c7c..25ff7a0587c 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -12,7 +12,7 @@ ### optimize: - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化 - +- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖 ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应