Skip to content

Commit

Permalink
DRILL-4182 TopN schema changes support.
Browse files Browse the repository at this point in the history
  • Loading branch information
amithadke authored and StevenMPhillips committed Dec 14, 2015
1 parent cc9175c commit e529df4
Show file tree
Hide file tree
Showing 8 changed files with 384 additions and 53 deletions.
Expand Up @@ -70,12 +70,17 @@ public void resetQueue(VectorContainer container, SelectionVector4 v4) throws Sc
newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors()); newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors());
} }
newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
// Cleanup before recreating hyperbatch and sv4.
cleanup();
hyperBatch = new ExpandableHyperContainer(newContainer); hyperBatch = new ExpandableHyperContainer(newContainer);
batchCount = hyperBatch.iterator().next().getValueVectors().length; batchCount = hyperBatch.iterator().next().getValueVectors().length;
final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1)); final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE); heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
// Reset queue size (most likely to be set to limit).
queueSize = 0;
for (int i = 0; i < v4.getTotalCount(); i++) { for (int i = 0; i < v4.getTotalCount(); i++) {
heapSv4.set(i, v4.get(i)); heapSv4.set(i, v4.get(i));
++queueSize;
} }
v4.clear(); v4.clear();
doSetup(context, hyperBatch, null); doSetup(context, hyperBatch, null);
Expand Down Expand Up @@ -146,8 +151,15 @@ public SelectionVector4 getFinalSv4() {


@Override @Override
public void cleanup() { public void cleanup() {
heapSv4.clear(); if (heapSv4 != null) {
hyperBatch.clear(); heapSv4.clear();
}
if (hyperBatch != null) {
hyperBatch.clear();
}
if (finalSv4 != null) {
finalSv4.clear();
}
} }


private void siftUp() { private void siftUp() {
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {


private final RecordBatch incoming; private final RecordBatch incoming;
private BatchSchema schema; private BatchSchema schema;
private boolean schemaChanged = false;
private PriorityQueue priorityQueue; private PriorityQueue priorityQueue;
private TopN config; private TopN config;
SelectionVector4 sv4; SelectionVector4 sv4;
Expand Down Expand Up @@ -143,6 +145,7 @@ public void buildSchema() throws SchemaChangeException {
} }
container.buildSchema(SelectionVectorMode.NONE); container.buildSchema(SelectionVectorMode.NONE);
container.setRecordCount(0); container.setRecordCount(0);

return; return;
case STOP: case STOP:
state = BatchState.STOP; state = BatchState.STOP;
Expand Down Expand Up @@ -202,9 +205,16 @@ public IterOutcome innerNext() {
// only change in the case that the schema truly changes. Artificial schema changes are ignored. // only change in the case that the schema truly changes. Artificial schema changes are ignored.
if (!incoming.getSchema().equals(schema)) { if (!incoming.getSchema().equals(schema)) {
if (schema != null) { if (schema != null) {
throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); if (!unionTypeEnabled) {
throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
} else {
this.schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
purgeAndResetPriorityQueue();
this.schemaChanged = true;
}
} else {
this.schema = incoming.getSchema();
} }
this.schema = incoming.getSchema();
} }
// fall through. // fall through.
case OK: case OK:
Expand All @@ -216,11 +226,17 @@ public IterOutcome innerNext() {
} }
countSincePurge += incoming.getRecordCount(); countSincePurge += incoming.getRecordCount();
batchCount++; batchCount++;
RecordBatchData batch = new RecordBatchData(incoming); RecordBatchData batch;
if (schemaChanged) {
batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext));
} else {
batch = new RecordBatchData(incoming);
}
boolean success = false; boolean success = false;
try { try {
batch.canonicalize(); batch.canonicalize();
if (priorityQueue == null) { if (priorityQueue == null) {
assert !schemaChanged;
priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
} }
priorityQueue.add(context, batch); priorityQueue.add(context, batch);
Expand Down Expand Up @@ -255,7 +271,6 @@ public IterOutcome innerNext() {
container.add(w.getValueVectors()); container.add(w.getValueVectors());
} }
container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);

recordCount = sv4.getCount(); recordCount = sv4.getCount();
return IterOutcome.OK_NEW_SCHEMA; return IterOutcome.OK_NEW_SCHEMA;


