Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0 #19884

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.AbstractReferenceCounted;
import org.apache.commons.crypto.stream.CryptoInputStream;
import org.apache.commons.crypto.stream.CryptoOutputStream;

import org.apache.spark.network.util.AbstractFileRegion;
import org.apache.spark.network.util.ByteArrayReadableChannel;
import org.apache.spark.network.util.ByteArrayWritableChannel;

Expand Down Expand Up @@ -161,7 +161,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
}

private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
private static class EncryptedMessage extends AbstractFileRegion {
private final boolean isByteBuf;
private final ByteBuf buf;
private final FileRegion region;
Expand Down Expand Up @@ -199,10 +199,45 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
return transferred;
}

// Propagates the leak-detection hint to this wrapper and to whichever wrapped
// payload is present. NOTE(review): presumably exactly one of region/buf is
// non-null (see the isByteBuf flag above) — confirm against the constructor,
// which is not visible in this hunk.
@Override
public EncryptedMessage touch(Object o) {
super.touch(o);
if (region != null) {
region.touch(o);
}
if (buf != null) {
buf.touch(o);
}
return this;
}

// Increments the refcount of this wrapper together with the wrapped payload
// (FileRegion or ByteBuf), keeping their lifetimes in sync.
@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (region != null) {
region.retain(increment);
}
if (buf != null) {
buf.retain(increment);
}
return this;
}

// Releases the wrapped payload first, then this wrapper. The return value is
// super's, i.e. whether this wrapper itself was deallocated by this call
// (per AbstractReferenceCounted.release semantics).
@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
Preconditions.checkArgument(position == transfered(), "Invalid position.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.AbstractFileRegion;

/**
* A wrapper message that holds two separate pieces (a header and a body).
*
* The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
*/
class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
class MessageWithHeader extends AbstractFileRegion {

@Nullable private final ManagedBuffer managedBuffer;
private final ByteBuf header;
Expand Down Expand Up @@ -91,7 +91,7 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
Copy link
Member

@gatorsmile gatorsmile Jan 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks binary compatibility. Is it intentional? @zsxwing @cloud-fan

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't. The old method is implemented in AbstractFileRegion.transfered. In addition, the whole network module is private, so we don't need to maintain compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. AbstractFileRegion.transfered is final so it may break binary compatibility. However, this is fine since it's a private module.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks!

return totalBytesTransferred;
}

Expand Down Expand Up @@ -160,4 +160,37 @@ private int writeNioBuffer(

return ret;
}

// Propagates the leak-detection hint to the header (always a ByteBuf) and the
// body; ReferenceCountUtil.touch handles the body being either a ByteBuf or a
// FileRegion, and is a no-op if the body is not reference-counted.
@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
header.touch(o);
ReferenceCountUtil.touch(body, o);
return this;
}

// Retains this wrapper, the header, the body, and (if present) the backing
// ManagedBuffer, keeping all refcounts in sync. ManagedBuffer.retain() takes
// no count argument, hence the loop to apply `increment` retains.
@Override
public MessageWithHeader retain(int increment) {
super.retain(increment);
header.retain(increment);
ReferenceCountUtil.retain(body, increment);
if (managedBuffer != null) {
for (int i = 0; i < increment; i++) {
managedBuffer.retain();
}
}
return this;
}

