From c55bf668efe9494caca1f7952c37b34035341cea Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 19 Apr 2015 17:53:49 -0700 Subject: [PATCH] Free buffer once iterator has been fully consumed. --- .../execution/UnsafeGeneratedAggregate.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeGeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeGeneratedAggregate.scala index 7e11db0e0f30a..b2ca8d1447011 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeGeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeGeneratedAggregate.scala @@ -261,7 +261,6 @@ case class UnsafeGeneratedAggregate( val buffers = new BytesToBytesMap(MemoryAllocator.HEAP, 128) // Set up the mutable "pointers" that we'll re-use when pointing to key and value rows - val keyPointer: UnsafeRow = new UnsafeRow() val currentBuffer: UnsafeRow = new UnsafeRow() // We're going to need to allocate a lot of empty aggregation buffers, so let's do it @@ -365,11 +364,21 @@ case class UnsafeGeneratedAggregate( valueAddress.getBaseOffset, aggregationBufferSchema.length, aggregationBufferSchema) - // TODO: once the iterator has been fully consumed, we need to free the map so that - // its off-heap memory is reclaimed. This may mean that we'll have to perform an extra - // defensive copy of the last row so that we can free that memory before returning - // to the caller. - resultProjection(joinedRow(key, value)) + val result = resultProjection(joinedRow(key, value)) + if (hasNext) { + result + } else { + // This is the last element in the iterator, so let's free the buffer. Before we do, + // though, we need to make a defensive copy of the result so that we don't return an + // object that might contain dangling pointers to the freed memory + val resultCopy = result.copy() + buffers.free() + resultCopy + } + } + + override def finalize(): Unit = { + buffers.free() } } }