Skip to content

Commit

Permalink
[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init la…
Browse files Browse the repository at this point in the history
…zily

## What changes were proposed in this pull request?

Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init when DiskMapIterator instance created.This will cause memory use overhead when ExternalAppendOnlyMap spill too much times.

We can avoid this by making deserializeStream init when it is used the first time.
This patch make deserializeStream init lazily.

## How was this patch tested?

Exist tests

Author: zhoukang <zhoukang199191@gmail.com>

Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator.

(cherry picked from commit 45b4bbf)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
caneGuy authored and cloud-fan committed Jan 25, 2018
1 parent a857ad5 commit 0126952
Showing 1 changed file with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
private var deserializeStream: DeserializationStream = null
private var nextItem: (K, C) = null
private var objectsRead = 0

Expand Down Expand Up @@ -528,27 +528,30 @@ class ExternalAppendOnlyMap[K, V, C](
override def hasNext: Boolean = {
if (nextItem == null) {
if (deserializeStream == null) {
return false
// In case of deserializeStream has not been initialized
deserializeStream = nextBatchStream()
if (deserializeStream == null) {
return false
}
}
nextItem = readNextItem()
}
nextItem != null
}

override def next(): (K, C) = {
val item = if (nextItem == null) readNextItem() else nextItem
if (item == null) {
if (!hasNext) {
throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item
}

private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
if (ds != null) {
ds.close()
if (deserializeStream != null) {
deserializeStream.close()
deserializeStream = null
}
if (fileStream != null) {
Expand Down

0 comments on commit 0126952

Please sign in to comment.