Skip to content

Commit

Permalink
DRILL-3920: Additional tests added to TestValueVectors for serialization and loading.
Browse files Browse the repository at this point in the history

Some light cleanup of a few vector implementations.

closes #194
  • Loading branch information
cwestin authored and Hanifi Gunes committed Oct 13, 2015
1 parent b4d47c5 commit 2736412
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 130 deletions.
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.drill.exec.vector;

import io.netty.buffer.DrillBuf;

import java.util.Collections;
import java.util.Iterator;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;

import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
Expand Down Expand Up @@ -91,16 +94,25 @@ public abstract static class BaseMutator implements ValueVector.Mutator {
protected BaseMutator() { }

/** No-op in the base mutator; subclasses that support test data override this. */
@Override
public void generateTestData(int values) {}

// TODO: consider making mutator stateless (if possible) on another issue.
/** Resets mutator state; no-op for the stateless base implementation. */
public void reset() {}
}

/**
 * Base value vectors have no child vectors.
 *
 * @return an immutable empty iterator
 */
@Override
public Iterator<ValueVector> iterator() {
  // Collections.emptyIterator() (JDK 7+) is the documented replacement for
  // the deprecated Guava Iterators.emptyIterator().
  return Collections.emptyIterator();
}

/**
 * Verifies that every buffer backing the given vector still has a positive
 * reference count. Always returns true when it returns at all, so it can be
 * embedded in an {@code assert} statement.
 *
 * @param vv the vector whose buffers are inspected (reference counts are not modified)
 * @return true if all backing buffers are live
 * @throws IllegalStateException if any backing buffer has a refcount of zero or less
 */
public static boolean checkBufRefs(final ValueVector vv) {
  final DrillBuf[] bufs = vv.getBuffers(false);
  for (int i = 0; i < bufs.length; ++i) {
    if (bufs[i].refCnt() <= 0) {
      throw new IllegalStateException("zero refcount");
    }
  }
  return true;
}
}

Expand Up @@ -17,10 +17,10 @@
*/
package org.apache.drill.exec.vector;

import io.netty.buffer.DrillBuf;

import java.io.Closeable;

import io.netty.buffer.DrillBuf;

import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
Expand Down Expand Up @@ -80,8 +80,9 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
int getValueCapacity();

/**
* Alternative to clear(). Allows use as closeable in try-with-resources.
* Alternative to clear(). Allows use as an AutoCloseable in try-with-resources.
*/
@Override
void close();

/**
Expand Down Expand Up @@ -155,9 +156,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
* Return the underlying buffers associated with this vector. Note that this doesn't impact the reference counts for
* this buffer so it only should be used for in-context access. Also note that this buffer changes regularly thus
* external classes shouldn't hold a reference to it (unless they change it).
*
* @param clear
* Whether to clear vector
* @param clear Whether to clear vector before returning; the buffers will still be refcounted;
* but the returned array will be the only reference to them
*
* @return The underlying {@link io.netty.buffer.DrillBuf buffers} that is used by this vector instance.
*/
Expand Down
Expand Up @@ -36,6 +36,7 @@
* Base class for MapVectors. Currently used by RepeatedMapVector and MapVector
*/
public abstract class AbstractMapVector extends AbstractContainerVector {
// NOTE(review): was getLogger(AbstractContainerVector.class), which misattributed
// this class's log output to its parent; name the logger after the owning class.
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMapVector.class);

// Maintains a map with key as field name and value is the vector itself
private final MapWithOrdinal<String, ValueVector> vectors = new MapWithOrdinal<>();
Expand All @@ -46,13 +47,23 @@ protected AbstractMapVector(MaterializedField field, BufferAllocator allocator,
// create the hierarchy of the child vectors based on the materialized field
for (MaterializedField child : clonedField.getChildren()) {
if (!child.equals(BaseRepeatedValueVector.OFFSETS_FIELD)) {
String fieldName = child.getLastName();
ValueVector v = TypeHelper.getNewVector(child, allocator, callBack);
final String fieldName = child.getLastName();
final ValueVector v = TypeHelper.getNewVector(child, allocator, callBack);
putVector(fieldName, v);
}
}
}

