[FLINK-32650][protobuf] Added the ability to split flink-protobuf code… #23162
@libenchao Hi, if you have time, could you help review this code?
@maosuhan Are you interested in reviewing this?
@libenchao Sure, I will take it. It may take me about one week.
...uf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
a567d7f to e609488
@ljw-hit Hi, thanks for your effort; the code is already in good shape to me. I have left a few comments about unit tests. Could you also provide a benchmark for this improvement? For example, how much encoding/decoding time is saved for 10M large rows after this change?
@maosuhan I haven’t seen any comments about UT here. Have the comments been submitted?
 *
 * <p>It is valid proto definition.
 */
public class BigProtoBufCodeSpiltterTest {
@ljw-hit I suggest that we retrieve the generated code from RowToProtoConverter or ProtoToRowConverter, then check the code to make sure that the static split code exists.
Also, write complete deserialization/serialization tests to make sure that the data is processed correctly.
@maosuhan I have complete deserialization/serialization tests. Thank you for carefully reviewing my code.
Are there any other issues with the current code? If not, can you ask a committer to merge?
@ljw-hit It seems the test code cannot explicitly tell whether the BigPbMessage is handled by the split or the non-split logic. Can you write a test to ensure BigPbMessage is handled by the split logic? For example, check that the generated code contains the split methods.
OK, this is no problem. I can add some explicit tests to indicate that the current code has been split.
@maosuhan Resolved. I use the isCodeSplit method to explicitly indicate that the current code has been split.
@ljw-hit Adding this checking flag looks good to me. I think this MR is in good shape now. Thanks for your effort.
@maosuhan Thank you very much for your review! @libenchao Can you pass this PR?
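As a rough illustration of the check discussed in this thread, the sketch below shows one way a converter could record whether its generated code was split, so that a test can assert on it. The class name SplitAwareCodegen, the threshold parameter, and the length-based decision are assumptions made for illustration; only the isCodeSplit method name comes from the thread, and the PR's actual implementation may differ.

```java
/**
 * Hypothetical sketch: remembers whether generated code was large enough
 * to be split. Not the PR's actual converter class.
 */
final class SplitAwareCodegen {
    private final boolean codeSplit;

    /**
     * @param generatedCode the source text produced by codegen
     * @param threshold assumed split threshold, measured in characters
     */
    SplitAwareCodegen(String generatedCode, int threshold) {
        // Assumption: splitting is triggered purely by source length.
        this.codeSplit = generatedCode.length() > threshold;
    }

    /** Lets a test verify which code path (split or non-split) was taken. */
    boolean isCodeSplit() {
        return codeSplit;
    }
}
```

A test for a large message would then assert that isCodeSplit() returns true, in addition to checking that the data round-trips correctly.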
7bc8bed to 816de52
@libenchao Sorry to bother you, do you have time for the last step of review and commit soon?
Thanks for the patience, I'll review this next week, let's move forward to get it in.
@flinkbot run azure
@ljw-hit Thanks for your contribution, I've left my comments below.
public class PbCodeSplitter {
    private final List<String> splitMethodStack = new ArrayList<>();

    public PbCodeSplitter() {}
No need to add a blank public default constructor.
resolved
    float f_field_8 = 29;
    bool f_field_9 = 30;
    string f_field_10 = 31;
    bytes f_field_11 = 32;
The field naming is not consistent (int_field, a_field_n, map_field); can you normalize it to one pattern?
resolved
rowData.setField(9, false);
rowData.setField(10, 1F);
rowData.setField(11, 2D);
rowData.setField(12, new byte[] {1, 2, 3});
Can you set values for all fields? Then we can be confident that splitting does not affect correctness.
Hmm, thank you for your suggestion, I will fix it.
 * Flink-Protobuf serialize codegen code size is 13999, over code threshold.
 * So pbCodeSplitter split the code.
I think the comment is not really needed since the test name and body already explain it. Besides, 13999 could easily go stale in future iterations.
resolved
 * So pbCodeSplitter split the code.
 */
@Test
public void testSerializeSplit() throws Exception {
How about testSplitInSerialization?
Hmm, thanks for the suggestion, I will adopt it
 * @param pbObjectCode may be a variable or expression. Current codegen environment can use this
 *     literal name directly to access the input. {@code pbObject} should be a protobuf object
 *     literal name directly to access the input. {@code pbGetStr} is a value coming from
I'm not sure why we should reference something that is not in the method signature, and why we should change it from resultVariable to returnInternalDataVarName, and pbObject to pbGetStr.
resolved
 *     literal name directly to access the input. {@code pbObject} should be a protobuf object
 *     literal name directly to access the input. {@code pbGetStr} is a value coming from
 *     protobuf object
 * @param pbCodeSplitter when encode/decode method body over 4K, use PbCodeSplitter to Split
Actually it's PbConstant.CODEGEN_SPLIT_THRESHOLD
resolved
@@ -41,8 +42,8 @@ public PbCodegenSimpleSerializer(
        this.formatContext = formatContext;
    }

    @Override
    public String codegen(String resultVar, String flinkObjectCode, int indent)
    public String codegenSplit(
@Override
resolved
 * @param internalDataGetStr may be a variable or expression. Current codegen environment can
 *     use this literal name directly to access the input. {@code internalDataGetStr} is a value
 *     coming from flink object.
 * @param pbCodeSplitter when encode/decode method body over 4K, use PbCodeSplitter to Split
Comments in PbCodegenDeserializer also apply here.
resolved
        return String.format("%s(%s, %s);", splitMethodName, rowDataVar, messageTypeVar);
    }

    public String splitSerializerRowTypeMethod(
splitSerializerRowTypeMethod and splitDeserializerRowTypeMethod share most of their code, so I'm wondering if we can reuse it.
Furthermore, I think these two methods are not actually necessary, and PbCodeSplitter is kind of confusing. Can we just use PbFormatContext with:
- Add a final List<String> splitMethods = new ArrayList()
- Add a method addCodeIntoMethod(String code)
And leave the rest to the caller, since there is only one caller of these two methods.
Then we can avoid introducing PbCodeSplitter everywhere.
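The suggestion above could look roughly like the following sketch. It is a minimal stand-in, not the real PbFormatContext: the class name SplitMethodContext, the generated method naming scheme, the getSplitMethods accessor, and the choice to return the method name are all assumptions. The generated method also takes no parameters here, whereas the real generated code passes the row and message variables.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a format context that collects split methods. */
final class SplitMethodContext {
    private final List<String> splitMethods = new ArrayList<>();

    /**
     * Wraps the given statements in a new static method, stores the method
     * source, and returns the generated method name so the caller can emit
     * a call to it.
     */
    String addCodeIntoMethod(String code) {
        // Assumed naming scheme: split0, split1, ...
        String methodName = "split" + splitMethods.size();
        String methodSource =
                "private static void " + methodName + "() {\n" + code + "\n}\n";
        splitMethods.add(methodSource);
        return methodName;
    }

    /** Read-only view of all generated method sources, in creation order. */
    List<String> getSplitMethods() {
        return Collections.unmodifiableList(splitMethods);
    }
}
```

With this shape, the single caller decides when to split and appends the collected method sources to the generated class body, so no separate splitter object has to be threaded through every codegen call.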
Thank you for such targeted suggestions.
1. I have found a way to reuse the code, so I can solve this part.
2. Regarding the second point, I am a little confused. Do you mean that PbCodeSplitter is not needed, and all the code-split logic should go into PbFormatContext?
@@ -27,4 +27,10 @@ public class PbConstant {
    public static final String PB_MAP_KEY_NAME = "key";
    public static final String PB_MAP_VALUE_NAME = "value";
    public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
    /**
     * JIT optimizer threshold is 8K, unicode encode one char use 2byte, so use 3K as
This is not correct; ASCII characters take only 1 byte in Unicode encodings such as UTF-8.
Thank you for your suggestion, I will modify my comment. By the way, if 1 character corresponds to 1 byte, does this threshold need to be modified?
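To make the character-versus-byte point concrete, here is a small sketch that compares the character count of a generated code string with its UTF-8 byte count. The class name CodeSizeCheck and the sample statement are made up for illustration. Note also that the JIT limit applies to compiled bytecode size, so either measure of the source text is only a rough proxy for it.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper comparing two ways of measuring generated source size. */
final class CodeSizeCheck {

    /** Number of UTF-16 code units, i.e. what String.length() reports. */
    static int charCount(String code) {
        return code.length();
    }

    /** Number of bytes when the same text is encoded as UTF-8. */
    static int utf8ByteCount(String code) {
        return code.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Made-up sample of generated code; it contains only ASCII characters.
        String code = "message.setIntField(rowData.getInt(0));";
        System.out.println(
                charCount(code) + " chars, " + utf8ByteCount(code) + " UTF-8 bytes");
    }
}
```

For ASCII-only generated code the two counts are equal, so a threshold expressed in characters does not need to be halved to account for encoding.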
@libenchao Thank you very much for your code review. I learned a lot from this review and I have solved all the comments. Please review again in your free time.
@ljw-hit Thanks for the update. It generally looks good now; I've left a few more minor comments.
String genCode = codegenDes.codegen("rowData", "message", 0);
// if codgen generate code size over threshod then split the code
PbCodeSplitter pbCodeSplitter = new PbCodeSplitter();
LOG.info("Fast-pb generate split deserialize code");
ping, it seems you are missing this one.
@@ -109,4 +117,8 @@ public byte[] convertRowToProtoBinary(RowData rowData) throws Exception {
        AbstractMessage message = (AbstractMessage) encodeMethod.invoke(null, rowData);
        return message.toByteArray();
    }

    public boolean isCodeSplit() {
for testing.
@@ -84,11 +85,18 @@ public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
            PbCodegenSerializer codegenSer =
                    PbCodegenSerializeFactory.getPbCodegenTopRowSer(
                            descriptor, rowType, formatContext);
            LOG.info("Fast-pb generate split serialize code");
remove unnecessary log.

    map<string, bytes> map_field_32 = 32;
    map<string, string> map_field_33 = 33;
}
Always put a \n at the end of the final line; then it will not complain that there is "No newline at end of file".
@libenchao Thank you for your detailed review work. I have solved these comments.
What is the purpose of the change
When the number of fields exceeds a certain threshold and the compiled method body exceeds 8k, the decode/encode method will not be optimized by JIT, seriously affecting serialization or deserialization performance.
This pull request adds the ability to split flink-protobuf codegen code to improve decode/encode method performance.
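As a minimal sketch of the general idea, and not the PR's actual algorithm, the code below groups generated statements into chunks whose combined source length stays within a threshold, so that each chunk can be emitted as its own smaller method. The class name StatementChunker, the chunk method, and the use of character length as the size measure are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of grouping generated statements into bounded chunks. */
final class StatementChunker {

    /**
     * Greedily packs statements into chunks whose total character length is
     * at most the threshold. A single statement longer than the threshold
     * still gets a chunk of its own, since statements are never cut apart.
     */
    static List<List<String>> chunk(List<String> statements, int threshold) {
        List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String statement : statements) {
            // Start a new chunk if adding this statement would exceed the limit.
            if (!current.isEmpty() && currentSize + statement.length() > threshold) {
                chunks.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(statement);
            currentSize += statement.length();
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }
}
```

Each resulting chunk would become the body of one generated helper method, and the original encode/decode method would shrink to a sequence of calls to those helpers.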
Brief change log
- PbCodegenDeserializer/PbCodegenSerializer interface: add codegenSplit method
- PbCodegenDeserializer/PbCodegenSerializer impl: implement codegenSplit method
Verifying this change
BigProtoBufCodeSpiltterTest
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no) no
Documentation