Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New RPC protocol #1276

Merged
merged 7 commits into from
Jul 11, 2019
Merged

New RPC protocol #1276

merged 7 commits into from
Jul 11, 2019

Conversation

ujjboy
Copy link
Contributor

@ujjboy ujjboy commented Jul 7, 2019

Ⅰ. Describe what this PR did

  • Add new protocol codec.
  • Change msgId to int
  • The response uses the same codec and compressor with the request.

Ⅱ. Does this pull request fix one issue?

#893

Ⅲ. Why don't you add test cases (unit test/integration test)?

Ⅳ. Describe how to verify it

Ⅴ. Special notes for reviews

@codecov-io
Copy link

codecov-io commented Jul 7, 2019

Codecov Report

Merging #1276 into develop will decrease coverage by 1.13%.
The diff coverage is 7.05%.

Impacted file tree graph

@@              Coverage Diff              @@
##             develop    #1276      +/-   ##
=============================================
- Coverage      49.44%   48.31%   -1.14%     
+ Complexity      1652     1643       -9     
=============================================
  Files            326      334       +8     
  Lines          11446    11600     +154     
  Branches        1412     1432      +20     
=============================================
- Hits            5659     5604      -55     
- Misses          5157     5376     +219     
+ Partials         630      620      -10
Impacted Files Coverage Δ Complexity Δ
...ava/io/seata/core/constants/ConfigurationKeys.java 0% <ø> (ø) 0 <0> (ø) ⬇️
...c/main/java/io/seata/core/rpc/netty/RpcServer.java 0% <0%> (ø) 0 <0> (ø) ⬇️
.../java/io/seata/core/protocol/HeartbeatMessage.java 0% <0%> (ø) 0 <0> (ø) ⬇️
...ata/core/rpc/DefaultServerMessageListenerImpl.java 0% <0%> (ø) 0 <0> (ø) ⬇️
...ava/io/seata/core/rpc/netty/RmMessageListener.java 0% <0%> (ø) 0 <0> (ø) ⬇️
.../io/seata/core/rpc/netty/v1/ProtocolV1Encoder.java 0% <0%> (ø) 0 <0> (?)
...o/seata/common/io/UnsafeByteArrayOutputStream.java 0% <0%> (ø) 0 <0> (?)
.../io/seata/core/rpc/netty/v1/HeadMapSerializer.java 0% <0%> (ø) 0 <0> (?)
.../main/java/io/seata/core/codec/CompressorType.java 0% <0%> (ø) 0 <0> (?)
.../io/seata/core/rpc/netty/v1/ProtocolV1Decoder.java 0% <0%> (ø) 0 <0> (?)
... and 26 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a125233...85c37d1. Read the comment docs.

@zhangthen zhangthen added this to the 0.7.0 milestone Jul 8, 2019
* @author Geng Zhang
*/
public class PositiveAtomicCounter {
private static final int MASK = 0x7FFFFFFF;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

range:0~2^31-1 , how about 0~2^32-1 ?

@@ -0,0 +1,120 @@
/*
* Copyright 1999-2019 Seata.io Group.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should use apache/dubbo license header.

@@ -0,0 +1,103 @@
/*
* Copyright 1999-2019 Seata.io Group.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should use apache/dubbo license header.

rpcMessage.setRequest(true);
rpcMessage.setMessageType(msg instanceof HeartbeatMessage ?
ProtocolConstants.MSGTYPE_RESQUEST
: ProtocolConstants.MSGTYPE_HEARTBEAT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mistake?

* @param b 原始byte
* @return byte数组{<16,<16}
*/
public static byte[] parseHigh4Low4Bytes(byte b) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name is a bit confusing, remove all Chinese comments.

* @param channel the channel
* @param msg the msg
*/
protected void sendResponse(long msgId, Channel channel, Object msg) {
protected void sendResponse(RpcMessage request, Channel channel, Object msg) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use request.getId why wrap as RpcMessage. if use RpcMessage ,it is best to wrap msg in the body

* @param ctx the ctx
* @param message the message
* @param sender the sender
*/
void onTrxMessage(long msgId, ChannelHandlerContext ctx, Object message, ServerMessageSender sender);
void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx, Object message, ServerMessageSender sender);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RpcMessage contains message.

* @param ctx the ctx
* @param message the message
* @param sender the sender
* @param checkAuthHandler the check auth handler
*/
void onRegRmMessage(long msgId, ChannelHandlerContext ctx, RegisterRMRequest message,
void onRegRmMessage(RpcMessage request, ChannelHandlerContext ctx, RegisterRMRequest message,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

* @param ctx the ctx
* @param message the message
* @param sender the sender
* @param checkAuthHandler the check auth handler
*/
void onRegTmMessage(long msgId, ChannelHandlerContext ctx, RegisterTMRequest message,
void onRegTmMessage(RpcMessage request, ChannelHandlerContext ctx, RegisterTMRequest message,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

* @param channel the channel
* @param msg the msg
*/
void sendResponse(long msgId, Channel channel, Object msg);
void sendResponse(RpcMessage request, Channel channel, Object msg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

@@ -68,10 +70,21 @@
* The Message executor.
*/
protected final ThreadPoolExecutor messageExecutor;

private static PositiveAtomicCounter NEXT_ID = new PositiveAtomicCounter();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

// TODO add compressor like CodecFactory
public static final byte COMPRESS_NONE = 0;

public static final byte DEFAULT_CODEC = CodecType.valueOf(ConfigurationFactory.getInstance().getConfig(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not default.

public static final byte DEFAULT_CODEC = CodecType.valueOf(ConfigurationFactory.getInstance().getConfig(
ConfigurationKeys.SERIALIZE_FOR_RPC, CodecType.SEATA.name()).toUpperCase()).getCode();

public static final byte DEFAULT_COMPRESSOR = COMPRESS_NONE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not default

rpcMessage.setHeartbeat(false);
rpcMessage.setRequest(true);
rpcMessage.setId(getNextMessageId());
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one way?

private final boolean HEART_BEAT_FIELD = true;
private final boolean REQUEST_FIELD = false;
private final long ID_FIELD = 100L;
private final int ID_FIELD = 100;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static

RpcMessage rpcMessage = (RpcMessage) msg;

int fullLength = 13;
int headLength = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headLength is only the size of the headmap?

@Override
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
if (out == null) {
out = ctx.alloc().buffer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread-safe, why the buffer is null?

Copy link
Member

@slievrly slievrly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AT mode function verification passed(seata)

Copy link
Member

@slievrly slievrly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protobuf:
image

@leizhiyuan
Copy link
Contributor

image

It seems ok in my local machine.

Copy link
Member

@slievrly slievrly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AT mode function verification passed(protobuf)

Copy link
Member

@slievrly slievrly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some other minor modifications will be in another pr.

@slievrly slievrly changed the title [WIP] New RPC protocol. New RPC protocol Jul 11, 2019
Copy link
Contributor

@leizhiyuan leizhiyuan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@leizhiyuan leizhiyuan merged commit 7505dea into apache:develop Jul 11, 2019
@ujjboy ujjboy deleted the new_rpc_protocol branch July 11, 2019 12:01
nick-tan pushed a commit to nick-tan/seata that referenced this pull request Jul 12, 2019
* New rpc protocol.

* remove debug log, encode body of heartbeat.

* bugfix: pass txId into TCC interceptor (apache#1278)

* Fix same code as review.

* Fix protobuf and add default conf.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants