Skip to content

Commit

Permalink
DRILL-5709: Provide a value vector method to convert a vector to null…
Browse files Browse the repository at this point in the history
…able

Please see the DRILL-5709 for an explanation and example.

close #901
  • Loading branch information
Paul Rogers authored and Aman Sinha committed Sep 4, 2017
1 parent 789b83d commit 6829af0
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 10 deletions.
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.vector;

import static org.junit.Assert.*;

import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.SchemaBuilder;
import org.bouncycastle.util.Arrays;
import org.junit.Test;

public class TestToNullable extends SubOperatorTest {

@SuppressWarnings("resource")
@Test
public void testFixedWidth() {
MaterializedField intSchema =
SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED);
IntVector intVector = new IntVector(intSchema, fixture.allocator());
IntVector.Mutator intMutator = intVector.getMutator();
intVector.allocateNew(100);
for (int i = 0; i < 100; i++) {
intMutator.set(i, i * 10);
}
intMutator.setValueCount(100);

MaterializedField nullableIntSchema =
SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.OPTIONAL);
NullableIntVector nullableIntVector = new NullableIntVector(nullableIntSchema, fixture.allocator());

intVector.toNullable(nullableIntVector);

assertEquals(0, intVector.getAccessor().getValueCount());
NullableIntVector.Accessor niAccessor = nullableIntVector.getAccessor();
assertEquals(100, niAccessor.getValueCount());
for (int i = 0; i < 100; i++) {
assertFalse(niAccessor.isNull(i));
assertEquals(i * 10, niAccessor.get(i));
}

nullableIntVector.clear();

// Don't clear the intVector, it should be empty.
// If it is not, the test will fail with a memory leak error.
}

@SuppressWarnings("resource")
@Test
public void testNullable() {
MaterializedField nullableIntSchema =
SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.OPTIONAL);
NullableIntVector sourceVector = new NullableIntVector(nullableIntSchema, fixture.allocator());
NullableIntVector.Mutator sourceMutator = sourceVector.getMutator();
sourceVector.allocateNew(100);
for (int i = 0; i < 100; i++) {
sourceMutator.set(i, i * 10);
}
sourceMutator.setValueCount(100);

NullableIntVector destVector = new NullableIntVector(nullableIntSchema, fixture.allocator());

sourceVector.toNullable(destVector);

assertEquals(0, sourceVector.getAccessor().getValueCount());
NullableIntVector.Accessor destAccessor = destVector.getAccessor();
assertEquals(100, destAccessor.getValueCount());
for (int i = 0; i < 100; i++) {
assertFalse(destAccessor.isNull(i));
assertEquals(i * 10, destAccessor.get(i));
}

destVector.clear();

// Don't clear the intVector, it should be empty.
// If it is not, the test will fail with a memory leak error.
}

@SuppressWarnings("resource")
@Test
public void testVariableWidth() {
MaterializedField nonNullableSchema =
SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED);
VarCharVector nonNullableVector = new VarCharVector(nonNullableSchema, fixture.allocator());
VarCharVector.Mutator mutator = nonNullableVector.getMutator();
nonNullableVector.allocateNew(100, 20);
byte value[] = new byte[20];
for (int i = 0; i < 100; i++) {
Arrays.fill(value, (byte)('A' + i % 26));
mutator.setSafe(i, value);
}
mutator.setValueCount(100);

MaterializedField nullableVarCharSchema =
SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.OPTIONAL);
NullableVarCharVector nullableVector = new NullableVarCharVector(nullableVarCharSchema, fixture.allocator());

nonNullableVector.toNullable(nullableVector);

assertEquals(0, nonNullableVector.getAccessor().getValueCount());
NullableVarCharVector.Accessor nullableAccessor = nullableVector.getAccessor();
assertEquals(100, nullableAccessor.getValueCount());
for (int i = 0; i < 100; i++) {
assertFalse(nullableAccessor.isNull(i));
Arrays.fill(value, (byte)('A' + i % 26));
assertTrue(Arrays.areEqual(value, nullableAccessor.get(i)));
}

nullableVector.clear();

// Don't clear the nonNullableVector, it should be empty.
// If it is not, the test will fail with a memory leak error.
}
}
6 changes: 6 additions & 0 deletions exec/vector/src/main/codegen/templates/FixedValueVectors.java
Expand Up @@ -335,6 +335,12 @@ private void incrementAllocationMonitor() {
++allocationMonitor;
}

@Override
public void toNullable(ValueVector nullableVector) {
Nullable${minor.class}Vector dest = (Nullable${minor.class}Vector) nullableVector;
dest.getMutator().fromNotNullable(this);
}