Expand Down Expand Up @@ -323,7 +338,7 @@ public PriorityQueue createNewPriorityQueue(FragmentContext context, List<Orderi
for (Ordering od : orderings) { for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison. // first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl(); ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry(), unionTypeEnabled);
if (collector.hasErrors()) { if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
} }
Expand Down Expand Up @@ -356,6 +371,56 @@ public PriorityQueue createNewPriorityQueue(FragmentContext context, List<Orderi
return q; return q;
} }


/**
 * Rebuilds the priority queue after the schema of the incoming data has changed.
 * <ol>
 *   <li>Copies the records selected by the current queue's heap selection vector out of its
 *       hyper batch, one selection-vector batch at a time, into a sort batch builder.</li>
 *   <li>Builds a single container from those batches and coerces it to {@code this.schema}.</li>
 *   <li>Cleans up the old queue, creates a new one over the coerced (and canonicalized)
 *       container, and resets it with the builder's selection vector.</li>
 * </ol>
 * The sort batch builder is always cleared and closed, whether or not the rebuild succeeds.
 *
 * @throws SchemaChangeException propagated from the calls made while rebuilding the queue
 * @throws ClassTransformationException propagated from the calls made while rebuilding the queue
 * @throws IOException propagated from the calls made while rebuilding the queue
 */
