Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: load SeataSerializer by version #6208

Merged
merged 31 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
39d88c1
load SeataSerializer by version
Bughue Dec 25, 2023
901b886
license
Bughue Dec 26, 2023
b518004
style
Bughue Dec 26, 2023
883adfb
test ProtocolConstants.VERSION
Bughue Dec 26, 2023
492672c
version serialize
Bughue Dec 27, 2023
d8c19a4
v0
Bughue Dec 29, 2023
958ef77
v0
Bughue Dec 29, 2023
2a0d836
v0
Bughue Dec 29, 2023
227464b
v0
Bughue Dec 29, 2023
0afa845
license
Bughue Dec 29, 2023
4959cbe
license
Bughue Dec 29, 2023
dfd9bf2
style
Bughue Dec 29, 2023
48a6090
style
Bughue Jan 2, 2024
181a421
inner class
Bughue Jan 4, 2024
d2804fb
inner class
Bughue Jan 4, 2024
342d306
single pattern
Bughue Jan 22, 2024
439d132
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jan 22, 2024
74c48d5
conflit
Bughue Jan 22, 2024
93c972b
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Feb 4, 2024
0f8c187
resolve conflict
Bughue Feb 4, 2024
704669a
style
Bughue Feb 28, 2024
9d30454
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Apr 30, 2024
2f5feb0
merge
Bughue Apr 30, 2024
89c0674
double lock check
Bughue May 7, 2024
b649852
double lock check
Bughue May 7, 2024
3cc6ca3
double lock check
Bughue May 7, 2024
2d37bf1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jun 21, 2024
9d199a1
style
Bughue Jun 21, 2024
dd9d808
style
Bughue Jun 21, 2024
5868eef
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jun 21, 2024
ac90469
style
Bughue Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/src/main/java/io/seata/core/protocol/ProtocolConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,20 @@ public interface ProtocolConstants {
*/
byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda};

/**
* Old protocol version
*/
byte VERSION_0 = 0;

/**
* Protocol version
*/
byte VERSION_1 = 1;

/**
* Protocol version
*/
byte VERSION = 1;
byte VERSION = VERSION_1;

