DRILL-6335: Column accessor refactoring
closes #1218
Paul Rogers authored and vdiravka committed Apr 29, 2018
1 parent 883c8d9 commit dbff164
Showing 44 changed files with 1,505 additions and 915 deletions.
@@ -28,8 +28,9 @@
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+import org.apache.drill.exec.vector.accessor.writer.MapWriter;
import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;

/**
* Represents the write-time state for a column including the writer and the (optional)
@@ -88,10 +89,10 @@ public void close() {
public static class MapColumnState extends BaseMapColumnState {

public MapColumnState(ResultSetLoaderImpl resultSetLoader,
-ColumnMetadata columnSchema,
+ColumnMetadata columnSchema, MapVector mapVector,
ProjectionSet projectionSet) {
super(resultSetLoader,
-ColumnWriterFactory.buildMap(columnSchema, null,
+MapWriter.buildMap(columnSchema, mapVector,
new ArrayList<AbstractObjectWriter>()),
new NullVectorState(),
projectionSet);
@@ -115,7 +116,6 @@ public MapArrayColumnState(ResultSetLoaderImpl resultSetLoader,
projectionSet);
}

-@SuppressWarnings("resource")
public static MapArrayColumnState build(ResultSetLoaderImpl resultSetLoader,
ColumnMetadata columnSchema,
ProjectionSet projectionSet) {
@@ -128,7 +128,7 @@ public static MapArrayColumnState build(ResultSetLoaderImpl resultSetLoader,

// Create the writer using the offset vector

-AbstractObjectWriter writer = ColumnWriterFactory.buildMapArray(
+AbstractObjectWriter writer = MapWriter.buildMapArray(
columnSchema, offsetVector,
new ArrayList<AbstractObjectWriter>());

@@ -20,6 +20,7 @@
import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
@@ -38,7 +39,13 @@ public PrimitiveColumnState(ResultSetLoaderImpl resultSetLoader,
AbstractObjectWriter colWriter,
VectorState vectorState) {
super(resultSetLoader, colWriter, vectorState);
-writer.bindListener(this);
+ScalarWriter scalarWriter;
+if (colWriter.type() == ObjectType.ARRAY) {
+  scalarWriter = writer.array().scalar();
+} else {
+  scalarWriter = writer.scalar();
+}
+((AbstractScalarWriter) scalarWriter).bindListener(this);
}
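The constructor change above attaches the loader's ColumnWriterListener (used to coordinate vector overflow) to the scalar writer that actually writes values: for a repeated scalar column that is the element writer nested inside the array writer, not the array writer itself. A compact restatement of the dispatch, with the listener variable name illustrative:

// Pick the writer that actually touches the vector, then bind the listener.
ScalarWriter target = colWriter.type() == ObjectType.ARRAY
    ? colWriter.array().scalar()  // repeated column: the element writer
    : colWriter.scalar();         // simple column: the writer itself
((AbstractScalarWriter) target).bindListener(listener);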

public static PrimitiveColumnState newPrimitive(
@@ -56,7 +56,8 @@ public RepeatedVectorState(AbstractObjectWriter writer, RepeatedValueVector vect
// Create the offsets state with the offset vector portion of the repeated
// vector, and the offset writer portion of the array writer.

-offsetsState = new OffsetVectorState(arrayWriter.offsetWriter(),
+offsetsState = new OffsetVectorState(
+    arrayWriter.offsetWriter(),
vector.getOffsetVector(),
(AbstractObjectWriter) arrayWriter.entry());
}
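Per the comment above, a repeated vector splits into an offset vector and a data (values) vector, and RepeatedVectorState tracks each half with its own vector state. A sketch of the decomposition, assuming the standard RepeatedValueVector accessors:

// The two halves of a repeated vector, as paired in the constructor above.
UInt4Vector offsets = vector.getOffsetVector(); // entry i: start of row i's values
ValueVector values = vector.getDataVector();    // the packed element values
// Offsets are written via the array writer's offset writer; element
// values flow through the array's entry writer.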
@@ -401,7 +401,7 @@ public RowSetLoader writer() {
@Override
public ResultSetLoader setRow(Object... values) {
startRow();
-writer().setTuple(values);
+writer().setObject(values);
saveRow();
return this;
}
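With setObject() in place of setTuple(), setRow() takes one Java object per top-level column, and a Java array maps onto a repeated column. A usage sketch, with the schema and values assumed for illustration:

// Hypothetical usage, assuming columns (INT, VARCHAR, REPEATED INT).
rsLoader.startBatch();
rsLoader
    .setRow(10, "fred", new int[] {1, 2, 3})
    .setRow(20, "wilma", new int[] {4, 5});
VectorContainer batch = rsLoader.harvest();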
@@ -21,6 +21,7 @@

import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
@@ -95,4 +96,12 @@ public void endBatch() {

@Override
public int rowCount() { return rsLoader.rowCount(); }

+@Override
+public ColumnMetadata schema() {
+  // The top-level tuple (the data row) is not associated
+  // with a parent column. By contrast, a map tuple is
+  // associated with the column that defines the map.
+  return null;
+}
}
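The new schema() override encodes the rule stated in the comment: only non-root tuples have a defining column. A hypothetical check, assuming a row schema that contains a map column "m":

// Illustrative only: navigation per the tuple-writer API.
RowSetLoader rootWriter = rsLoader.writer();
assertNull(rootWriter.schema());              // the row tuple has no parent column
TupleWriter mapWriter = rootWriter.tuple("m");
assertEquals("m", mapWriter.schema().name()); // a map tuple reports its column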
@@ -23,6 +23,7 @@
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.accessor.WriterPosition;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
@@ -98,7 +99,7 @@ public static class OffsetVectorState extends SingleVectorState {

private final AbstractObjectWriter childWriter;

-public OffsetVectorState(AbstractScalarWriter writer, ValueVector mainVector,
+public OffsetVectorState(WriterPosition writer, ValueVector mainVector,
AbstractObjectWriter childWriter) {
super(writer, mainVector);
this.childWriter = childWriter;
@@ -145,7 +146,7 @@ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {

UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
-int offset = childWriter.events().writerIndex().rowStartIndex();
+int offset = childWriter.rowStartIndex();
int newIndex = 1;
ResultSetLoaderImpl.logger.trace("Offset vector: copy {} values from {} to {} with offset {}",
Math.max(0, sourceEndIndex - sourceStartIndex + 1),
@@ -163,11 +164,11 @@ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
}
}

-protected final AbstractScalarWriter writer;
+protected final WriterPosition writer;
protected final ValueVector mainVector;
protected ValueVector backupVector;

-public SingleVectorState(AbstractScalarWriter writer, ValueVector mainVector) {
+public SingleVectorState(WriterPosition writer, ValueVector mainVector) {
this.writer = writer;
this.mainVector = mainVector;
}
@@ -198,7 +199,7 @@ public int allocate(int cardinality) {
@Override
public void rollover(int cardinality) {

-int sourceStartIndex = writer.writerIndex().rowStartIndex();
+int sourceStartIndex = writer.rowStartIndex();

// Remember the last write index for the original vector.
// This tells us the end of the set of values to move, while the
@@ -27,6 +27,7 @@
import org.apache.drill.exec.record.metadata.AbstractColumnMetadata;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.ProjectionType;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.vector.ValueVector;
@@ -38,6 +39,7 @@
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+import org.apache.drill.exec.vector.complex.MapVector;

/**
* Represents the loader state for a tuple: a row or a map. This is "state" in
@@ -103,7 +105,11 @@ public MapState(ResultSetLoaderImpl rsLoader,
ProjectionSet projectionSet) {
super(rsLoader, projectionSet);
this.mapColumnState = mapColumnState;
-mapColumnState.writer().bindListener(this);
+if (mapColumnState.schema().isArray()) {
+  mapColumnState.writer().array().tuple().bindListener(this);
+} else {
+  mapColumnState.writer().tuple().bindListener(this);
+}
}
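This mirrors the listener fix in PrimitiveColumnState above: the listener must bind to the tuple writer that manages the map's member columns, and for a repeated map that tuple writer is nested inside the array writer, hence writer().array().tuple() rather than writer().tuple().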

/**
@@ -177,7 +183,7 @@ protected TupleState(ResultSetLoaderImpl rsLoader, ProjectionSet projectionSet)

public List<ColumnState> columns() { return columns; }

-public TupleMetadata schema() { return writer().schema(); }
+public TupleMetadata schema() { return writer().tupleSchema(); }

public abstract AbstractTupleWriter writer();

@@ -200,6 +206,13 @@ public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSche
return addColumn(columnSchema);
}

+@Override
+public ProjectionType projectionType(String columnName) {
+  return projectionSet.isProjected(columnName) ?
+      ProjectionType.TUPLE :
+      ProjectionType.UNPROJECTED;
+}

/**
* Implementation of the work to add a new column to this tuple given a
* schema description of the column.
@@ -291,8 +304,14 @@ private ColumnState buildMap(ColumnMetadata columnSchema) {
columnSchema,
childProjection);
} else {
+MapVector vector;
+if (columnSchema.isProjected()) {
+  vector = new MapVector(columnSchema.schema(), resultSetLoader.allocator(), null);
+} else {
+  vector = null;
+}
return new MapColumnState(resultSetLoader,
-    columnSchema,
+    columnSchema, vector,
childProjection);
}
}
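Note that a MapVector is now created only when the map column is projected; an unprojected map gets a null vector, presumably so the map writer can accept and silently discard values without allocating backing memory.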
@@ -28,6 +28,7 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+import org.apache.drill.exec.vector.accessor.writer.MapWriter;
import org.apache.drill.exec.vector.complex.AbstractMapVector;

/**
@@ -50,7 +51,7 @@ protected List<AbstractObjectWriter> buildContainerChildren(VectorContainer cont
private AbstractObjectWriter buildVectorWriter(ValueVector vector, VectorDescrip descrip) {
MajorType type = vector.getField().getType();
if (type.getMinorType() == MinorType.MAP) {
-return ColumnWriterFactory.buildMapWriter(descrip.metadata,
+return MapWriter.buildMapWriter(descrip.metadata,
(AbstractMapVector) vector,
buildMap((AbstractMapVector) vector, descrip));
} else {
@@ -59,7 +59,6 @@

public class TestResultSetLoaderMapArray extends SubOperatorTest {

-@SuppressWarnings("resource")
@Test
public void testBasics() {
TupleMetadata schema = new SchemaBuilder()
@@ -77,7 +76,7 @@ public void testBasics() {

// Verify structure and schema

-TupleMetadata actualSchema = rootWriter.schema();
+TupleMetadata actualSchema = rootWriter.tupleSchema();
assertEquals(2, actualSchema.size());
assertTrue(actualSchema.metadata(1).isArray());
assertTrue(actualSchema.metadata(1).isMap());