diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java index 3d1ef0c48adc5..e73ba39468828 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java @@ -122,6 +122,10 @@ public UnsafeShuffleExternalSorter( this.maxRecordSizeBytes = pageSizeBytes - 4; this.writeMetrics = writeMetrics; initializeForWriting(); + + // Reserve the first page to ensure that we have at least one page to work with. Otherwise, + // other operators in the same task may starve this sorter (SPARK-9709). + acquireNewPageIfNecessary(pageSizeBytes); } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index 1f2213d0c4346..417ff5278db2a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M // In certain join operations, prepare can be called on the same partition multiple times. // In this case, we need to ensure that each call to compute gets a separate prepare argument. - private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M] + private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M] /** * Prepare a partition for a single call to compute. 
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index 94650be536b5f..a266b0c36e0fa 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -530,8 +530,9 @@ public void testPeakMemoryUsed() throws Exception { for (int i = 0; i < numRecordsPerPage * 10; i++) { writer.insertRecordIntoSorter(new Tuple2(1, 1)); newPeakMemory = writer.getPeakMemoryUsedBytes(); - if (i % numRecordsPerPage == 0) { - // We allocated a new page for this record, so peak memory should change + if (i % numRecordsPerPage == 0 && i != 0) { + // The first page is allocated in the constructor; another page will be allocated after + // every numRecordsPerPage records (peak memory should change). assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory); } else { assertEquals(previousPeakMemory, newPeakMemory);