public final class Accessor extends BaseDataValueVector.BaseAccessor {
@Override
public int getValueCount() {
Expand Down
38 changes: 31 additions & 7 deletions exec/vector/src/main/codegen/templates/NullableValueVectors.java
Expand Up @@ -133,9 +133,10 @@ public DrillBuf getBuffer() {
}

@Override
public ${valuesName} getValuesVector() {
return values;
}
public ${valuesName} getValuesVector() { return values; }

@Override
public UInt1Vector getBitsVector() { return bits; }

@Override
public void setInitialCapacity(int numRecords) {
Expand Down Expand Up @@ -398,6 +399,14 @@ public void exchange(ValueVector other) {
mutator.exchange(other.getMutator());
}

<#if type.major != "VarLen">
@Override
public void toNullable(ValueVector nullableVector) {
exchange(nullableVector);
clear();
}

</#if>
public final class Accessor extends BaseDataValueVector.BaseAccessor <#if type.major = "VarLen">implements VariableWidthVector.VariableWidthAccessor</#if> {
final UInt1Vector.Accessor bAccessor = bits.getAccessor();
final ${valuesName}.Accessor vAccessor = values.getAccessor();
Expand Down Expand Up @@ -449,17 +458,17 @@ public void get(int index, Nullable${minor.class}Holder holder){
@Override
public ${friendlyType} getObject(int index) {
if (isNull(index)) {
return null;
}else{
return null;
} else {
return vAccessor.getObject(index);
}
}
<#if minor.class == "Interval" || minor.class == "IntervalDay" || minor.class == "IntervalYear">
public StringBuilder getAsStringBuilder(int index) {
if (isNull(index)) {
return null;
}else{
return null;
} else {
return vAccessor.getAsStringBuilder(index);
}
}
Expand Down Expand Up @@ -778,6 +787,21 @@ public void exchange(ValueVector.Mutator other) {
setCount = target.setCount;
target.setCount = temp;
}

public void fromNotNullable(${minor.class}Vector srce) {
clear();
final int valueCount = srce.getAccessor().getValueCount();

// Create a new bits vector, all values non-null

fillBitsVector(getBitsVector(), valueCount);

// Swap the data portion

getValuesVector().exchange(srce);
<#if type.major = "VarLen">lastSet = valueCount;</#if>
setValueCount(valueCount);
}
}
}
</#list>
Expand Down
5 changes: 5 additions & 0 deletions exec/vector/src/main/codegen/templates/UnionVector.java
Expand Up @@ -267,6 +267,11 @@ public ValueVector addVector(ValueVector v) {
return newVector;
}
@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}
private class TransferImpl implements TransferPair {
UnionVector to;
Expand Down
Expand Up @@ -422,6 +422,12 @@ public void exchange(ValueVector other) {
offsetVector.exchange(target.offsetVector);
}

@Override
public void toNullable(ValueVector nullableVector) {
Nullable${minor.class}Vector dest = (Nullable${minor.class}Vector) nullableVector;
dest.getMutator().fromNotNullable(this);
}

public final class Accessor extends BaseValueVector.BaseAccessor implements VariableWidthAccessor {
final UInt${type.width}Vector.Accessor oAccessor = offsetVector.getAccessor();
public long getStartEnd(int index){
Expand Down
Expand Up @@ -19,6 +19,7 @@

import io.netty.buffer.DrillBuf;

import java.util.Collections;
import java.util.Iterator;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void exchange(ValueVector.Mutator other) { }

@Override
public Iterator<ValueVector> iterator() {
return Iterators.emptyIterator();
return Collections.emptyIterator();
}

public static boolean checkBufRefs(final ValueVector vv) {
Expand All @@ -133,5 +134,22 @@ public static boolean checkBufRefs(final ValueVector vv) {
public BufferAllocator getAllocator() {
return allocator;
}

public static void fillBitsVector(UInt1Vector bits, int valueCount) {

// Create a new bits vector, all values non-null

bits.allocateNew(valueCount);
UInt1Vector.Mutator bitsMutator = bits.getMutator();
for (int i = 0; i < valueCount; i++) {
bitsMutator.set(i, 1);
}
bitsMutator.setValueCount(valueCount);
}

@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}
}

Expand Up @@ -531,4 +531,10 @@ public void clear() {
public int getPayloadByteCount(int valueCount) {
return getSizeFromCount(valueCount);
}

@Override
public void toNullable(ValueVector nullableVector) {
NullableBitVector dest = (NullableBitVector) nullableVector;
dest.getMutator().fromNotNullable(this);
}
}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -19,5 +19,6 @@

public interface NullableVector extends ValueVector{

ValueVector getBitsVector();
ValueVector getValuesVector();
}
Expand Up @@ -206,6 +206,11 @@ public Iterator<ValueVector> iterator() {
throw new UnsupportedOperationException("ObjectVector does not support this");
}

@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}

public final class Accessor extends BaseAccessor {
@Override
public Object getObject(int index) {
Expand Down
Expand Up @@ -227,6 +227,19 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {

void exchange(ValueVector other);

/**
* Convert a non-nullable vector to nullable by shuffling the data from
* one to the other. Avoids the need to generate copy code just to change
* mode. If this vector is non-nullable, accepts a nullable dual (same
* minor type, different mode.) If the vector is non-nullable, or non-scalar,
* then throws an exception.
*
* @param nullableVector nullable vector of the same minor type as
* this vector
*/

void toNullable(ValueVector nullableVector);

/**
* An abstraction that is used to read from this vector instance.
*/
Expand Down
Expand Up @@ -162,8 +162,14 @@ public void copyEntry(int toIndex, ValueVector from, int fromIndex) { }
@Override
public void exchange(ValueVector other) { }

@Override
public void collectLedgers(Set<BufferLedger> ledgers) {}

@Override
public int getPayloadByteCount(int valueCount) { return 0; }

@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}
}
Expand Up @@ -388,5 +388,10 @@ public void close() {
valueCount = 0;

super.close();
}
}

@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}
}
Expand Up @@ -452,4 +452,9 @@ public void exchange(ValueVector other) {
// TODO: Figure out how to test this scenario, then what to do...
throw new UnsupportedOperationException("Exchange() not yet supported for repeated lists");
}

@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}
}
Expand Up @@ -604,4 +604,9 @@ public void collectLedgers(Set<BufferLedger> ledgers) {
super.collectLedgers(ledgers);
offsets.collectLedgers(ledgers);
}

@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}
}

0 comments on commit 6829af0

Please sign in to comment.