Skip to content

Commit

Permalink
Upgrade Netty to 4.1.17
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing authored and BryanCutler committed Nov 27, 2017
1 parent e1dd03e commit 2311fa2
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,63 @@ public long transfered() {
return transferred;
}

@Override
public long transferred() {
return transferred;
}

/**
* Override this due to different return types of ReferenceCounted.touch and FileRegion.touch.
*/
@Override
public EncryptedMessage touch() {
super.touch();
return this;
}

@Override
public EncryptedMessage touch(Object o) {
if (region != null) {
region.touch(o);
}
if (buf != null) {
buf.touch(o);
}
return this;
}

/**
* Override this due to different return types of ReferenceCounted.touch and FileRegion.touch.
*/
@Override
public EncryptedMessage retain() {
super.retain();
return this;
}

@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (region != null) {
region.retain(increment);
}
if (buf != null) {
buf.retain(increment);
}
return this;
}

@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
Preconditions.checkArgument(position == transfered(), "Invalid position.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public long transfered() {
return totalBytesTransferred;
}

@Override
public long transferred() {
return totalBytesTransferred;
}

/**
* This code is more complicated than you would think because we might require multiple
* transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
Expand Down Expand Up @@ -160,4 +165,50 @@ private int writeNioBuffer(

return ret;
}

/** Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. */
@Override
public MessageWithHeader touch() {
super.touch();
return this;
}

@Override
public MessageWithHeader touch(Object o) {
header.touch(o);
ReferenceCountUtil.touch(body, o);
return this;
}

/** Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. */
@Override
public MessageWithHeader retain() {
super.retain();
return this;
}

@Override
public MessageWithHeader retain(int increment) {
super.retain(increment);
header.retain(increment);
ReferenceCountUtil.retain(body, increment);
if (managedBuffer != null) {
for (int i = 0; i < increment; i++) {
managedBuffer.retain();
}
}
return this;
}

@Override
public boolean release(int decrement) {
header.release(decrement);
ReferenceCountUtil.release(body, decrement);
if (managedBuffer != null) {
for (int i = 0; i < decrement; i++) {
managedBuffer.release();
}
}
return super.release(decrement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,63 @@ public long transfered() {
return transferred;
}

@Override
public long transferred() {
return transferred;
}

/**
* Override this due to different return types of ReferenceCounted.touch and FileRegion.touch.
*/
@Override
public EncryptedMessage touch() {
super.touch();
return this;
}

@Override
public EncryptedMessage touch(Object o) {
if (buf != null) {
buf.touch(o);
}
if (region != null) {
region.touch(o);
}
return this;
}

/**
* Override this due to different return types of ReferenceCounted.retain and FileRegion.retain.
*/
@Override
public EncryptedMessage retain() {
super.retain();
return this;
}

@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (buf != null) {
buf.retain(increment);
}
if (region != null) {
region.retain(increment);
}
return this;
}

@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

/**
* Transfers data from the original message to the channel, encrypting it in the process.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,34 @@ public long transfered() {
return 8 * written;
}

@Override
public long transferred() {
return 8 * written;
}

@Override
public TestFileRegion touch() {
super.touch();
return this;
}

@Override
public TestFileRegion touch(Object o) {
return this;
}

@Override
public TestFileRegion retain() {
super.retain();
return this;
}

@Override
public TestFileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
for (int i = 0; i < writesPerCall; i++) {
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,35 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:

override def transfered(): Long = _transferred

override def transferred(): Long = _transferred

/**
* Override this due to different return types of ReferenceCounted.touch and FileRegion.touch.
*/
override def touch(): this.type = {
super.touch()
this
}

override def touch(o: Object): this.type = {
this
}

/**
* Override this due to different return types of ReferenceCounted.retain and FileRegion.retain.
*/
override def retain(): this.type = {
super.retain()
this
}

override def retain(increment: Int): this.type = {
super.retain(increment)
this
}

override def release(decrement: Int): Boolean = super.release(decrement)

override def transferTo(target: WritableByteChannel, pos: Long): Long = {
assert(pos == transfered(), "Invalid position.")

Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ metrics-json-3.1.5.jar
metrics-jvm-3.1.5.jar
minlog-1.3.0.jar
netty-3.9.9.Final.jar
netty-all-4.0.47.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ metrics-json-3.1.5.jar
metrics-jvm-3.1.5.jar
minlog-1.3.0.jar
netty-3.9.9.Final.jar
netty-all-4.0.47.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.47.Final</version>
<version>4.1.17.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down

0 comments on commit 2311fa2

Please sign in to comment.