public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTransformationException, IOException {
  final Stopwatch timer = new Stopwatch();
  timer.start();

  final VectorContainer oldHyperBatch = priorityQueue.getHyperBatch();
  final VectorContainer purgedContainer = new VectorContainer(oContext);
  final SelectionVector4 queueSv4 = priorityQueue.getHeapSv4();
  final SimpleRecordBatch oldBatch = new SimpleRecordBatch(oldHyperBatch, queueSv4, context);
  final SimpleRecordBatch purgedBatch = new SimpleRecordBatch(purgedContainer, null, context);
  copier = RemovingRecordBatch.getGenerated4Copier(
      oldBatch, context, oContext.getAllocator(), purgedContainer, purgedBatch, null);

  final SortRecordBatchBuilder sortedBuilder = new SortRecordBatchBuilder(oContext.getAllocator());
  try {
    // Step 1: drain every batch of the heap selection vector into the builder.
    do {
      final int selected = queueSv4.getCount();
      final int copied = copier.copyRecords(0, selected);
      assert copied == selected;
      for (VectorWrapper<?> wrapper : purgedContainer) {
        wrapper.getValueVector().getMutator().setValueCount(selected);
      }
      purgedContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
      purgedContainer.setRecordCount(selected);
      sortedBuilder.add(purgedBatch);
    } while (queueSv4.next());

    // The surviving records have been copied out, so the old queue's buffers can go.
    queueSv4.clear();
    oldHyperBatch.clear();

    // Step 2: materialize the purged records and promote them to the new schema.
    final VectorContainer purgedData = new VectorContainer(oContext);
    sortedBuilder.canonicalize();
    sortedBuilder.build(context, purgedData);
    purgedData.setRecordCount(sortedBuilder.getSv4().getCount());
    final VectorContainer coerced = SchemaUtil.coerceContainer(purgedData, this.schema, oContext);
    // Incoming batches are canonicalized before they are added to the queue, so do the same here.
    final VectorContainer canonical = VectorContainer.canonicalize(coerced);
    canonical.buildSchema(SelectionVectorMode.FOUR_BYTE);

    // Step 3: swap in a new queue seeded with the coerced records.
    priorityQueue.cleanup();
    priorityQueue = createNewPriorityQueue(
        context, config.getOrderings(), canonical, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
    final SelectionVector4 resetSv4 = sortedBuilder.getSv4().createNewWrapperCurrent();
    priorityQueue.resetQueue(canonical, resetSv4);
  } finally {
    sortedBuilder.clear();
    sortedBuilder.close();
  }

  logger.debug("Took {} us to purge and recreate queue for new schema", timer.elapsed(TimeUnit.MICROSECONDS));
}

@Override @Override
public WritableBatch getWritableBatch() { public WritableBatch getWritableBatch() {
throw new UnsupportedOperationException("A sort batch is not writable."); throw new UnsupportedOperationException("A sort batch is not writable.");
Expand Down
Expand Up @@ -62,7 +62,7 @@ public int getRecordCount() {


@Override @Override
protected boolean setupNewSchema() throws SchemaChangeException { protected boolean setupNewSchema() throws SchemaChangeException {
container.zeroVectors(); container.clear();
switch(incoming.getSchema().getSelectionVectorMode()){ switch(incoming.getSchema().getSelectionVectorMode()){
case NONE: case NONE:
this.copier = getStraightCopier(); this.copier = getStraightCopier();
Expand Down
Expand Up @@ -132,7 +132,8 @@ public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedF
} }


public void addVector(ValueVector v) { public void addVector(ValueVector v) {
Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s", v.getClass(), this.getVectorClass())); Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s for field %s",
v.getClass(), this.getVectorClass(), v.getField()));
vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time
} }


Expand Down
Expand Up @@ -96,6 +96,49 @@ public static BatchSchema mergeSchemas(BatchSchema... schemas) {
return s; return s;
} }


/**
 * Produces the vector that represents {@code field} in a coerced container.
 * <ul>
 *   <li>If {@code v} is {@code null}, a new vector is created for {@code field}, allocated, and
 *       its value count set to {@code recordCount}.</li>
 *   <li>If {@code v} already has the minor type of {@code field}, its contents are transferred
 *       through its {@link TransferPair} and the transfer target is returned. For UNION fields,
 *       any sub-types listed on {@code field} but absent from the vector are added.</li>
 *   <li>Otherwise {@code field} must be a UNION: the transferred vector is added to a new
 *       {@link UnionVector}, every existing value is tagged with the original minor type, and
 *       the remaining sub-types of {@code field} are added.</li>
 * </ul>
 *
 * @param v the existing vector for the field, or {@code null} if the input has none
 * @param c the container being built; not used by this method, kept so existing calls compile
 * @param field the target field definition
 * @param recordCount value count given to a newly allocated vector when {@code v} is {@code null}
 * @param context supplies the allocator for newly created vectors
 * @return the vector to place in the coerced container
 * @throws IllegalStateException if the minor types differ and {@code field} is not a UNION
 */
private static ValueVector coerceVector(ValueVector v, VectorContainer c, MaterializedField field,
    int recordCount, OperatorContext context) {
  if (v == null) {
    final ValueVector fresh = TypeHelper.getNewVector(field, context.getAllocator());
    fresh.allocateNew();
    fresh.getMutator().setValueCount(recordCount);
    return fresh;
  }

  final MinorType sourceType = v.getField().getType().getMinorType();
  final MinorType targetType = field.getType().getMinorType();
  final boolean sameType = sourceType.equals(targetType);

  // Validate before touching any buffers: once transfer() has run, a failure here would leave
  // the data stranded in a transfer target that nobody owns.
  Preconditions.checkState(sameType || targetType == MinorType.UNION, "Can only convert vector to Union vector");

  final int valueCount = v.getAccessor().getValueCount();
  final TransferPair tp = v.getTransferPair();
  tp.transfer();
  final ValueVector transferred = tp.getTo();

  if (sameType) {
    if (targetType == MinorType.UNION) {
      addMissingSubTypes((UnionVector) transferred, field);
    }
    return transferred;
  }

  // Promote the vector into a union that carries it as one of its sub-types.
  final UnionVector union = (UnionVector) TypeHelper.getNewVector(field, context.getAllocator());
  union.addVector(transferred);
  for (int i = 0; i < valueCount; i++) {
    union.getMutator().setType(i, sourceType);
  }
  addMissingSubTypes(union, field);
  union.getMutator().setValueCount(valueCount);
  return union;
}

/** Adds to {@code union} every sub-type of {@code field} that the union does not already list. */
private static void addMissingSubTypes(UnionVector union, MaterializedField field) {
  for (MinorType subType : field.getType().getSubTypeList()) {
    if (!union.getField().getType().getSubTypeList().contains(subType)) {
      union.addSubType(subType);
    }
  }
}

/** /**
* Creates a copy of a record batch, converting any fields as necessary to coerce it into the provided schema * Creates a copy of a record batch, converting any fields as necessary to coerce it into the provided schema
* @param in * @param in
Expand All @@ -105,54 +148,39 @@ public static BatchSchema mergeSchemas(BatchSchema... schemas) {
*/ */
public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) { public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) {
int recordCount = in.getRecordCount(); int recordCount = in.getRecordCount();
Map<SchemaPath,ValueVector> vectorMap = Maps.newHashMap(); boolean isHyper = false;
Map<SchemaPath, Object> vectorMap = Maps.newHashMap();
for (VectorWrapper w : in) { for (VectorWrapper w : in) {
ValueVector v = w.getValueVector(); if (w.isHyper()) {
vectorMap.put(v.getField().getPath(), v); isHyper = true;
final ValueVector[] vvs = w.getValueVectors();
vectorMap.put(vvs[0].getField().getPath(), vvs);
} else {
assert !isHyper;
final ValueVector v = w.getValueVector();
vectorMap.put(v.getField().getPath(), v);
}
} }


VectorContainer c = new VectorContainer(context); VectorContainer c = new VectorContainer(context);


for (MaterializedField field : toSchema) { for (MaterializedField field : toSchema) {
ValueVector v = vectorMap.remove(field.getPath()); if (isHyper) {
if (v != null) { final ValueVector[] vvs = (ValueVector[]) vectorMap.remove(field.getPath());
int valueCount = v.getAccessor().getValueCount(); final ValueVector[] vvsOut;
TransferPair tp = v.getTransferPair(); if (vvs == null) {
tp.transfer(); vvsOut = new ValueVector[1];
if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) { vvsOut[0] = coerceVector(null, c, field, recordCount, context);
if (field.getType().getMinorType() == MinorType.UNION) {
UnionVector u = (UnionVector) tp.getTo();
for (MinorType t : field.getType().getSubTypeList()) {
if (u.getField().getType().getSubTypeList().contains(t)) {
continue;
}
u.addSubType(t);
}
}
c.add(tp.getTo());
} else { } else {
ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator()); vvsOut = new ValueVector[vvs.length];
Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector"); for (int i = 0; i < vvs.length; ++i) {
UnionVector u = (UnionVector) newVector; vvsOut[i] = coerceVector(vvs[i], c, field, recordCount, context);
u.addVector(tp.getTo());
MinorType type = v.getField().getType().getMinorType();
for (int i = 0; i < valueCount; i++) {
u.getMutator().setType(i, type);
}
for (MinorType t : field.getType().getSubTypeList()) {
if (u.getField().getType().getSubTypeList().contains(t)) {
continue;
}
u.addSubType(t);
} }
u.getMutator().setValueCount(valueCount);
c.add(u);
} }
c.add(vvsOut);
} else { } else {
v = TypeHelper.getNewVector(field, context.getAllocator()); final ValueVector v = (ValueVector) vectorMap.remove(field.getPath());
v.allocateNew(); c.add(coerceVector(v, c, field, recordCount, context));
v.getMutator().setValueCount(recordCount);
c.add(v);
} }
} }
c.buildSchema(in.getSchema().getSelectionVectorMode()); c.buildSchema(in.getSchema().getSelectionVectorMode());
Expand Down
Expand Up @@ -187,7 +187,11 @@ public int compare(VectorWrapper<?> v1, VectorWrapper<?> v2) {
}); });