/**
* Max frame length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Object decodeFrame(ByteBuf frame) {
frame.readBytes(bs);
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
rpcMessage.setBody(serializer.deserialize(bs));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
// heartbeat has no body
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
import io.seata.common.loader.EnhancedServiceNotFoundException;
import io.seata.common.util.ReflectionUtil;

import java.util.HashMap;
import java.util.Map;

/**
* The Service Loader for the interface {@link Serializer}
*
*/
public final class SerializerServiceLoader {

private SerializerServiceLoader() {
}

private static Map<String, Serializer> serializerMap = new HashMap<>();
Bughue marked this conversation as resolved.
Show resolved Hide resolved

private static final String PROTOBUF_SERIALIZER_CLASS_NAME = "io.seata.serializer.protobuf.ProtobufSerializer";

Expand All @@ -39,7 +42,7 @@ private SerializerServiceLoader() {
* @return the service of {@link Serializer}
* @throws EnhancedServiceNotFoundException the enhanced service not found exception
*/
public static Serializer load(SerializerType type) throws EnhancedServiceNotFoundException {
public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException {
if (type == SerializerType.PROTOBUF) {
try {
ReflectionUtil.getClassByName(PROTOBUF_SERIALIZER_CLASS_NAME);
Expand All @@ -48,6 +51,24 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun
"Please manually reference 'io.seata:seata-serializer-protobuf' dependency ", e);
}
}
return EnhancedServiceLoader.load(Serializer.class, type.name());

String key = serialzerKey(type, version);
Serializer serializer = serializerMap.get(key);
if (serializer == null) {
if (type == SerializerType.SEATA) {
serializer = EnhancedServiceLoader.load(Serializer.class, type.name(), new Object[]{version});
Bughue marked this conversation as resolved.
Show resolved Hide resolved
} else {
serializer = EnhancedServiceLoader.load(Serializer.class, type.name());
}
serializerMap.put(key, serializer);
}
return serializer;
}

private static String serialzerKey(SerializerType type, byte version) {
if (type == SerializerType.SEATA) {
return type.name() + version;
}
return type.name();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
* The type Message codec factory.
*
*/
public class MessageCodecFactory {
public abstract class MessageCodecFactory {

/**
* The constant UTF8.
Expand All @@ -95,7 +95,7 @@ public class MessageCodecFactory {
* @param abstractMessage the abstract message
* @return the message codec
*/
public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage) {
public MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage) {
return getMessageCodec(abstractMessage.getTypeCode());
}

Expand All @@ -105,7 +105,7 @@ public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage)
* @param typeCode the type code
* @return the msg instance by code
*/
public static MessageSeataCodec getMessageCodec(short typeCode) {
public MessageSeataCodec getMessageCodec(short typeCode) {
MessageSeataCodec msgCodec = null;
switch (typeCode) {
case MessageType.TYPE_SEATA_MERGE:
Expand Down Expand Up @@ -166,7 +166,7 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
* @param typeCode the type code
* @return the merge request instance by code
*/
protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode) {
protected MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN:
return new GlobalBeginRequestCodec();
Expand Down Expand Up @@ -195,7 +195,7 @@ protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode
* @param typeCode the type code
* @return the merge response instance by code
*/
protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode) {
protected MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN_RESULT:
return new GlobalBeginResponseCodec();
Expand Down Expand Up @@ -230,7 +230,7 @@ protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCod
* @param typeCode the type code
* @return the message
*/
public static AbstractMessage getMessage(short typeCode) {
public AbstractMessage getMessage(short typeCode) {
AbstractMessage abstractMessage = null;
switch (typeCode) {
case MessageType.TYPE_SEATA_MERGE:
Expand Down Expand Up @@ -295,7 +295,7 @@ public static AbstractMessage getMessage(short typeCode) {
* @param typeCode the type code
* @return the merge request instance by code
*/
protected static AbstractMessage getMergeRequestInstanceByCode(int typeCode) {
protected AbstractMessage getMergeRequestInstanceByCode(int typeCode) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN:
return new GlobalBeginRequest();
Expand Down Expand Up @@ -324,7 +324,7 @@ protected static AbstractMessage getMergeRequestInstanceByCode(int typeCode) {
* @param typeCode the type code
* @return the merge response instance by code
*/
protected static AbstractMessage getMergeResponseInstanceByCode(int typeCode) {
protected AbstractMessage getMergeResponseInstanceByCode(int typeCode) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN_RESULT:
return new GlobalBeginResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,58 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.seata.common.exception.NotSupportYetException;
import io.seata.common.loader.LoadLevel;
import io.seata.common.loader.Scope;
import io.seata.common.util.BufferUtils;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.serializer.Serializer;
import io.seata.serializer.seata.protocol.v1.MessageCodecFactoryV1;

/**
* The Seata codec.
*
*/
@LoadLevel(name = "SEATA")
@LoadLevel(name = "SEATA", scope = Scope.PROTOTYPE)
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
public class SeataSerializer implements Serializer {

MessageCodecFactory factory;
byte protocolVersion ;

public SeataSerializer(Byte version){
if (version == ProtocolConstants.VERSION_1) {
factory = new MessageCodecFactoryV1();
}else {
throw new NotSupportYetException("not support version" + version);
}
protocolVersion = version;
}
@Override
public <T> byte[] serialize(T t) {
if (!(t instanceof AbstractMessage)) {
throw new IllegalArgumentException("AbstractMessage isn't available.");
}
AbstractMessage abstractMessage = (AbstractMessage)t;
//typecode
//type code
short typecode = abstractMessage.getTypeCode();
//msg codec
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typecode);
//get empty ByteBuffer
ByteBuf out = Unpooled.buffer(1024);
//msg encode
messageCodec.encode(t, out);
byte[] body = new byte[out.readableBytes()];
out.readBytes(body);

//typecode + body
ByteBuffer byteBuffer = ByteBuffer.allocate(2 + body.length);
byteBuffer.putShort(typecode);
ByteBuffer byteBuffer;
if (protocolVersion == ProtocolConstants.VERSION_0) {
Bughue marked this conversation as resolved.
Show resolved Hide resolved
byteBuffer = ByteBuffer.allocate(body.length);
} else {
//typecode + body
byteBuffer = ByteBuffer.allocate(2 + body.length);
byteBuffer.putShort(typecode);
}
byteBuffer.put(body);

BufferUtils.flip(byteBuffer);
Expand All @@ -76,9 +96,9 @@ public <T> T deserialize(byte[] bytes) {
byteBuffer.get(body);
ByteBuffer in = ByteBuffer.wrap(body);
//new Messgae
AbstractMessage abstractMessage = MessageCodecFactory.getMessage(typecode);
AbstractMessage abstractMessage = factory.getMessage(typecode);
//get messageCodec
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typecode);
//decode
messageCodec.decode(abstractMessage, in);
return (T)abstractMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.seata.core.protocol.BatchResultMessage;
import io.seata.serializer.seata.MessageCodecFactory;
import io.seata.serializer.seata.MessageSeataCodec;
import io.seata.serializer.seata.protocol.v1.MessageCodecFactoryV1;

/**
* the type batch result message codec
Expand All @@ -35,6 +36,8 @@
*/
public class BatchResultMessageCodec extends AbstractMessageCodec {

protected MessageCodecFactory factory = new MessageCodecFactoryV1();

@Override
public Class<?> getMessageClassType() {
return BatchResultMessage.class;
Expand All @@ -53,7 +56,7 @@ public <T> void encode(T t, ByteBuf out) {
for (final AbstractMessage msg : msgs) {
final ByteBuf subBuffer = Unpooled.buffer(1024);
short typeCode = msg.getTypeCode();
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typeCode);
messageCodec.encode(msg, subBuffer);
buffer.writeShort(msg.getTypeCode());
buffer.writeBytes(subBuffer);
Expand Down Expand Up @@ -106,8 +109,8 @@ protected void decode(BatchResultMessage batchResultMessage, ByteBuffer byteBuff
List<Integer> msgIds = new ArrayList<>();
for (int idx = 0; idx < msgNum; idx++) {
short typeCode = byteBuffer.getShort();
AbstractMessage abstractResultMessage = MessageCodecFactory.getMessage(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
AbstractMessage abstractResultMessage = factory.getMessage(typeCode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typeCode);
messageCodec.decode(abstractResultMessage, byteBuffer);
msgs.add((AbstractResultMessage) abstractResultMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.AbstractResultMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.serializer.seata.protocol.v1.MessageCodecFactoryV1;

/**
* The type Merge result message codec.
*
*/
public class MergeResultMessageCodec extends AbstractMessageCodec {

protected MessageCodecFactory factory = new MessageCodecFactoryV1();
Bughue marked this conversation as resolved.
Show resolved Hide resolved

@Override
public Class<?> getMessageClassType() {
return MergeResultMessage.class;
Expand All @@ -48,7 +51,7 @@ public <T> void encode(T t, ByteBuf out) {
short typeCode = msg.getTypeCode();
//put typeCode
out.writeShort(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typeCode);
messageCodec.encode(msg, out);
}

Expand Down Expand Up @@ -90,8 +93,8 @@ protected void decode(MergeResultMessage mergeResultMessage, ByteBuffer byteBuff
AbstractResultMessage[] msgs = new AbstractResultMessage[msgNum];
for (int idx = 0; idx < msgNum; idx++) {
short typeCode = byteBuffer.getShort();
AbstractMessage abstractResultMessage = MessageCodecFactory.getMessage(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
AbstractMessage abstractResultMessage = factory.getMessage(typeCode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typeCode);
messageCodec.decode(abstractResultMessage, byteBuffer);
msgs[idx] = (AbstractResultMessage)abstractResultMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import io.seata.serializer.seata.MessageSeataCodec;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.serializer.seata.protocol.v1.MessageCodecFactoryV1;

/**
* The type Merged warp message codec.
*
*/
public class MergedWarpMessageCodec extends AbstractMessageCodec {

protected MessageCodecFactory factory = new MessageCodecFactoryV1();

@Override
public Class<?> getMessageClassType() {
return MergedWarpMessage.class;
Expand All @@ -51,7 +54,7 @@ public <T> void encode(T t, ByteBuf out) {
for (final AbstractMessage msg : msgs) {
final ByteBuf subBuffer = Unpooled.buffer(1024);
short typeCode = msg.getTypeCode();
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typeCode);
messageCodec.encode(msg, subBuffer);
buffer.writeShort(msg.getTypeCode());
buffer.writeBytes(subBuffer);
Expand Down Expand Up @@ -96,8 +99,8 @@ private void doDecode(MergedWarpMessage mergedWarpMessage, ByteBuffer byteBuffer
List<AbstractMessage> msgs = new ArrayList<AbstractMessage>();
for (int idx = 0; idx < msgNum; idx++) {
short typeCode = byteBuffer.getShort();
AbstractMessage abstractMessage = MessageCodecFactory.getMessage(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
AbstractMessage abstractMessage = factory.getMessage(typeCode);
MessageSeataCodec messageCodec = factory.getMessageCodec(typeCode);
messageCodec.decode(abstractMessage, byteBuffer);
msgs.add(abstractMessage);
}
Expand Down