/**
 * Closes every child vector, drops them from the name-to-vector index, and
 * then lets the superclass perform its own cleanup.
 */
@Override
public void close() {
  for (final ValueVector child : vectors.values()) {
    child.close();
  }
  vectors.clear();

  super.close();
}

@Override
public boolean allocateNewSafe() {
/* boolean to keep track if all the memory allocation were successful
Expand All @@ -62,8 +73,7 @@ public boolean allocateNewSafe() {
*/
boolean success = false;
try {

for (ValueVector v : vectors.values()) {
for (final ValueVector v : vectors.values()) {
if (!v.allocateNewSafe()) {
return false;
}
Expand Down Expand Up @@ -105,13 +115,14 @@ public boolean allocateNewSafe() {
*
* @return resultant {@link org.apache.drill.exec.vector.ValueVector}
*/
@Override
public <T extends ValueVector> T addOrGet(String name, TypeProtos.MajorType type, Class<T> clazz) {
final ValueVector existing = getChild(name);
boolean create = false;
if (existing == null) {
create = true;
} else if (clazz.isAssignableFrom(existing.getClass())) {
return (T)existing;
return (T) existing;
} else if (nullFilled(existing)) {
existing.clear();
create = true;
Expand All @@ -129,7 +140,7 @@ public <T extends ValueVector> T addOrGet(String name, TypeProtos.MajorType type
}

private boolean nullFilled(ValueVector vector) {
for (int r=0; r<vector.getAccessor().getValueCount(); r++) {
for (int r = 0; r < vector.getAccessor().getValueCount(); r++) {
if (!vector.getAccessor().isNull(r)) {
return false;
}
Expand All @@ -148,8 +159,9 @@ public ValueVector getChildByOrdinal(int id) {
* Returns a {@link org.apache.drill.exec.vector.ValueVector} instance of subtype of <T> corresponding to the given
* field name if exists or null.
*/
@Override
public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
ValueVector v = vectors.get(name.toLowerCase());
final ValueVector v = vectors.get(name.toLowerCase());
if (v == null) {
return null;
}
Expand All @@ -172,7 +184,7 @@ protected void putChild(String name, ValueVector vector) {
* @param vector vector to be inserted
*/
protected void putVector(String name, ValueVector vector) {
ValueVector old = vectors.put(
final ValueVector old = vectors.put(
Preconditions.checkNotNull(name, "field name cannot be null").toLowerCase(),
Preconditions.checkNotNull(vector, "vector cannot be null")
);
Expand All @@ -192,6 +204,7 @@ protected Collection<ValueVector> getChildren() {
/**
* Returns the number of underlying child vectors.
*/
/** Returns the number of underlying child vectors, delegating to the name -> vector map. */
@Override
public int size() {
return vectors.size();
}
Expand All @@ -205,8 +218,8 @@ public Iterator<ValueVector> iterator() {
* Returns a list of scalar child vectors recursing the entire vector hierarchy.
*/
public List<ValueVector> getPrimitiveVectors() {
List<ValueVector> primitiveVectors = Lists.newArrayList();
for (ValueVector v : vectors.values()) {
final List<ValueVector> primitiveVectors = Lists.newArrayList();
for (final ValueVector v : vectors.values()) {
if (v instanceof AbstractMapVector) {
AbstractMapVector mapVector = (AbstractMapVector) v;
primitiveVectors.addAll(mapVector.getPrimitiveVectors());
Expand All @@ -220,6 +233,7 @@ public List<ValueVector> getPrimitiveVectors() {
/**
* Returns a vector with its corresponding ordinal mapping if field exists or null.
*/
@Override
public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
final int ordinal = vectors.getOrdinal(name.toLowerCase());
if (ordinal < 0) {
Expand All @@ -231,13 +245,13 @@ public VectorWithOrdinal getChildVectorWithOrdinal(String name) {

@Override
public DrillBuf[] getBuffers(boolean clear) {
List<DrillBuf> buffers = Lists.newArrayList();
final List<DrillBuf> buffers = Lists.newArrayList();

for (ValueVector vector : vectors.values()) {
for (DrillBuf buf : vector.getBuffers(false)) {
for (final ValueVector vector : vectors.values()) {
for (final DrillBuf buf : vector.getBuffers(false)) {
buffers.add(buf);
if (clear) {
buf.retain();
buf.retain(1);
}
}
if (clear) {
Expand All @@ -252,20 +266,11 @@ public DrillBuf[] getBuffers(boolean clear) {
public int getBufferSize() {
int actualBufSize = 0 ;

for (ValueVector v : vectors.values()) {
for (DrillBuf buf : v.getBuffers(false)) {
for (final ValueVector v : vectors.values()) {
for (final DrillBuf buf : v.getBuffers(false)) {
actualBufSize += buf.writerIndex();
}
}
return actualBufSize;
}

// Closes every child vector before deferring to the superclass cleanup.
// NOTE(review): unlike the close() earlier in this commit, this variant does
// not clear the child-vector map — confirm which version is current.
@Override
public void close() {
for(final ValueVector valueVector : vectors.values()) {
valueVector.close();
}

super.close();
}
}
Expand Up @@ -22,6 +22,7 @@

import io.netty.buffer.DrillBuf;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -50,7 +51,7 @@
import com.google.common.base.Preconditions;

public class MapVector extends AbstractMapVector {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);

public final static MajorType TYPE = Types.required(MinorType.MAP);

Expand Down Expand Up @@ -112,7 +113,7 @@ public int getBufferSize() {
return 0;
}
long buffer = 0;
for (ValueVector v : (Iterable<ValueVector>)this) {
for (final ValueVector v : (Iterable<ValueVector>)this) {
buffer += v.getBufferSize();
}

Expand Down Expand Up @@ -149,7 +150,7 @@ public TransferPair getTransferPair() {

/**
 * Creates a transfer pair from this vector to an existing target vector,
 * which must itself be a MapVector.
 */
@Override
public TransferPair makeTransferPair(ValueVector to) {
  final MapVector target = (MapVector) to;
  return new MapTransferPair(this, target);
}

@Override
Expand Down Expand Up @@ -194,18 +195,17 @@ protected MapTransferPair(MapVector from, MapVector to, boolean allocate) {
// (This is similar to what happens in ScanBatch where the children cannot be added till they are
// read). To take care of this, we ensure that the hashCode of the MaterializedField does not
// include the hashCode of the children but is based only on MaterializedField$key.
ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
if (allocate && to.size() != preSize) {
newVector.allocateNew();
}
pairs[i++] = vector.makeTransferPair(newVector);
}
}


@Override
public void transfer() {
for (TransferPair p : pairs) {
for (final TransferPair p : pairs) {
p.transfer();
}
to.valueCount = from.valueCount;
Expand All @@ -231,7 +231,6 @@ public void splitAndTransfer(int startIndex, int length) {
}
to.getMutator().setValueCount(length);
}

}

@Override
Expand Down Expand Up @@ -329,7 +328,6 @@ public void get(int index, ComplexHolder holder) {
/** Returns the current record count held by the enclosing map vector. */
public int getValueCount() {
return valueCount;
}

}

public ValueVector getVectorById(int id) {
Expand All @@ -340,7 +338,7 @@ public class Mutator extends BaseValueVector.BaseMutator {

@Override
public void setValueCount(int valueCount) {
for (ValueVector v : getChildren()) {
for (final ValueVector v : getChildren()) {
v.getMutator().setValueCount(valueCount);
}
MapVector.this.valueCount = valueCount;
Expand All @@ -355,17 +353,19 @@ public void generateTestData(int values) { }

@Override
public void clear() {
valueCount = 0;
for (ValueVector v : getChildren()) {
for (final ValueVector v : getChildren()) {
v.clear();
}
valueCount = 0;
}

@Override
public void close() {
for (final ValueVector v : getChildren()) {
final Collection<ValueVector> vectors = getChildren();
for (final ValueVector v : vectors) {
v.close();
}
vectors.clear();
valueCount = 0;

super.close();
Expand Down

0 comments on commit 2736412

Please sign in to comment.