for (VectorWrapper<?> w : canonicalWrappers) { for (VectorWrapper<?> w : canonicalWrappers) {
vc.add(w.getValueVector()); if (w.isHyper()) {
vc.add(w.getValueVectors());
} else {
vc.add(w.getValueVector());
}
} }
vc.oContext = original.oContext; vc.oContext = original.oContext;
return vc; return vc;
Expand Down
Expand Up @@ -42,15 +42,25 @@ public static void printHyperBatch(VectorAccessible batch, SelectionVector4 sv4)
} }
int width = columns.size(); int width = columns.size();
for (int j = 0; j < sv4.getCount(); j++) { for (int j = 0; j < sv4.getCount(); j++) {
if (j%50 == 0) {
System.out.println(StringUtils.repeat("-", width * 17 + 1));
for (String column : columns) {
System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14));
}
System.out.printf("|\n");
System.out.println(StringUtils.repeat("-", width*17 + 1));
}
for (VectorWrapper vw : batch) { for (VectorWrapper vw : batch) {
Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535); Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
if (o instanceof byte[]) { String value;
String value = new String((byte[]) o); if (o == null) {
System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); value = "null";
} else if (o instanceof byte[]) {
value = new String((byte[]) o);
} else { } else {
String value = o.toString(); value = o.toString();
System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
} }
System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
} }
System.out.printf("|\n"); System.out.printf("|\n");
} }
Expand Down

0 comments on commit e529df4

Please sign in to comment.