DRILL-386 Implement External Sort operator
StevenMPhillips committed Feb 26, 2014
1 parent cb9a411 commit 2803d2a
Showing 44 changed files with 1,999 additions and 60 deletions.
2 changes: 1 addition & 1 deletion distribution/src/resources/runbit
@@ -41,4 +41,4 @@ CP=$DRILL_HOME/lib/*:$CP
CP=$CP:$DRILL_CONF_DIR
CP=$HADOOP_CLASSPATH:$CP

-exec $JAVA $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.server.Drillbit
+exec $JAVA $DRILLBIT_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.server.Drillbit
@@ -104,7 +104,11 @@ public TransferPair getTransferPair(){
public TransferPair getTransferPair(FieldReference ref){
return new TransferImpl(getField().clone(ref));
}


public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((${minor.class}Vector) to);
}

public void transferTo(${minor.class}Vector target){
target.data = data;
target.data.retain();
@@ -118,6 +122,10 @@ private class TransferImpl implements TransferPair{
public TransferImpl(MaterializedField field){
this.to = new ${minor.class}Vector(field, allocator);
}

public TransferImpl(${minor.class}Vector to) {
this.to = to;
}

public ${minor.class}Vector getTo(){
return to;
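
Across these value-vector templates the change is the same: getTransferPair() always allocates a fresh target vector, while the new makeTransferPair(ValueVector) lets the caller supply an existing one, so buffers can be moved into a vector that already lives in another container. A minimal sketch of the intended use, assuming two pre-built IntVectors (IntVector is one of the classes generated from this template) and TransferPair's transfer() method:

import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.vector.IntVector;

class TransferExample {
  // Hedged sketch: `from` and `to` are assumed to be already-allocated vectors.
  static void moveBuffers(IntVector from, IntVector to) {
    TransferPair tp = from.makeTransferPair(to); // reuse `to`; no new vector allocated
    tp.transfer();                               // `to` takes ownership of from's buffers
  }
}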
@@ -171,6 +171,10 @@ public TransferPair getTransferPair(FieldReference ref){
return new TransferImpl(getField().clone(ref));
}

public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((Nullable${minor.class}Vector) to);
}


public void transferTo(Nullable${minor.class}Vector target){
bits.transferTo(target.bits);
@@ -185,7 +189,11 @@ private class TransferImpl implements TransferPair{
public TransferImpl(MaterializedField field){
this.to = new Nullable${minor.class}Vector(field, allocator);
}


public TransferImpl(Nullable${minor.class}Vector to){
this.to = to;
}

public Nullable${minor.class}Vector getTo(){
return to;
}
@@ -73,6 +73,10 @@ public TransferPair getTransferPair(){
public TransferPair getTransferPair(FieldReference ref){
return new TransferImpl(getField().clone(ref));
}

public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((Repeated${minor.class}Vector) to);
}

public void transferTo(Repeated${minor.class}Vector target){
offsets.transferTo(target.offsets);
@@ -88,7 +92,11 @@ private class TransferImpl implements TransferPair{
public TransferImpl(MaterializedField field){
this.to = new Repeated${minor.class}Vector(field, allocator);
}


public TransferImpl(Repeated${minor.class}Vector to){
this.to = to;
}

public Repeated${minor.class}Vector getTo(){
return to;
}
@@ -126,6 +126,10 @@ public TransferPair getTransferPair(){
public TransferPair getTransferPair(FieldReference ref){
return new TransferImpl(getField().clone(ref));
}

public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((${minor.class}Vector) to);
}

public void transferTo(${minor.class}Vector target){
this.offsetVector.transferTo(target.offsetVector);
@@ -169,7 +173,11 @@ private class TransferImpl implements TransferPair{
public TransferImpl(MaterializedField field){
this.to = new ${minor.class}Vector(field, allocator);
}


public TransferImpl(${minor.class}Vector to){
this.to = to;
}

public ${minor.class}Vector getTo(){
return to;
}
@@ -53,5 +53,11 @@ public interface ExecConstants {
public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
public static final String EXTERNAL_SORT_TARGET_BATCH_SIZE = "drill.exec.sort.external.batch.size";
public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
public static final String EXTERNAL_SORT_SPILL_GROUP_SIZE = "drill.exec.sort.external.spill.group.size";
public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";

}
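
These keys control when and where the external sort spills to disk. A hedged sketch of how an operator setup might read them, assuming DrillConfig exposes the standard Typesafe Config getters (the variable names are illustrative):

import java.util.List;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;

class SpillConfigExample {
  static void readSpillSettings() {
    DrillConfig config = DrillConfig.create();
    int threshold     = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
    int groupSize     = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
    String fileSystem = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
    List<String> dirs = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
  }
}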
@@ -26,6 +26,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.io.Resources;

import com.typesafe.config.ConfigFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -98,7 +99,8 @@ static class Options {
}

public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception {
-DrillConfig config = DrillConfig.create();
+DrillConfig config = new DrillConfig(ConfigFactory.load("drill-override.conf").
+withFallback(ConfigFactory.load("drill-default.conf")).withFallback(ConfigFactory.load("drill-module.conf")));
DrillClient client;
if (local) {
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
@@ -114,7 +116,7 @@ public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception {
client = new DrillClient(config, clusterCoordinator);
}
client.connect();
-QueryResultsListener listener = new QueryResultsListener();
+QueryResultsListener listener = new QueryResultsListener(config);
String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
UserProtos.QueryType queryType;
type = type.toLowerCase();
@@ -140,9 +142,13 @@ public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception {
private class QueryResultsListener implements UserResultsListener {
AtomicInteger count = new AtomicInteger();
private CountDownLatch latch = new CountDownLatch(1);
-RecordBatchLoader loader = new RecordBatchLoader(new BootStrapContext(DrillConfig.create()).getAllocator());
+RecordBatchLoader loader;
int width;

public QueryResultsListener(DrillConfig config) {
loader = new RecordBatchLoader(new BootStrapContext(config).getAllocator());
}

@Override
public void submissionFailed(RpcException ex) {
System.out.println(String.format("Query failed: %s", ex));
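
The rewritten submitQuery assembles its DrillConfig explicitly: ConfigFactory.load reads each classpath resource, and withFallback makes a later config supply only the keys the earlier ones lack, so drill-override.conf wins over drill-default.conf, which wins over drill-module.conf. A self-contained sketch of that resolution rule (the keys are illustrative):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

class FallbackExample {
  public static void main(String[] args) {
    Config merged = ConfigFactory.parseString("a = 1")
        .withFallback(ConfigFactory.parseString("a = 2\nb = 3"));
    System.out.println(merged.getInt("a")); // 1: the earlier config wins
    System.out.println(merged.getInt("b")); // 3: filled in from the fallback
  }
}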
@@ -51,6 +51,15 @@ public FunctionImplementationRegistry(DrillConfig config){
}
}
}


private static FunctionImplementationRegistry registry;
public static FunctionImplementationRegistry getRegistry() {
if (registry == null) {
registry = new FunctionImplementationRegistry(DrillConfig.create());
}
return registry;
}

public ArrayListMultimap<String, DrillFuncHolder> getMethods() {
return this.methods;
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.config;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;

import java.util.List;

@JsonTypeName("x-sort")
public class XSort extends Sort {

@tnachen commented on Feb 27, 2014:

I recommend spelling out external; I couldn't understand what it is without seeing the context in your commit.

static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(XSort.class);

@JsonCreator
public XSort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) {
super(child, orderings, reverse);
}

public List<OrderDef> getOrderings() {
return orderings;
}

public boolean getReverse() {
return reverse;
}

@Override
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
return physicalVisitor.visitSort(this, value);
}

@Override
public OperatorCost getCost() {
Size childSize = child.getSize();
long n = childSize.getRecordCount();
long width = childSize.getRecordSize();

//TODO: Magic Number, let's assume 1/10 of data can fit in memory.
int k = 10;
long n2 = n/k;
double cpuCost =
k * n2 * (Math.log(n2)/Math.log(2)) + //
n * (Math.log(k)/Math.log(2));
double diskCost = n*width*2;

return new OperatorCost(0, (float) diskCost, (float) n2*width, (float) cpuCost);
}

@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
return new XSort(child, orderings, reverse);
}




}
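
The cost model treats the external sort as k = 10 sorted runs of n2 = n/k records each: sorting the runs costs k * n2 * log2(n2) comparisons, the k-way merge adds n * log2(k), and the disk cost charges every record once out and once back (2 * n * width bytes). A worked example of the formula above, with illustrative inputs:

class XSortCostExample {
  public static void main(String[] args) {
    long n = 1_000_000; // child record count (illustrative)
    long width = 100;   // bytes per record (illustrative)
    int k = 10;         // assume 1/10 of the data fits in memory
    long n2 = n / k;    // 100,000 records per spilled run

    double cpuCost = k * n2 * (Math.log(n2) / Math.log(2))  // ~1.66e7: sort each run
                   + n * (Math.log(k) / Math.log(2));       // ~3.3e6: k-way merge
    double diskCost = n * width * 2;                        // 2e8 bytes: write + read
    System.out.printf("cpu=%.3e disk=%.3e%n", cpuCost, diskCost);
  }
}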
@@ -18,7 +18,9 @@
package org.apache.drill.exec.physical.impl;

import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Stopwatch;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
@@ -80,7 +82,10 @@ public static RootExec getExec(FragmentContext context, FragmentRoot root) throws
root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
}

Stopwatch watch = new Stopwatch();
watch.start();
root.accept(i, context);
logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
if (i.root == null)
throw new ExecutionSetupException(
"The provided fragment did not have a root node that correctly created a RootExec value.");
@@ -50,7 +50,6 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
private int queueSize = 0;
private int batchCount = 0;
private boolean hasSv2;
-private long compares = 0;
private BatchSchema schema;

@Override
@@ -65,6 +64,7 @@ public void init(int limit, FragmentContext context, boolean hasSv2) throws SchemaChangeException {

public void resetQueue(VectorContainer container, SelectionVector4 v4) throws SchemaChangeException {
assert container.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE;
+BatchSchema schema = container.getSchema();
VectorContainer newContainer = new VectorContainer();
for (MaterializedField field : schema) {
int id = container.getValueVectorId(new SchemaPath(field.getName(), ExpressionPosition.UNKNOWN)).getFieldId();
@@ -120,7 +120,6 @@ public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException {

@Override
public void generate() throws SchemaChangeException {
logger.debug("Did {} comparisons", compares);
Stopwatch watch = new Stopwatch();
watch.start();
BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
@@ -168,8 +167,7 @@ private void siftUp() {
private void siftDown() {
int p = 0;
int next;
-while (p < queueSize) {
-if (p * 2 + 1 >= queueSize) break;
+while (p * 2 + 1 < queueSize) {
if (p * 2 + 2 >= queueSize) {
next = p * 2 + 1;
} else {
@@ -188,18 +186,6 @@ private void siftDown() {
}
}

-public boolean validate() {
-for (int i = 0; i < queueSize; i++) {
-if (!(i * 2 + 2 >= queueSize || compare(i, i * 2 + 2) >= 0)) {
-System.out.printf("heap mismatch: %d %d\n", i, i * 2 + 2);
-}
-if (!(i * 2 + 1 >= queueSize || compare(i, i * 2 + 1) >= 0)) {
-System.out.printf("heap mismatch: %d %d\n", i, i * 2 + 1);
-}
-}
-return true;
-}

public int pop() {
int value = vector4.get(0);
swap(0, queueSize - 1);
Expand All @@ -217,7 +203,6 @@ public void swap(int sv0, int sv1) {
public int compare(int leftIndex, int rightIndex) {
int sv1 = vector4.get(leftIndex);
int sv2 = vector4.get(rightIndex);
-compares++;
return doEval(sv1, sv2);
}

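
The siftDown rewrite folds the old guard (p * 2 + 1 >= queueSize) into the loop condition: iterate only while a left child exists, descend toward the larger child, and swap until the max-heap property holds. A self-contained sketch of the same control flow over a plain int array (illustrative, not the generated template code):

class SiftDownExample {
  // Max-heap sift-down with the same loop shape as the simplified template.
  static void siftDown(int[] heap, int size) {
    int p = 0;
    while (p * 2 + 1 < size) {          // continue while a left child exists
      int next = p * 2 + 1;             // default to the left child
      if (p * 2 + 2 < size && heap[p * 2 + 2] > heap[next]) {
        next = p * 2 + 2;               // right child exists and is larger
      }
      if (heap[next] <= heap[p]) break; // heap property satisfied
      int tmp = heap[p]; heap[p] = heap[next]; heap[next] = tmp;
      p = next;                         // follow the swapped element down
    }
  }
}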
@@ -218,7 +218,6 @@ private void purge() throws SchemaChangeException {
VectorContainer newQueue = new VectorContainer();
builder.build(context, newQueue);
priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
-// builder.clear();
selectionVector4.clear();
logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS));
}
@@ -32,8 +32,8 @@
public class RecordBatchData {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class);

-final SelectionVector2 sv2;
-final int recordCount;
+private SelectionVector2 sv2;
+private int recordCount;
VectorContainer container = new VectorContainer();

public RecordBatchData(VectorAccessible batch){
@@ -68,6 +68,12 @@ public List<ValueVector> getVectors() {
return vectors;
}

public void setSv2(SelectionVector2 sv2) {
this.sv2 = sv2;
this.recordCount = sv2.getCount();
this.container.buildSchema(SelectionVectorMode.TWO_BYTE);
}

public SelectionVector2 getSv2() {
return sv2;
}
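
The new setSv2 attaches a two-byte selection vector to captured batch data: the records stay where they landed, and the SV2 supplies sorted indices into them, which is why attaching one also resets the record count and rebuilds the schema in TWO_BYTE mode. A self-contained illustration of that indirection (plain arrays standing in for Drill's vectors):

class Sv2Illustration {
  public static void main(String[] args) {
    int[] data = {42, 7, 19};        // records remain in arrival order
    char[] sv2 = {1, 2, 0};          // 2-byte indices, ordered by value
    for (char idx : sv2) {
      System.out.println(data[idx]); // prints 7, 19, 42: a sorted view, no copy
    }
  }
}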
@@ -146,11 +146,10 @@ public IterOutcome next() {
}

builder.build(context, container);

sorter = createNewSorter();
sorter.setup(context, getSelectionVector4(), this.container);
sorter.sort(getSelectionVector4(), this.container);

return IterOutcome.OK_NEW_SCHEMA;

}catch(SchemaChangeException | ClassTransformationException | IOException ex){

1 comment on commit 2803d2a

@tnachen commented:
Want to try doing a pull request?
