@@ -40,14 +40,13 @@ import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.CompactBuffer
 
 class HashedRelationSuite extends SharedSparkSession {
+  val umm = new UnifiedMemoryManager(
+    new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+    Long.MaxValue,
+    Long.MaxValue / 2,
+    1)
 
-  val mm = new TaskMemoryManager(
-    new UnifiedMemoryManager(
-      new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-      Long.MaxValue,
-      Long.MaxValue / 2,
-      1),
-    0)
+  val mm = new TaskMemoryManager(umm, 0)
 
   val rand = new Random(100)
 
@@ -64,6 +63,11 @@ class HashedRelationSuite extends SharedSparkSession {
   val sparseRows = sparseArray.map(i => projection(InternalRow(i.toLong)).copy())
   val randomRows = randomArray.map(i => projection(InternalRow(i.toLong)).copy())
 
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+    assert(umm.executionMemoryUsed === 0)
+  }
+
   test("UnsafeHashedRelation") {
     val schema = StructType(StructField("a", IntegerType, true) :: Nil)
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
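With `umm` shared across the whole suite, any execution memory a test fails to release shows up as a non-zero `umm.executionMemoryUsed` in the `afterEach` assertion above. A minimal sketch of the pattern a conforming test follows (illustrative only, not part of this diff; it reuses `projection`, `singleKey`, and `mm` from this suite):

  test("example: a relation built on mm must be released") {
    val rows = (0 until 10).map(i => projection(InternalRow(i.toLong)).copy())
    val relation = LongHashedRelation(rows.iterator, singleKey, 10, mm)
    try {
      assert(relation.keyIsUnique)
    } finally {
      // Without this close(), the suite-wide afterEach check would fail
      // with executionMemoryUsed != 0.
      relation.close()
    }
  }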
@@ -87,6 +91,7 @@ class HashedRelationSuite extends SharedSparkSession {
     val out = new ObjectOutputStream(os)
     hashed.asInstanceOf[UnsafeHashedRelation].writeExternal(out)
     out.flush()
+    hashed.close()
     val in = new ObjectInputStream(new ByteArrayInputStream(os.toByteArray))
     val hashed2 = new UnsafeHashedRelation()
     hashed2.readExternal(in)
@@ -108,19 +113,13 @@ class HashedRelationSuite extends SharedSparkSession {
   }
 
   test("test serialization empty hash map") {
-    val taskMemoryManager = new TaskMemoryManager(
-      new UnifiedMemoryManager(
-        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
-        1),
-      0)
-    val binaryMap = new BytesToBytesMap(taskMemoryManager, 1, 1)
+    val binaryMap = new BytesToBytesMap(mm, 1, 1)
     val os = new ByteArrayOutputStream()
     val out = new ObjectOutputStream(os)
     val hashed = new UnsafeHashedRelation(1, 1, binaryMap)
     hashed.writeExternal(out)
     out.flush()
+    hashed.close()
     val in = new ObjectInputStream(new ByteArrayInputStream(os.toByteArray))
     val hashed2 = new UnsafeHashedRelation()
     hashed2.readExternal(in)
@@ -149,9 +148,10 @@ class HashedRelationSuite extends SharedSparkSession {
       assert(row.getLong(0) === i)
       assert(row.getInt(1) === i + 1)
     }
+    longRelation.close()
 
     val longRelation2 = LongHashedRelation(rows.iterator ++ rows.iterator, key, 100, mm)
-    .asInstanceOf[LongHashedRelation]
+      .asInstanceOf[LongHashedRelation]
     assert(!longRelation2.keyIsUnique)
     (0 until 100).foreach { i =>
       val rows = longRelation2.get(i).toArray
@@ -166,6 +166,7 @@ class HashedRelationSuite extends SharedSparkSession {
     val out = new ObjectOutputStream(os)
     longRelation2.writeExternal(out)
     out.flush()
+    longRelation2.close()
     val in = new ObjectInputStream(new ByteArrayInputStream(os.toByteArray))
     val relation = new LongHashedRelation()
     relation.readExternal(in)
@@ -181,19 +182,12 @@ class HashedRelationSuite extends SharedSparkSession {
   }
 
   test("LongToUnsafeRowMap with very wide range") {
-    val taskMemoryManager = new TaskMemoryManager(
-      new UnifiedMemoryManager(
-        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
-        1),
-      0)
     val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))
 
     {
       // SPARK-16740
       val keys = Seq(0L, Long.MaxValue, Long.MaxValue)
-      val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+      val map = new LongToUnsafeRowMap(mm, 1)
       keys.foreach { k =>
         map.append(k, unsafeProj(InternalRow(k)))
       }
@@ -210,7 +204,7 @@ class HashedRelationSuite extends SharedSparkSession {
     {
       // SPARK-16802
       val keys = Seq(Long.MaxValue, Long.MaxValue - 10)
-      val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+      val map = new LongToUnsafeRowMap(mm, 1)
       keys.foreach { k =>
         map.append(k, unsafeProj(InternalRow(k)))
       }
@@ -226,20 +220,13 @@ class HashedRelationSuite extends SharedSparkSession {
   }
 
   test("LongToUnsafeRowMap with random keys") {
-    val taskMemoryManager = new TaskMemoryManager(
-      new UnifiedMemoryManager(
-        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
-        1),
-      0)
     val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))
 
     val N = 1000000
     val rand = new Random
     val keys = (0 to N).map(x => rand.nextLong()).toArray
 
-    val map = new LongToUnsafeRowMap(taskMemoryManager, 10)
+    val map = new LongToUnsafeRowMap(mm, 10)
     keys.foreach { k =>
       map.append(k, unsafeProj(InternalRow(k)))
     }
@@ -249,8 +236,9 @@ class HashedRelationSuite extends SharedSparkSession {
     val out = new ObjectOutputStream(os)
     map.writeExternal(out)
     out.flush()
+    map.free()
     val in = new ObjectInputStream(new ByteArrayInputStream(os.toByteArray))
-    val map2 = new LongToUnsafeRowMap(taskMemoryManager, 1)
+    val map2 = new LongToUnsafeRowMap(mm, 1)
     map2.readExternal(in)
 
     val row = unsafeProj(InternalRow(0L)).copy()
@@ -276,19 +264,12 @@ class HashedRelationSuite extends SharedSparkSession {
       }
       i += 1
     }
-    map.free()
     map2.free()
   }
 
   test("SPARK-24257: insert big values into LongToUnsafeRowMap") {
-    val taskMemoryManager = new TaskMemoryManager(
-      new UnifiedMemoryManager(
-        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
-        1),
-      0)
     val unsafeProj = UnsafeProjection.create(Array[DataType](StringType))
-    val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+    val map = new LongToUnsafeRowMap(mm, 1)
 
     val key = 0L
     // the page array is initialized with length 1 << 17 (1M bytes),
@@ -343,6 +324,7 @@ class HashedRelationSuite extends SharedSparkSession {
     val rows = (0 until 100).map(i => unsafeProj(InternalRow(Int.int2long(i), i + 1)).copy())
     val longRelation = LongHashedRelation(rows.iterator ++ rows.iterator, key, 100, mm)
     val longRelation2 = ser.deserialize[LongHashedRelation](ser.serialize(longRelation))
+    longRelation.close()
     (0 until 100).foreach { i =>
       val rows = longRelation2.get(i).toArray
       assert(rows.length === 2)
@@ -359,6 +341,7 @@ class HashedRelationSuite extends SharedSparkSession {
     unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(out)
     out.flush()
     val unsafeHashed2 = ser.deserialize[UnsafeHashedRelation](ser.serialize(unsafeHashed))
+    unsafeHashed.close()
     val os2 = new ByteArrayOutputStream()
     val out2 = new ObjectOutputStream(os2)
     unsafeHashed2.writeExternal(out2)
@@ -398,6 +381,7 @@ class HashedRelationSuite extends SharedSparkSession {
     thread2.join()
 
     val unsafeHashed2 = ser.deserialize[UnsafeHashedRelation](ser.serialize(unsafeHashed))
+    unsafeHashed.close()
     val os2 = new ByteArrayOutputStream()
     val out2 = new ObjectOutputStream(os2)
     unsafeHashed2.writeExternal(out2)
@@ -452,18 +436,21 @@ class HashedRelationSuite extends SharedSparkSession {
     val hashedRelation = UnsafeHashedRelation(contiguousRows.iterator, singleKey, 1, mm)
     val keyIterator = hashedRelation.keys()
     assert(keyIterator.map(key => key.getLong(0)).toArray === contiguousArray)
+    hashedRelation.close()
   }
 
   test("UnsafeHashedRelation: key set iterator on a sparse array of keys") {
     val hashedRelation = UnsafeHashedRelation(sparseRows.iterator, singleKey, 1, mm)
     val keyIterator = hashedRelation.keys()
     assert(keyIterator.map(key => key.getLong(0)).toArray === sparseArray)
+    hashedRelation.close()
   }
 
   test("LongHashedRelation: key set iterator on a contiguous array of keys") {
     val longRelation = LongHashedRelation(contiguousRows.iterator, singleKey, 1, mm)
     val keyIterator = longRelation.keys()
     assert(keyIterator.map(key => key.getLong(0)).toArray === contiguousArray)
+    longRelation.close()
   }
 
   test("LongToUnsafeRowMap: key set iterator on a contiguous array of keys") {
@@ -478,6 +465,7 @@ class HashedRelationSuite extends SharedSparkSession {
     rowMap.optimize()
     keyIterator = rowMap.keys()
     assert(keyIterator.map(key => key.getLong(0)).toArray === contiguousArray)
+    rowMap.free()
   }
 
   test("LongToUnsafeRowMap: key set iterator on a sparse array with equidistant keys") {
@@ -490,6 +478,7 @@ class HashedRelationSuite extends SharedSparkSession {
     rowMap.optimize()
     keyIterator = rowMap.keys()
     assert(keyIterator.map(_.getLong(0)).toArray === sparseArray)
+    rowMap.free()
   }
 
   test("LongToUnsafeRowMap: key set iterator on an array with a single key") {
@@ -530,6 +519,7 @@ class HashedRelationSuite extends SharedSparkSession {
       buffer.append(keyIterator.next().getLong(0))
     }
     assert(buffer === randomArray)
+    rowMap.free()
   }
 
   test("LongToUnsafeRowMap: no explicit hasNext calls on the key iterator") {
@@ -560,6 +550,7 @@ class HashedRelationSuite extends SharedSparkSession {
       buffer.append(keyIterator.next().getLong(0))
     }
     assert(buffer === randomArray)
+    rowMap.free()
   }
 
   test("LongToUnsafeRowMap: call hasNext at the end of the iterator") {
@@ -577,6 +568,7 @@ class HashedRelationSuite extends SharedSparkSession {
     assert(keyIterator.map(key => key.getLong(0)).toArray === sparseArray)
     assert(keyIterator.hasNext == false)
     assert(keyIterator.hasNext == false)
+    rowMap.free()
   }
 
   test("LongToUnsafeRowMap: random sequence of hasNext and next() calls on the key iterator") {
@@ -607,6 +599,7 @@ class HashedRelationSuite extends SharedSparkSession {
       }
     }
     assert(buffer === randomArray)
+    rowMap.free()
   }
 
   test("HashJoin: packing and unpacking with the same key type in a LongType") {
@@ -661,6 +654,7 @@ class HashedRelationSuite extends SharedSparkSession {
     assert(hashed.keys().isEmpty)
     assert(hashed.keyIsUnique)
     assert(hashed.estimatedSize == 0)
+    hashed.close()
   }
 
   test("SPARK-32399: test methods related to key index") {
@@ -739,20 +733,14 @@ class HashedRelationSuite extends SharedSparkSession {
       val actualValues = row.map(_._2.getInt(1))
       assert(actualValues === expectedValues)
     }
+    unsafeRelation.close()
  }
 
   test("LongToUnsafeRowMap support ignoresDuplicatedKey") {
-    val taskMemoryManager = new TaskMemoryManager(
-      new UnifiedMemoryManager(
-        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
-        1),
-      0)
     val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))
     val keys = Seq(1L, 1L, 1L)
     Seq(true, false).foreach { ignoresDuplicatedKey =>
-      val map = new LongToUnsafeRowMap(taskMemoryManager, 1, ignoresDuplicatedKey)
+      val map = new LongToUnsafeRowMap(mm, 1, ignoresDuplicatedKey)
       keys.foreach { k =>
         map.append(k, unsafeProj(InternalRow(k)))
       }
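For the `LongToUnsafeRowMap` tests, the counterpart to `close()` is `free()`: the map acquires execution memory as rows are appended (and when `optimize()` compacts it), and only `free()` returns those pages to the pool. A hedged sketch of that lifecycle, using only calls that appear in this suite:

val proj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))
val map = new LongToUnsafeRowMap(mm, 4)
try {
  Seq(1L, 2L, 3L).foreach(k => map.append(k, proj(InternalRow(k))))
  map.optimize() // compact the map, as the tests above do before iterating keys()
  assert(map.keys().map(_.getLong(0)).toArray === Array(1L, 2L, 3L))
} finally {
  map.free() // releases all pages, keeping umm.executionMemoryUsed at 0
}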