Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize : Eliminate RpcMessage and Encoder/Decoder dependencies #6209

Open
wants to merge 27 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f1bacdd
Unwinding RpcMessage and Encoder/Decoder dependencies
Bughue Dec 25, 2023
ee3c3aa
license
Bughue Dec 26, 2023
94cb0cf
v0
Bughue Dec 28, 2023
289ed97
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Dec 29, 2023
8c197af
v0
Bughue Dec 29, 2023
7d1a6b3
test
Bughue Jan 2, 2024
33e6e4e
test
Bughue Jan 2, 2024
379ba28
test
Bughue Jan 2, 2024
a9b4d5d
test
Bughue Jan 3, 2024
95573d3
fix test
Bughue Jan 4, 2024
8da65ff
Merge branch '2.x' into dev-mlv-rpc-msg
xingfudeshi Jan 18, 2024
91c171d
style
Bughue Jan 22, 2024
72d47a1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Jan 22, 2024
c1c7f3f
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Feb 4, 2024
cf4fe41
resolve conflict
Bughue Feb 4, 2024
9607bcd
resolve conflict
Bughue Feb 4, 2024
5b433cd
import
Bughue Feb 7, 2024
91418a1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 4, 2024
00a4e8e
style
Bughue Mar 4, 2024
c728968
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 4, 2024
df9ea50
optimize version
Bughue Mar 7, 2024
3841379
optimize version
Bughue Mar 8, 2024
d876d10
optimize version
Bughue Mar 8, 2024
3f0abf0
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 8, 2024
c0dfb51
optimize version
Bughue Mar 8, 2024
8e70690
optimize version
Bughue Mar 8, 2024
fa23e23
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/java/io/seata/core/protocol/RpcMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class RpcMessage implements Serializable {
private Map<String, String> headMap = new HashMap<>();
private Object body;

private String version;

/**
* Gets id.
*
Expand Down Expand Up @@ -169,6 +171,14 @@ public void setMessageType(byte messageType) {
this.messageType = messageType;
}

public String getVersion() {
return version;
}

public void setVersion(String version) {
Bughue marked this conversation as resolved.
Show resolved Hide resolved
this.version = version;
}

@Override
public String toString() {
return StringUtils.toString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,13 @@ class ClientHandler extends ChannelDuplexHandler {

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,13 @@ class ServerHandler extends ChannelDuplexHandler {
*/
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
43 changes: 43 additions & 0 deletions core/src/main/java/io/seata/core/rpc/netty/ProtocolRpcMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.rpc.netty;

import io.seata.core.protocol.AbstractIdentifyRequest;
import io.seata.core.protocol.RpcMessage;

/**
* The protocol RPC message.
*/
public interface ProtocolRpcMessage {
RpcMessage protocolMsg2RpcMsg();

void rpcMsg2ProtocolMsg(RpcMessage rpcMessage);

static String getVersion(Object body) {
if (body instanceof AbstractIdentifyRequest) {
return ((AbstractIdentifyRequest) body).getVersion();
} else {
return null;
}
}

static void setVersion(Object body, String version) {
if (body instanceof AbstractIdentifyRequest) {
((AbstractIdentifyRequest) body).setVersion(version);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.rpc.netty.v1;

import io.seata.common.util.StringUtils;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.netty.ProtocolRpcMessage;

import java.util.HashMap;
import java.util.Map;

/**
* protocol v1 rpc message
**/
public class ProtocolRpcMessageV1 implements ProtocolRpcMessage {
private int id;
private byte messageType;
private byte codec;
private byte compressor;
private Map<String, String> headMap = new HashMap<>();
private Object body;

/**
* Gets id.
*
* @return the id
*/
public int getId() {
return id;
}

/**
* Sets id.
*
* @param id the id
*/
public void setId(int id) {
this.id = id;
}

/**
* Gets body.
*
* @return the body
*/
public Object getBody() {
return body;
}

/**
* Sets body.
*
* @param body the body
*/
public void setBody(Object body) {
this.body = body;
}

/**
* Gets codec.
*
* @return the codec
*/
public byte getCodec() {
return codec;
}

/**
* Sets codec.
*
* @param codec the codec
* @return the codec
*/
public void setCodec(byte codec) {
this.codec = codec;
}

/**
* Gets compressor.
*
* @return the compressor
*/
public byte getCompressor() {
return compressor;
}

/**
* Sets compressor.
*
* @param compressor the compressor
* @return the compressor
*/
public void setCompressor(byte compressor) {
this.compressor = compressor;
}

/**
* Gets head map.
*
* @return the head map
*/
public Map<String, String> getHeadMap() {
return headMap;
}

/**
* Sets head map.
*
* @param headMap the head map
* @return the head map
*/
public void setHeadMap(Map<String, String> headMap) {
this.headMap = headMap;
}

/**
* Gets head.
*
* @param headKey the head key
* @return the head
*/
public String getHead(String headKey) {
return headMap.get(headKey);
}

/**
* Put head.
*
* @param headKey the head key
* @param headValue the head value
*/
public void putHead(String headKey, String headValue) {
headMap.put(headKey, headValue);
}

/**
* Gets message type.
*
* @return the message type
*/
public byte getMessageType() {
return messageType;
}

/**
* Sets message type.
*
* @param messageType the message type
*/
public void setMessageType(byte messageType) {
this.messageType = messageType;
}

@Override
public String toString() {
return StringUtils.toString(this);
}

@Override
public RpcMessage protocolMsg2RpcMsg() {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setId(this.id);
rpcMessage.setMessageType(this.messageType);
rpcMessage.setCodec(this.codec);
rpcMessage.setCompressor(this.compressor);
rpcMessage.setHeadMap(this.headMap);
rpcMessage.setBody(this.body);
rpcMessage.setVersion(ProtocolRpcMessage.getVersion(this.body));
return rpcMessage;
}


@Override
public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
this.body = rpcMessage.getBody();
ProtocolRpcMessage.setVersion(this.body, rpcMessage.getVersion());
this.headMap = rpcMessage.getHeadMap();
this.id = rpcMessage.getId();
this.messageType = rpcMessage.getMessageType();
this.codec = rpcMessage.getCodec();
this.compressor = rpcMessage.getCompressor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.seata.core.compressor.CompressorFactory;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.serializer.SerializerServiceLoader;
import io.seata.core.serializer.SerializerType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,7 +69,7 @@ public ProtocolV1Decoder() {

public ProtocolV1Decoder(int maxFrameLength) {
/*
int maxFrameLength,
int maxFrameLength,
int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3
int lengthFieldLength, FullLength is int(4B). so values is 4
int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7
Expand Down Expand Up @@ -117,7 +116,7 @@ public Object decodeFrame(ByteBuf frame) {
byte compressorType = frame.readByte();
int requestId = frame.readInt();

RpcMessage rpcMessage = new RpcMessage();
ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.setCodec(codecType);
rpcMessage.setId(requestId);
rpcMessage.setCompressor(compressorType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ public class ProtocolV1Encoder extends MessageToByteEncoder {
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
try {
if (msg instanceof RpcMessage) {
RpcMessage rpcMessage = (RpcMessage) msg;
RpcMessage rpcMsg = (RpcMessage) msg;

ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.rpcMsg2ProtocolMsg(rpcMsg);
int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
int headLength = ProtocolConstants.V1_HEAD_LENGTH;

Expand Down