Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1176,13 +1176,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = null)
}

def cannotAcquireMemoryToBuildLongHashedRelationError(size: Long, got: Long): Throwable = {
new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2106",
messageParameters = Map("size" -> size.toString(), "got" -> got.toString()),
cause = null)
}

def cannotAcquireMemoryToBuildUnsafeHashedRelationError(): Throwable = {
new SparkOutOfMemoryError(
"_LEGACY_ERROR_TEMP_2107",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.io._
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkUnsupportedOperationException}
import org.apache.spark.{SparkConf, SparkEnv, SparkUnsupportedOperationException}
import org.apache.spark.internal.config.{BUFFER_PAGESIZE, MEMORY_OFFHEAP_ENABLED}
import org.apache.spark.memory._
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -32,6 +32,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.LongType
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.{KnownSizeEstimation, Utils}

/**
Expand Down Expand Up @@ -535,7 +536,7 @@ private[execution] final class LongToUnsafeRowMap(
val mm: TaskMemoryManager,
capacity: Int,
ignoresDuplicatedKey: Boolean = false)
extends MemoryConsumer(mm, MemoryMode.ON_HEAP) with Externalizable with KryoSerializable {
extends MemoryConsumer(mm, mm.getTungstenMemoryMode) with Externalizable with KryoSerializable {

// Whether the keys are stored in dense mode or not.
private var isDense = false
Expand All @@ -550,15 +551,15 @@ private[execution] final class LongToUnsafeRowMap(
//
// Sparse mode: [key1] [offset1 | size1] [key2] [offset | size2] ...
// Dense mode: [offset1 | size1] [offset2 | size2]
private var array: Array[Long] = null
private var array: UnsafeLongArray = null
private var mask: Int = 0

// The page to store all bytes of UnsafeRow and the pointer to next rows.
// [row1][pointer1] [row2][pointer2]
private var page: Array[Long] = null
private var page: MemoryBlock = null

// Current write cursor in the page.
private var cursor: Long = Platform.LONG_ARRAY_OFFSET
private var cursor: Long = -1

// The number of bits for size in address
private val SIZE_BITS = 28
Expand All @@ -583,24 +584,15 @@ private[execution] final class LongToUnsafeRowMap(
0)
}

private def ensureAcquireMemory(size: Long): Unit = {
// do not support spilling
val got = acquireMemory(size)
if (got < size) {
freeMemory(got)
throw QueryExecutionErrors.cannotAcquireMemoryToBuildLongHashedRelationError(size, got)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this error class

}
}

private def init(): Unit = {
if (mm != null) {
require(capacity < 512000000, "Cannot broadcast 512 million or more rows")
var n = 1
while (n < capacity) n *= 2
ensureAcquireMemory(n * 2L * 8 + (1 << 20))
array = new Array[Long](n * 2)
array = new UnsafeLongArray(n * 2)
mask = n * 2 - 2
page = new Array[Long](1 << 17) // 1M bytes
page = allocatePage(1 << 20)// 1M bytes
cursor = page.getBaseOffset
}
}

Expand All @@ -616,7 +608,7 @@ private[execution] final class LongToUnsafeRowMap(
/**
* Returns total memory consumption.
*/
def getTotalMemoryConsumption: Long = array.length * 8L + page.length * 8L
def getTotalMemoryConsumption: Long = array.length * 8L + page.size()

/**
* Returns the first slot of array that store the keys (sparse mode).
Expand All @@ -632,19 +624,19 @@ private[execution] final class LongToUnsafeRowMap(
private def nextSlot(pos: Int): Int = (pos + 2) & mask

private[this] def toAddress(offset: Long, size: Int): Long = {
((offset - Platform.LONG_ARRAY_OFFSET) << SIZE_BITS) | size
(offset << SIZE_BITS) | size
}

private[this] def toOffset(address: Long): Long = {
(address >>> SIZE_BITS) + Platform.LONG_ARRAY_OFFSET
(address >>> SIZE_BITS)
}

private[this] def toSize(address: Long): Int = {
(address & SIZE_MASK).toInt
}

private def getRow(address: Long, resultRow: UnsafeRow): UnsafeRow = {
resultRow.pointTo(page, toOffset(address), toSize(address))
resultRow.pointTo(page.getBaseObject, page.getBaseOffset + toOffset(address), toSize(address))
resultRow
}

Expand Down Expand Up @@ -681,8 +673,8 @@ private[execution] final class LongToUnsafeRowMap(
override def next(): UnsafeRow = {
val offset = toOffset(addr)
val size = toSize(addr)
resultRow.pointTo(page, offset, size)
addr = Platform.getLong(page, offset + size)
resultRow.pointTo(page.getBaseObject, page.getBaseOffset + offset, size)
addr = Platform.getLong(page.getBaseObject, page.getBaseOffset + offset + size)
resultRow
}
}
Expand Down Expand Up @@ -777,12 +769,13 @@ private[execution] final class LongToUnsafeRowMap(

// copy the bytes of UnsafeRow
val offset = cursor
Platform.copyMemory(row.getBaseObject, row.getBaseOffset, page, cursor, row.getSizeInBytes)
Platform.copyMemory(row.getBaseObject, row.getBaseOffset, page.getBaseObject, cursor,
row.getSizeInBytes)
cursor += row.getSizeInBytes
Platform.putLong(page, cursor, 0)
Platform.putLong(page.getBaseObject, cursor, 0)
cursor += 8
numValues += 1
updateIndex(key, pos, toAddress(offset, row.getSizeInBytes))
updateIndex(key, pos, toAddress(offset - page.getBaseOffset, row.getSizeInBytes))
}

private def findKeyPosition(key: Long): Int = {
Expand Down Expand Up @@ -816,35 +809,34 @@ private[execution] final class LongToUnsafeRowMap(
} else {
// there are some values for this key, put the address in the front of them.
val pointer = toOffset(address) + toSize(address)
Platform.putLong(page, pointer, array(pos + 1))
Platform.putLong(page.getBaseObject, page.getBaseOffset + pointer, array(pos + 1))
array(pos + 1) = address
}
}

private def grow(inputRowSize: Int): Unit = {
// There is 8 bytes for the pointer to next value
val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
if (neededNumWords > page.length) {
val usedBytes = cursor - page.getBaseOffset
val neededNumWords = (usedBytes + 8 + inputRowSize + 7) / 8
if (neededNumWords > page.size() / 8) {
if (neededNumWords > (1 << 30)) {
throw QueryExecutionErrors.cannotBuildHashedRelationLargerThan8GError()
}
val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
ensureAcquireMemory(newNumWords * 8L)
val newPage = new Array[Long](newNumWords.toInt)
Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
cursor - Platform.LONG_ARRAY_OFFSET)
val used = page.length
val newNumWords = math.max(neededNumWords, math.min(page.size() / 8 * 2, 1 << 30))
val newPage = allocatePage(newNumWords.toInt * 8)
Platform.copyMemory(page.getBaseObject, page.getBaseOffset, newPage.getBaseObject,
newPage.getBaseOffset, usedBytes)
freePage(page)
page = newPage
freeMemory(used * 8L)
cursor = page.getBaseOffset + usedBytes
}
}

private def growArray(): Unit = {
var old_array = array
val n = array.length
numKeys = 0
ensureAcquireMemory(n * 2 * 8L)
array = new Array[Long](n * 2)
array = new UnsafeLongArray(n * 2)
mask = n * 2 - 2
var i = 0
while (i < old_array.length) {
Expand All @@ -854,8 +846,8 @@ private[execution] final class LongToUnsafeRowMap(
}
i += 2
}
old_array.free()
old_array = null // release the reference to old array
freeMemory(n * 8L)
}

/**
Expand All @@ -866,14 +858,7 @@ private[execution] final class LongToUnsafeRowMap(
// Convert to dense mode if it does not require more memory or could fit within L1 cache
// SPARK-16740: Make sure range doesn't overflow if minKey has a large negative value
if (range >= 0 && (range < array.length || range < 1024)) {
try {
ensureAcquireMemory((range + 1) * 8L)
} catch {
case e: SparkException =>
// there is no enough memory to convert
return
}
val denseArray = new Array[Long]((range + 1).toInt)
val denseArray = new UnsafeLongArray((range + 1).toInt)
var i = 0
while (i < array.length) {
if (array(i + 1) > 0) {
Expand All @@ -882,10 +867,9 @@ private[execution] final class LongToUnsafeRowMap(
}
i += 2
}
val old_length = array.length
array.free()
array = denseArray
isDense = true
freeMemory(old_length * 8L)
}
}

Expand All @@ -894,25 +878,26 @@ private[execution] final class LongToUnsafeRowMap(
*/
def free(): Unit = {
if (page != null) {
freeMemory(page.length * 8L)
freePage(page)
page = null
}
if (array != null) {
freeMemory(array.length * 8L)
array.free()
array = null
}
}

private def writeLongArray(
private def writeBytes(
writeBuffer: (Array[Byte], Int, Int) => Unit,
arr: Array[Long],
baseObject: Object,
baseOffset: Long,
len: Int): Unit = {
val buffer = new Array[Byte](4 << 10)
var offset: Long = Platform.LONG_ARRAY_OFFSET
val end = len * 8L + Platform.LONG_ARRAY_OFFSET
var offset: Long = baseOffset
val end = len * 8L + offset
while (offset < end) {
val size = Math.min(buffer.length, end - offset)
Platform.copyMemory(arr, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size)
Platform.copyMemory(baseObject, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size)
writeBuffer(buffer, 0, size.toInt)
offset += size
}
Expand All @@ -929,10 +914,11 @@ private[execution] final class LongToUnsafeRowMap(
writeLong(numValues)

writeLong(array.length)
writeLongArray(writeBuffer, array, array.length)
val used = ((cursor - Platform.LONG_ARRAY_OFFSET) / 8).toInt
writeBytes(writeBuffer,
array.memoryBlock.getBaseObject, array.memoryBlock.getBaseOffset, array.length)
val used = ((cursor - page.getBaseOffset) / 8).toInt
writeLong(used)
writeLongArray(writeBuffer, page, used)
writeBytes(writeBuffer, page.getBaseObject, page.getBaseOffset, used)
}

override def writeExternal(output: ObjectOutput): Unit = {
Expand All @@ -943,20 +929,20 @@ private[execution] final class LongToUnsafeRowMap(
write(out.writeBoolean, out.writeLong, out.write)
}

private def readLongArray(
private def readData(
readBuffer: (Array[Byte], Int, Int) => Unit,
length: Int): Array[Long] = {
val array = new Array[Long](length)
baseObject: Object,
baseOffset: Long,
length: Int): Unit = {
val buffer = new Array[Byte](4 << 10)
var offset: Long = Platform.LONG_ARRAY_OFFSET
val end = length * 8L + Platform.LONG_ARRAY_OFFSET
var offset: Long = baseOffset
val end = length * 8L + baseOffset
while (offset < end) {
val size = Math.min(buffer.length, end - offset)
readBuffer(buffer, 0, size.toInt)
Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, array, offset, size)
Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, baseObject, offset, size)
offset += size
}
array
}

private def read(
Expand All @@ -971,11 +957,15 @@ private[execution] final class LongToUnsafeRowMap(

val length = readLong().toInt
mask = length - 2
array = readLongArray(readBuffer, length)
array.free()
array = new UnsafeLongArray(length)
readData(readBuffer, array.memoryBlock.getBaseObject, array.memoryBlock.getBaseOffset, length)
val pageLength = readLong().toInt
page = readLongArray(readBuffer, pageLength)
freePage(page)
page = allocatePage(pageLength * 8)
readData(readBuffer, page.getBaseObject, page.getBaseOffset, pageLength)
// Restore cursor variable to make this map able to be serialized again on executors.
cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
cursor = pageLength * 8 + page.getBaseOffset
}

override def readExternal(in: ObjectInput): Unit = {
Expand All @@ -985,6 +975,26 @@ private[execution] final class LongToUnsafeRowMap(
override def read(kryo: Kryo, in: Input): Unit = {
read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
}

private class UnsafeLongArray(val length: Int) {
val memoryBlock: MemoryBlock = allocatePage(length * 8)

for (i <- 0 until length) {
update(i, 0)
}

def apply(index: Int): Long = {
Platform.getLong(memoryBlock.getBaseObject, memoryBlock.getBaseOffset + index * 8)
}

def update(index: Int, value: Long): Unit = {
Platform.putLong(memoryBlock.getBaseObject, memoryBlock.getBaseOffset + index * 8, value)
}

def free(): Unit = {
freePage(memoryBlock)
}
}
}

class LongHashedRelation(
Expand Down
Loading