// Releases the header, body, and (if present) the backing ManagedBuffer before
// releasing this wrapper itself; ManagedBuffer.release() takes no count, hence
// the loop. Returns super's result, i.e. whether this wrapper was deallocated.
@Override
public boolean release(int decrement) {
header.release(decrement);
ReferenceCountUtil.release(body, decrement);
if (managedBuffer != null) {
for (int i = 0; i < decrement; i++) {
managedBuffer.release();
}
}
return super.release(decrement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.AbstractFileRegion;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;

Expand Down Expand Up @@ -129,7 +129,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
}

@VisibleForTesting
static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
static class EncryptedMessage extends AbstractFileRegion {

private final SaslEncryptionBackend backend;
private final boolean isByteBuf;
Expand Down Expand Up @@ -183,10 +183,45 @@ public long position() {
* Returns an approximation of the amount of data transferred. See {@link #count()}.
*/
@Override
public long transfered() {
public long transferred() {
return transferred;
}

// Propagates the leak-detection hint to this wrapper and to whichever wrapped
// payload is present. NOTE(review): presumably exactly one of buf/region is
// non-null (see the isByteBuf flag above) — confirm against the constructor,
// which is not visible in this hunk.
@Override
public EncryptedMessage touch(Object o) {
super.touch(o);
if (buf != null) {
buf.touch(o);
}
if (region != null) {
region.touch(o);
}
return this;
}

// Increments the refcount of this wrapper together with the wrapped payload
// (ByteBuf or FileRegion), keeping their lifetimes in sync.
@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (buf != null) {
buf.retain(increment);
}
if (region != null) {
region.retain(increment);
}
return this;
}

// Releases the wrapped payload first, then this wrapper. Returns super's
// result, i.e. whether this wrapper itself was deallocated by this call
// (per AbstractReferenceCounted.release semantics).
@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

/**
* Transfers data from the original message to the channel, encrypting it in the process.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

/**
 * Base class for Spark {@link FileRegion} implementations, bridging the Netty 4.0 and 4.1
 * reference-counting APIs: it adapts the deprecated, misspelled {@code transfered()} to the
 * new {@code transferred()}, and narrows the return types of {@code retain}/{@code touch}
 * so chained calls keep the concrete type.
 */
public abstract class AbstractFileRegion extends AbstractReferenceCounted implements FileRegion {

// Netty 4.0 spelling of the accessor; declared final so subclasses only ever
// override transferred().
@Override
@SuppressWarnings("deprecation")
public final long transfered() {
return transferred();
}

// Covariant override: returns this as AbstractFileRegion for call chaining.
@Override
public AbstractFileRegion retain() {
super.retain();
return this;
}

@Override
public AbstractFileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public AbstractFileRegion touch() {
super.touch();
return this;
}

// No-op leak-detection hint: AbstractReferenceCounted leaves touch(Object) abstract,
// so there is no super implementation to delegate to. Subclasses may override to
// propagate the hint to wrapped resources.
@Override
public AbstractFileRegion touch(Object o) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!serverChannel.outboundMessages().isEmpty()) {
clientChannel.writeInbound(serverChannel.readOutbound());
clientChannel.writeOneInbound(serverChannel.readOutbound());
}

assertEquals(1, clientChannel.inboundMessages().size());
Expand All @@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!clientChannel.outboundMessages().isEmpty()) {
serverChannel.writeInbound(clientChannel.readOutbound());
serverChannel.writeOneInbound(clientChannel.readOutbound());
}

assertEquals(1, serverChannel.inboundMessages().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import org.apache.spark.network.util.AbstractFileRegion;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -108,7 +107,7 @@ private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exc
return Unpooled.wrappedBuffer(channel.getData());
}

private static class TestFileRegion extends AbstractReferenceCounted implements FileRegion {
private static class TestFileRegion extends AbstractFileRegion {

private final int writeCount;
private final int writesPerCall;
Expand All @@ -130,7 +129,7 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
return 8 * written;
}

Expand Down
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

import com.google.common.io.Closeables
import io.netty.channel.{DefaultFileRegion, FileRegion}
import io.netty.util.AbstractReferenceCounted
import io.netty.channel.DefaultFileRegion

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.util.Utils
import org.apache.spark.util.io.ChunkedByteBuffer
Expand Down Expand Up @@ -266,7 +265,7 @@ private class EncryptedBlockData(
}

private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long)
extends AbstractReferenceCounted with FileRegion {
extends AbstractFileRegion {

private var _transferred = 0L

Expand All @@ -277,7 +276,7 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:

override def position(): Long = 0

override def transfered(): Long = _transferred
override def transferred(): Long = _transferred

override def transferTo(target: WritableByteChannel, pos: Long): Long = {
assert(pos == transfered(), "Invalid position.")
Expand Down
10 changes: 5 additions & 5 deletions dev/deps/spark-deps-hadoop-2.6
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
arpack_combined_all-0.1.jar
arrow-format-0.4.0.jar
arrow-memory-0.4.0.jar
arrow-vector-0.4.0.jar
arrow-format-0.8.0.jar
arrow-memory-0.8.0.jar
arrow-vector-0.8.0.jar
avro-1.7.7.jar
avro-ipc-1.7.7.jar
avro-mapred-1.7.7-hadoop2.jar
Expand Down Expand Up @@ -82,7 +82,7 @@ hadoop-yarn-server-web-proxy-2.6.5.jar
hk2-api-2.4.0-b34.jar
hk2-locator-2.4.0-b34.jar
hk2-utils-2.4.0-b34.jar
hppc-0.7.1.jar
hppc-0.7.2.jar
htrace-core-3.0.4.jar
httpclient-4.5.2.jar
httpcore-4.4.4.jar
Expand Down Expand Up @@ -144,7 +144,7 @@ metrics-json-3.1.5.jar
metrics-jvm-3.1.5.jar
minlog-1.3.0.jar
netty-3.9.9.Final.jar
netty-all-4.0.47.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
Expand Down
10 changes: 5 additions & 5 deletions dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
arpack_combined_all-0.1.jar
arrow-format-0.4.0.jar
arrow-memory-0.4.0.jar
arrow-vector-0.4.0.jar
arrow-format-0.8.0.jar
arrow-memory-0.8.0.jar
arrow-vector-0.8.0.jar
avro-1.7.7.jar
avro-ipc-1.7.7.jar
avro-mapred-1.7.7-hadoop2.jar
Expand Down Expand Up @@ -82,7 +82,7 @@ hadoop-yarn-server-web-proxy-2.7.3.jar
hk2-api-2.4.0-b34.jar
hk2-locator-2.4.0-b34.jar
hk2-utils-2.4.0-b34.jar
hppc-0.7.1.jar
hppc-0.7.2.jar
htrace-core-3.1.0-incubating.jar
httpclient-4.5.2.jar
httpcore-4.4.4.jar
Expand Down Expand Up @@ -145,7 +145,7 @@ metrics-json-3.1.5.jar
metrics-jvm-3.1.5.jar
minlog-1.3.0.jar
netty-3.9.9.Final.jar
netty-all-4.0.47.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
Expand Down
Loading