Skip to content

Commit

Permalink
[SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
Browse files Browse the repository at this point in the history
Upgrade Spark to Arrow 0.8.0 for Java and Python.  Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.

The highlights that pertain to Spark for the update from Arrow version 0.4.1 to 0.8.0 include:

* Java refactoring for a simpler API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python

Existing tests

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>

Closes apache#19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
  • Loading branch information
BryanCutler authored and sumwale committed Feb 7, 2022
1 parent 2ba4fc7 commit 9262683
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ allprojects {
chillVersion = '0.8.5'
kryoVersion = '4.0.2'
nettyVersion = '3.10.6.Final'
nettyAllVersion = '4.1.68.Final'
nettyAllVersion = '4.1.73.Final'
derbyVersion = '10.14.2.0'
httpClientVersion = '4.5.6'
httpCoreVersion = '4.4.10'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.AbstractFileRegion;

/**
* A wrapper message that holds two separate pieces (a header and a body).
*
* The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
*/
class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
class MessageWithHeader extends AbstractFileRegion {

@Nullable private final ManagedBuffer managedBuffer;
private final ByteBuf header;
Expand Down Expand Up @@ -91,7 +91,7 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
return totalBytesTransferred;
}

Expand Down Expand Up @@ -160,4 +160,37 @@ private int writeNioBuffer(

return ret;
}

/**
 * Passes the given hint to the superclass and to both pieces wrapped by this message.
 *
 * The header is touched directly; the body is touched through {@link ReferenceCountUtil}.
 * NOTE(review): presumably the utility is used because the body may be either a ByteBuf
 * or a FileRegion -- confirm against the field declaration.
 *
 * @param o the hint forwarded unchanged to every touch call
 * @return this message
 */
@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
header.touch(o);
ReferenceCountUtil.touch(body, o);
return this;
}

/**
 * Retains this message and everything it wraps by the same amount.
 *
 * The superclass, the header and the body are each retained by {@code increment}. When a
 * managed buffer is present it is retained as well, one call per unit of {@code increment}.
 *
 * @param increment the amount passed to each retain call
 * @return this message
 */
@Override
public MessageWithHeader retain(int increment) {
super.retain(increment);
header.retain(increment);
ReferenceCountUtil.retain(body, increment);
if (managedBuffer != null) {
// managedBuffer.retain() is called without a count here, so repeat it increment times.
for (int i = 0; i < increment; i++) {
managedBuffer.retain();
}
}
return this;
}

/**
 * Releases everything this message wraps and then this message itself, by the same amount.
 *
 * The header, the body and (when present) the managed buffer are released first; the
 * superclass release is performed last and its result is what the caller receives.
 *
 * @param decrement the amount passed to each release call
 * @return the value returned by {@code super.release(decrement)}
 */
@Override
public boolean release(int decrement) {
header.release(decrement);
ReferenceCountUtil.release(body, decrement);
if (managedBuffer != null) {
// managedBuffer.release() is called without a count here, so repeat it decrement times.
for (int i = 0; i < decrement; i++) {
managedBuffer.release();
}
}
return super.release(decrement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.AbstractFileRegion;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;

Expand Down Expand Up @@ -129,7 +129,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
}

@VisibleForTesting
static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
static class EncryptedMessage extends AbstractFileRegion {

private final SaslEncryptionBackend backend;
private final boolean isByteBuf;
Expand Down Expand Up @@ -183,10 +183,45 @@ public long position() {
* Returns an approximation of the amount of data transferred. See {@link #count()}.
*/
@Override
public long transfered() {
public long transferred() {
return transferred;
}

/**
 * Passes the given hint to the superclass and to whichever wrapped payloads are present.
 *
 * Both {@code buf} and {@code region} are null-checked independently.
 * NOTE(review): presumably only one of them is non-null for a given message (see
 * {@code isByteBuf}) -- confirm against the constructor.
 *
 * @param o the hint forwarded unchanged to every touch call
 * @return this message
 */
@Override
public EncryptedMessage touch(Object o) {
super.touch(o);
if (buf != null) {
buf.touch(o);
}
if (region != null) {
region.touch(o);
}
return this;
}

/**
 * Retains this message and whichever wrapped payloads are present by the same amount.
 *
 * The superclass is retained first, then {@code buf} and {@code region} if they are non-null.
 *
 * @param increment the amount passed to each retain call
 * @return this message
 */
@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (buf != null) {
buf.retain(increment);
}
if (region != null) {
region.retain(increment);
}
return this;
}

/**
 * Releases whichever wrapped payloads are present and then this message, by the same amount.
 *
 * {@code region} and {@code buf} are released (when non-null) before the superclass release,
 * whose result is what the caller receives.
 *
 * @param decrement the amount passed to each release call
 * @return the value returned by {@code super.release(decrement)}
 */
@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

/**
* Transfers data from the original message to the channel, encrypting it in the process.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

/**
 * Common base for {@link FileRegion} implementations that are also reference counted.
 *
 * It bridges the deprecated {@code transfered()} accessor to {@code transferred()} and
 * narrows the return type of the retain/touch methods to {@code AbstractFileRegion}.
 */
public abstract class AbstractFileRegion extends AbstractReferenceCounted implements FileRegion {

  /**
   * Default hint-taking touch: ignores the hint and does nothing beyond returning this region.
   * Subclasses override this when they have wrapped resources to touch.
   */
  @Override
  public AbstractFileRegion touch(Object o) {
    return this;
  }

  /** Delegates to the superclass touch and returns this region. */
  @Override
  public AbstractFileRegion touch() {
    super.touch();
    return this;
  }

  /** Delegates to the superclass retain with the given increment and returns this region. */
  @Override
  public AbstractFileRegion retain(int increment) {
    super.retain(increment);
    return this;
  }

  /** Delegates to the superclass no-argument retain and returns this region. */
  @Override
  public AbstractFileRegion retain() {
    super.retain();
    return this;
  }

  /**
   * Deprecated spelling of {@code transferred()}; final so that subclasses only implement
   * the correctly spelled method.
   */
  @SuppressWarnings("deprecation")
  @Override
  public final long transfered() {
    return transferred();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!serverChannel.outboundMessages().isEmpty()) {
clientChannel.writeInbound(serverChannel.readOutbound());
clientChannel.writeOneInbound(serverChannel.readOutbound());
}

assertEquals(1, clientChannel.inboundMessages().size());
Expand All @@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!clientChannel.outboundMessages().isEmpty()) {
serverChannel.writeInbound(clientChannel.readOutbound());
serverChannel.writeOneInbound(clientChannel.readOutbound());
}

assertEquals(1, serverChannel.inboundMessages().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import org.apache.spark.network.util.AbstractFileRegion;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -108,7 +107,7 @@ private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exc
return Unpooled.wrappedBuffer(channel.getData());
}

private static class TestFileRegion extends AbstractReferenceCounted implements FileRegion {
private static class TestFileRegion extends AbstractFileRegion {

private final int writeCount;
private final int writesPerCall;
Expand All @@ -130,7 +129,7 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
return 8 * written;
}

Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ metrics-jvm-3.1.2.jar
minlog-1.3.0.jar
mx4j-3.0.2.jar
netty-3.9.9.Final.jar
netty-all-4.0.43.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
oro-2.0.8.jar
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ metrics-jvm-3.1.2.jar
minlog-1.3.0.jar
mx4j-3.0.2.jar
netty-3.9.9.Final.jar
netty-all-4.0.43.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
oro-2.0.8.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.43.Final</version>
<version>4.1.17.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down

0 comments on commit 9262683

Please sign in to comment.