Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [SPARK-19552] [BUILD] Upgrade Netty version to 4.1.8 final #16888

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,26 @@ private static class EncryptedMessage extends AbstractReferenceCounted implement
this.cos = cos;
this.byteEncChannel = ch;
}

@Override
public FileRegion touch() {
return this;
}

@Override
public FileRegion touch(Object hint) {
return this;
}

@Override
public FileRegion retain() {
return this;
}

@Override
public FileRegion retain(int increment) {
return this;
}

@Override
public long count() {
Expand All @@ -198,6 +218,11 @@ public long position() {
return 0;
}

@Override
public long transferred() {
return transferred;
}

@Override
public long transfered() {
return transferred;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
private final long bodyLength;
private long totalBytesTransferred;

@Override
public FileRegion touch() {
return this;
}

@Override
public FileRegion touch(Object hint) {
return this;
}

@Override
public FileRegion retain() {
return this;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this breaks the netty reference counting logic, I think instead it should be

return (TestFileRegion)super.retain();

}

@Override
public FileRegion retain(int increment) {
return this;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, I think this should be

return (TestFileRegion)super.retain(increment);

}


/**
* When the write buffer size is larger than this limit, I/O will be done in chunks of this size.
* The size should not be too large as it will waste underlying memory copy. e.g. If network
Expand Down Expand Up @@ -92,6 +113,11 @@ public long position() {

@Override
public long transfered() {
return transferred();
}

@Override
public long transferred() {
return totalBytesTransferred;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;

import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;
Expand Down Expand Up @@ -136,6 +137,31 @@ static class EncryptedMessage extends AbstractReferenceCounted implements FileRe
private final ByteBuf buf;
private final FileRegion region;

@Override
public FileRegion touch() {
return region;
}

@Override
public FileRegion touch(Object hint) {
return region;
}

@Override
public FileRegion retain() {
return region;
}

@Override
public FileRegion retain(int increment) {
return region;
}

@Override
public long transferred() {
return 0;
}

/**
* A channel used to buffer input data for encryption. The channel has an upper size bound
* so that if the input is larger than the allowed buffer, it will be broken into multiple
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exc

private static class TestFileRegion extends AbstractReferenceCounted implements FileRegion {

@Override
public FileRegion touch() {
return this;
}

@Override
public FileRegion touch(Object hint) {
return this;
}

@Override
public FileRegion retain() {
return this;
}

@Override
public FileRegion retain(int increment) {
return this;
}

private final int writeCount;
private final int writesPerCall;
private int written;
Expand All @@ -129,6 +149,12 @@ public long position() {
return 0;
}

@Override
public long transferred() {
return 8 * written;
}


@Override
public long transfered() {
return 8 * written;
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ metrics-jvm-3.1.2.jar
minlog-1.3.0.jar
mx4j-3.0.2.jar
netty-3.9.9.Final.jar
netty-all-4.0.43.Final.jar
netty-all-4.1.8.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
oro-2.0.8.jar
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ metrics-jvm-3.1.2.jar
minlog-1.3.0.jar
mx4j-3.0.2.jar
netty-3.9.9.Final.jar
netty-all-4.0.43.Final.jar
netty-all-4.1.8.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
oro-2.0.8.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.43.Final</version>
<version>4.1.8.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down