Skip to content

Commit

Permalink
ARROW-6871: [Java] Enhance TransferPair related parameters check and …
Browse files Browse the repository at this point in the history
…tests

Related to [ARROW-6871](https://issues.apache.org/jira/browse/ARROW-6871).

TransferPair related param checks in different classes have potential problems:

i. splitAndTransfer has no index checks in classes like VarCharVector.
ii. The splitAndTransfer index check in classes like UnionVector is not correct (Preconditions.checkArgument(startIndex + length <= valueCount)); startIndex and length should be checked separately.
iii. More unit tests (UT) should be added to cover corner cases.

Closes #5645 from tianchen92/ARROW-6871 and squashes the following commits:

f3b897d <tianchen> fix style
0d3c7ea <tianchen> add benchmark
01f9a48 <tianchen> revert changes in copyFrom
a22d58a <tianchen> ARROW-6871:  Enhance TransferPair related parameters check and tests

Authored-by: tianchen <niki.lj@alibaba-inc.com>
Signed-off-by: Micah Kornfield <emkornfield@gmail.com>
  • Loading branch information
tianchen92 authored and kszucs committed Feb 7, 2020
1 parent 350c7bd commit 819b18d
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 19 deletions.
Expand Up @@ -46,6 +46,7 @@ public void sortInPlace(V vec, VectorValueComparator<V> comparator) {
this.comparator = comparator;
this.pivotBuffer = (V) vec.getField().createVector(vec.getAllocator());
this.pivotBuffer.allocateNew(1);
this.pivotBuffer.setValueCount(1);

comparator.attachVectors(vec, pivotBuffer);
quickSort();
Expand Down
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.util;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
 * Benchmarks for {@link TransferPair}.
 *
 * <p>Each benchmark splits the full range of a pre-populated source vector into a freshly
 * created target vector via {@link TransferPair#splitAndTransfer(int, int)}.
 */
@State(Scope.Benchmark)
public class TransferPairBenchmarks {

  /** Number of slots populated in each source vector and transferred per invocation. */
  private static final int VECTOR_LENGTH = 1024;

  /** Limit, in bytes, passed to the {@link RootAllocator} backing all vectors. */
  private static final int ALLOCATOR_CAPACITY = 1024 * 1024;

  private BufferAllocator allocator;

  private IntVector intVector;

  private VarCharVector varCharVector;

  /**
   * Setup benchmarks.
   *
   * <p>Creates the allocator and both source vectors, then fills them so that every third
   * slot (index divisible by 3) is null and every other slot holds a value derived from
   * its index.
   */
  @Setup
  public void prepare() {
    allocator = new RootAllocator(ALLOCATOR_CAPACITY);
    intVector = new IntVector("intVector", allocator);
    varCharVector = new VarCharVector("varcharVector", allocator);

    intVector.allocateNew(VECTOR_LENGTH);
    varCharVector.allocateNew(VECTOR_LENGTH);

    for (int i = 0; i < VECTOR_LENGTH; i++) {
      if (i % 3 == 0) {
        intVector.setNull(i);
        varCharVector.setNull(i);
      } else {
        intVector.setSafe(i, i * i);
        varCharVector.setSafe(i, ("teststring" + i).getBytes(StandardCharsets.UTF_8));
      }
    }
    intVector.setValueCount(VECTOR_LENGTH);
    varCharVector.setValueCount(VECTOR_LENGTH);
  }

  /**
   * Tear down benchmarks.
   *
   * <p>Closes the source vectors before the allocator that backs them.
   */
  @TearDown
  public void tearDown() {
    intVector.close();
    varCharVector.close();
    allocator.close();
  }

  /**
   * Measures splitting the whole source {@link IntVector} into a new target vector.
   *
   * @return always 0
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public int splitAndTransferIntVector() {
    // try-with-resources guarantees the target is closed even when the transfer throws,
    // so a failing invocation cannot leave buffers outstanding on the shared allocator.
    try (IntVector toVector = new IntVector("intVector", allocator)) {
      // NOTE(review): this pre-sizing is part of the measured time - confirm it is intended.
      toVector.setValueCount(VECTOR_LENGTH);
      TransferPair transferPair = intVector.makeTransferPair(toVector);
      transferPair.splitAndTransfer(0, VECTOR_LENGTH);
    }
    return 0;
  }

  /**
   * Measures splitting the whole source {@link VarCharVector} into a new target vector.
   *
   * @return always 0
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public int splitAndTransferVarcharVector() {
    // try-with-resources guarantees the target is closed even when the transfer throws,
    // so a failing invocation cannot leave buffers outstanding on the shared allocator.
    try (VarCharVector toVector = new VarCharVector("varcharVector", allocator)) {
      // NOTE(review): this pre-sizing is part of the measured time - confirm it is intended.
      toVector.setValueCount(VECTOR_LENGTH);
      TransferPair transferPair = varCharVector.makeTransferPair(toVector);
      transferPair.splitAndTransfer(0, VECTOR_LENGTH);
    }
    return 0;
  }

  /**
   * Runs the benchmarks in this class through the JMH {@link Runner} using a single fork.
   *
   * @param args command-line arguments (unused)
   * @throws RunnerException if the JMH runner fails
   */
  public static void main(String[] args) throws RunnerException {
    Options opt = new OptionsBuilder()
        .include(TransferPairBenchmarks.class.getSimpleName())
        .forks(1)
        .build();

    new Runner(opt).run();
  }
}
5 changes: 4 additions & 1 deletion java/vector/src/main/codegen/templates/UnionVector.java
Expand Up @@ -469,7 +469,10 @@ public void transfer() {

@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(startIndex + length <= valueCount);
Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount,
"Invalid startIndex: %s", startIndex);
Preconditions.checkArgument(startIndex + length <= valueCount,
"Invalid length: %s", length);
to.clear();
internalStructVectorTransferPair.splitAndTransfer(startIndex, length);
final int startPoint = startIndex * TYPE_WIDTH;
Expand Down
Expand Up @@ -584,7 +584,10 @@ public void transferTo(BaseFixedWidthVector target) {
*/
public void splitAndTransferTo(int startIndex, int length,
BaseFixedWidthVector target) {
Preconditions.checkArgument(startIndex + length <= valueCount);
Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount,
"Invalid startIndex: %s", startIndex);
Preconditions.checkArgument(startIndex + length <= valueCount,
"Invalid length: %s", length);
compareTypes(target, "splitAndTransferTo");
target.clear();
splitAndTransferValidityBuffer(startIndex, length, target);
Expand Down
Expand Up @@ -714,6 +714,10 @@ public void transferTo(BaseVariableWidthVector target) {
*/
public void splitAndTransferTo(int startIndex, int length,
BaseVariableWidthVector target) {
Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount,
"Invalid startIndex: %s", startIndex);
Preconditions.checkArgument(startIndex + length <= valueCount,
"Invalid length: %s", length);
compareTypes(target, "splitAndTransferTo");
target.clear();
splitAndTransferValidityBuffer(startIndex, length, target);
Expand Down Expand Up @@ -750,7 +754,6 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariab
*/
private void splitAndTransferValidityBuffer(int startIndex, int length,
BaseVariableWidthVector target) {
Preconditions.checkArgument(startIndex + length <= valueCount);
int firstByteSource = BitVectorHelper.byteIndex(startIndex);
int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
int byteSizeTarget = getValidityBufferSizeFromCount(length);
Expand Down
Expand Up @@ -158,6 +158,10 @@ public int getBufferSize() {
* @param target destination vector
*/
public void splitAndTransferTo(int startIndex, int length, BaseFixedWidthVector target) {
Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount,
"Invalid startIndex: %s", startIndex);
Preconditions.checkArgument(startIndex + length <= valueCount,
"Invalid length: %s", length);
compareTypes(target, "splitAndTransferTo");
target.clear();
target.validityBuffer = splitAndTransferBuffer(startIndex, length, target,
Expand All @@ -174,7 +178,6 @@ private ArrowBuf splitAndTransferBuffer(
BaseFixedWidthVector target,
ArrowBuf sourceBuffer,
ArrowBuf destBuffer) {
assert startIndex + length <= valueCount;
int firstByteSource = BitVectorHelper.byteIndex(startIndex);
int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
int byteSizeTarget = getValidityBufferSizeFromCount(length);
Expand Down
Expand Up @@ -590,7 +590,10 @@ public void transfer() {

@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(startIndex + length <= valueCount);
Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount,
"Invalid startIndex: %s", startIndex);
Preconditions.checkArgument(startIndex + length <= valueCount,
"Invalid length: %s", length);
final int startPoint = listSize * startIndex;
final int sliceLength = listSize * length;
to.clear();
Expand Down
Expand Up @@ -493,7 +493,10 @@ public void transfer() {
*/
@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(startIndex + length <= valueCount);
Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount,
"Invalid startIndex: %s", startIndex);
Preconditions.checkArgument(startIndex + length <= valueCount,
"Invalid length: %s", length);
final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
Expand Down
Expand Up @@ -195,7 +195,10 @@ public void copyValueSafe(int fromIndex, int toIndex) {

@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(startIndex + length <= valueCount);
Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount,
"Invalid startIndex: %s", startIndex);
Preconditions.checkArgument(startIndex + length <= valueCount,
"Invalid length: %s", length);
target.clear();
splitAndTransferValidityBuffer(startIndex, length, target);
super.splitAndTransfer(startIndex, length);
Expand Down

0 comments on commit 819b18d

Please sign in to comment.