Skip to content

Commit

Permalink
Free buffer once iterator has been fully consumed.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Apr 22, 2015
1 parent 62ab054 commit c55bf66
Showing 1 changed file with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ case class UnsafeGeneratedAggregate(
val buffers = new BytesToBytesMap(MemoryAllocator.HEAP, 128)

// Set up the mutable "pointers" that we'll re-use when pointing to key and value rows
val keyPointer: UnsafeRow = new UnsafeRow()
val currentBuffer: UnsafeRow = new UnsafeRow()

// We're going to need to allocate a lot of empty aggregation buffers, so let's do it
Expand Down Expand Up @@ -365,11 +364,21 @@ case class UnsafeGeneratedAggregate(
valueAddress.getBaseOffset,
aggregationBufferSchema.length,
aggregationBufferSchema)
// TODO: once the iterator has been fully consumed, we need to free the map so that
// its off-heap memory is reclaimed. This may mean that we'll have to perform an extra
// defensive copy of the last row so that we can free that memory before returning
// to the caller.
resultProjection(joinedRow(key, value))
val result = resultProjection(joinedRow(key, value))
if (hasNext) {
result
} else {
// This is the last element in the iterator, so let's free the buffer. Before we do,
// though, we need to make a defensive copy of the result so that we don't return an
// object that might contain dangling pointers to the freed memory
val resultCopy = result.copy()
buffers.free()
resultCopy
}
}

override def finalize(): Unit = {
buffers.free()
}
}
}
Expand Down

0 comments on commit c55bf66

Please sign in to comment.