[#908] feat(tez): Write byte array shuffle data to MapOutput #909
}

@VisibleForTesting
protected static byte[] calcChecksum(final byte[] buffer) throws IOException {
Could we reuse org.apache.uniffle.common.util.ChecksumUtils#getCrc32?
OK, we were not aware of this method. It works for our case, and the code has been changed to use it.

@VisibleForTesting
protected static byte[] calcChecksum(final byte[] buffer) throws IOException {
DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, Integer.MAX_VALUE);
Why does this method need protected visibility? If the goal is to make the method testable, we can put the test code and the source code in the same package and use default (package-private) visibility instead.
OK, we removed this method and now use org.apache.uniffle.common.util.ChecksumUtils#getCrc32.
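For readers following the visibility point in this thread, here is a minimal sketch of a package-private static helper that a test class in the same package could call directly. The class name `ChecksumHelper` is hypothetical, and `java.util.zip.CRC32` is used only as a stand-in; how `ChecksumUtils#getCrc32` is implemented is not shown in this conversation.

```java
import java.util.zip.CRC32;

// Hypothetical helper for illustration only; it is not code from this PR.
final class ChecksumHelper {
  private ChecksumHelper() {}

  // No access modifier means package-private: any class in the same package,
  // including a test class under the same package name, can call this method.
  static long crc32(byte[] buffer) {
    CRC32 crc = new CRC32();
    crc.update(buffer, 0, buffer.length);
    return crc.getValue();
  }
}
```

A test placed in the same package can then call `ChecksumHelper.crc32(...)` without the method being `protected` or `public`.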
}

public static void write(final FetchedInput mapOutput, byte[] buffer) throws IOException {
Could we add a test for this method?
private static final Logger LOG = LoggerFactory.getLogger(RssTezBypassWriter.class);
private static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 0};

public static void write(MapOutput mapOutput, byte[] buffer) {
Could we add a test for this method?
We added some tests. Are they enough?
OK.
} else if (mapOutput.getType() == MapOutput.Type.DISK) {
// RSS leverages its own compression, it is incompatible with hadoop's disk file compression.
// So we should disable this situation.
throw new IllegalStateException("RSS does not support OnDiskMapOutput as shuffle ouput,"
Could we throw RssException?
OK, we changed it.
It seems this exception has not been changed yet.
It seems we missed it. It is changed now.
OutputStream output = ((DiskFetchedInput) mapOutput).getOutputStream();
output.write(HEADER);
output.write(buffer);
output.write(Ints.toByteArray((int)ChecksumUtils.getCrc32(buffer)));
Hmm, this seems different from the original implementation. Why does it write an int instead of a long?
Tez reads only 4 bytes for the checksum. If we write 8 bytes, reading the data back and verifying the checksum will fail.
OK.
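The framing discussed in this thread (header, payload, then a 4-byte checksum) can be sketched as below. This is an illustration under assumptions, not the PR's code: the class name `BypassFrameSketch` and the method `frame` are hypothetical, `java.util.zip.CRC32` stands in for `ChecksumUtils#getCrc32`, and `ByteBuffer.putInt` stands in for Guava's `Ints.toByteArray` (both produce 4 big-endian bytes).

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical sketch for illustration only; it is not code from this PR.
final class BypassFrameSketch {
  // Same header bytes as quoted in the diff above: 'T', 'I', 'F', 0.
  static final byte[] HEADER = {(byte) 'T', (byte) 'I', (byte) 'F', (byte) 0};

  private BypassFrameSketch() {}

  // Builds header + payload + 4-byte checksum in one array.
  static byte[] frame(byte[] buffer) {
    CRC32 crc = new CRC32();
    crc.update(buffer, 0, buffer.length);
    // CRC32.getValue() returns a long whose upper 32 bits are zero, so the
    // cast to int keeps the full checksum and putInt emits exactly 4 bytes.
    // Writing the long directly would emit 8 bytes instead.
    return ByteBuffer.allocate(HEADER.length + buffer.length + Integer.BYTES)
        .put(HEADER)
        .put(buffer)
        .putInt((int) crc.getValue())
        .array();
  }
}
```

For a 9-byte payload, the resulting frame is 17 bytes long: 4 header bytes, 9 payload bytes, and 4 checksum bytes.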
Could you rebase or merge master branch?
cc @zhengchenyu
@lifeSo This comment isn't addressed. #909 (comment)
LGTM, thanks @lifeSo. Waiting for CI.
Codecov Report
@@ Coverage Diff @@
## master #909 +/- ##
============================================
+ Coverage 55.23% 56.19% +0.95%
+ Complexity 2200 2069 -131
============================================
Files 333 297 -36
Lines 16451 13154 -3297
Branches 1308 1232 -76
============================================
- Hits 9087 7392 -1695
+ Misses 6851 5352 -1499
+ Partials 513 410 -103
... and 38 files with indirect coverage changes
What changes were proposed in this pull request?
Write byte array shuffle data to MapOutput
Why are the changes needed?
Fix: #908
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit tests.