Skip to content

Commit

Permalink
[SPARK-10379] preserve first page in UnsafeShuffleExternalSorter
Browse files Browse the repository at this point in the history
Author: Davies Liu <davies@databricks.com>

Closes apache#8543 from davies/preserve_page.

(cherry picked from commit 62b4690)
Signed-off-by: Andrew Or <andrew@databricks.com>
(cherry picked from commit b846a9d)
  • Loading branch information
Davies Liu authored and Marcelo Vanzin committed Sep 29, 2015
1 parent 07c092a commit aa75b4c
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public UnsafeShuffleExternalSorter(
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();

// preserve first page to ensure that we have at least one page to work with. Otherwise,
// other operators in the same task may starve this sorter (SPARK-9709).
acquireNewPageIfNecessary(pageSizeBytes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M

// In certain join operations, prepare can be called on the same partition multiple times.
// In this case, we need to ensure that each call to compute gets a separate prepare argument.
private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]

/**
* Prepare a partition for a single call to compute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,9 @@ public void testPeakMemoryUsed() throws Exception {
for (int i = 0; i < numRecordsPerPage * 10; i++) {
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
newPeakMemory = writer.getPeakMemoryUsedBytes();
if (i % numRecordsPerPage == 0) {
// We allocated a new page for this record, so peak memory should change
if (i % numRecordsPerPage == 0 && i != 0) {
// The first page is allocated in constructor, another page will be allocated after
// every numRecordsPerPage records (peak memory should change).
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
assertEquals(previousPeakMemory, newPeakMemory);
Expand Down

0 comments on commit aa75b4c

Please sign in to comment.