Skip to content

Commit

Permalink
DRILL-2730: Use different paths for ExternalSort spills
Browse files Browse the repository at this point in the history
  • Loading branch information
StevenMPhillips committed Apr 15, 2015
1 parent 96943de commit 959419d
Showing 1 changed file with 25 additions and 4 deletions.
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.xsort;

import com.google.common.base.Joiner;
import io.netty.buffer.DrillBuf;

import java.io.IOException;
Expand Down Expand Up @@ -47,6 +48,8 @@
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
Expand All @@ -58,7 +61,6 @@
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
Expand Down Expand Up @@ -100,11 +102,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private FileSystem fs;
private int spillCount = 0;
private int batchesSinceLastSpill = 0;
private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
private boolean first = true;
private long totalSizeInMemory = 0;
private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount;
private final String fileName;

public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, true);
Expand All @@ -121,9 +123,11 @@ public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, Record
SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
uid = System.nanoTime();
copierAllocator = oContext.getAllocator().getChildAllocator(
context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true);
FragmentHandle handle = context.getHandle();
fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
}

@Override
Expand Down Expand Up @@ -423,7 +427,7 @@ public void mergeAndSpill() throws SchemaChangeException {
c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
c1.setRecordCount(count);

String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++));
String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());

try {
Expand Down Expand Up @@ -648,4 +652,21 @@ protected void killIncoming(boolean sendUpstream) {
incoming.kill(sendUpstream);
}

/**
 * Builds the path of the spill file that the incoming buffer data will be dumped to.
 *
 * <p>The path is assembled from the next entry of {@code dirs}, the query id, the major and
 * minor fragment ids taken from the fragment handle, the operator id and the spill number.
 * Each call invokes {@code dirs.next()}, so it consumes one entry from the spill-directory
 * iterator.
 *
 * @param spill number of the spill, used as the last path component
 * @return the formatted spill file path
 */
private String getFileName(int spill) {
  // Resolve the fragment identity first so the directory iterator is only advanced afterwards.
  final FragmentHandle fragment = context.getHandle();
  final String queryId = QueryIdHelper.getQueryId(fragment.getQueryId());
  final int major = fragment.getMajorFragmentId();
  final int minor = fragment.getMinorFragmentId();

  return String.format(
      "%s//%s//major_fragment_%s//minor_fragment_%s//operator_%s//%s",
      dirs.next(), queryId, major, minor, popConfig.getOperatorId(), spill);
}

}

0 comments on commit 959419d

Please sign in to comment.