diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index a23780e3210..dbfd1a511f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl.xsort; +import com.google.common.base.Joiner; import io.netty.buffer.DrillBuf; import java.io.IOException; @@ -47,6 +48,8 @@ import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -58,7 +61,6 @@ import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -100,11 +102,11 @@ public class ExternalSortBatch extends AbstractRecordBatch { private FileSystem fs; private int spillCount = 0; private int batchesSinceLastSpill = 0; - private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files private boolean first = true; private long totalSizeInMemory = 0; private long highWaterMark = Long.MAX_VALUE; private int targetRecordCount; + private final String fileName; public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context, true); @@ -121,9 +123,11 @@ public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, Record SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD); SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS); dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES)); - uid = System.nanoTime(); copierAllocator = oContext.getAllocator().getChildAllocator( context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true); + FragmentHandle handle = context.getHandle(); + fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()), + handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId()); } @Override @@ -423,7 +427,7 @@ public void mergeAndSpill() throws SchemaChangeException { c1.buildSchema(BatchSchema.SelectionVectorMode.NONE); c1.setRecordCount(count); - String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++)); + String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++); BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator()); try { @@ -648,4 +652,21 @@ protected void killIncoming(boolean sendUpstream) { incoming.kill(sendUpstream); } + private String getFileName(int spill) { + /* + * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to + * which we will dump the incoming buffer data + */ + FragmentHandle handle = context.getHandle(); + + String qid = QueryIdHelper.getQueryId(handle.getQueryId()); + + int majorFragmentId = handle.getMajorFragmentId(); + int minorFragmentId = handle.getMinorFragmentId(); + + String fileName = String.format("%s//%s//major_fragment_%s//minor_fragment_%s//operator_%s//%s", dirs.next(), qid, majorFragmentId, minorFragmentId, popConfig.getOperatorId(), spill); + + return fileName; + } + }