[SPARK-13746][TESTS] stop using deprecated SynchronizedSet
The trait SynchronizedSet in package scala.collection.mutable has been deprecated since Scala 2.11, so these test suites now use plain HashSets and synchronize explicitly on each access.

Author: Wilson Wu <wilson888888888@gmail.com>

Closes #11580 from wilson888888888/spark-synchronizedset.
Wilson Wu authored and srowen committed Mar 14, 2016
1 parent acdf219 commit 31d069d
Showing 2 changed files with 39 additions and 24 deletions.
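
The change is mechanical in both suites: drop the deprecated SynchronizedSet mixin and guard every access to the underlying HashSet with an explicit synchronized block on the collection itself. A minimal sketch of the pattern, with illustrative names that do not appear in the diff:

import scala.collection.mutable.HashSet

// Before (deprecated since Scala 2.11):
//   val ids = new HashSet[Int] with SynchronizedSet[Int]
// After: a plain HashSet, locked explicitly on every access.
val ids = new HashSet[Int]

def record(id: Int): Unit = ids.synchronized { ids += id }
def forget(id: Int): Unit = ids.synchronized { ids -= id }

// Compound reads (snapshot, sort, format) must sit inside a single
// synchronized block so the set cannot change mid-operation.
def render: String = ids.synchronized { ids.toSeq.sorted.mkString("[", ", ", "]") }
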
41 changes: 25 additions & 16 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.lang.ref.WeakReference
 
-import scala.collection.mutable.{HashSet, SynchronizedSet}
+import scala.collection.mutable.HashSet
 import scala.language.existentials
 import scala.util.Random
 
@@ -442,25 +442,25 @@ class CleanerTester(
     checkpointIds: Seq[Long] = Seq.empty)
   extends Logging {
 
-  val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
-  val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
-  val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
-  val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
+  val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds
+  val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds
+  val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds
+  val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds
   val isDistributed = !sc.isLocal
 
   val cleanerListener = new CleanerListener {
     def rddCleaned(rddId: Int): Unit = {
-      toBeCleanedRDDIds -= rddId
+      toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId }
       logInfo("RDD " + rddId + " cleaned")
     }
 
     def shuffleCleaned(shuffleId: Int): Unit = {
-      toBeCleanedShuffleIds -= shuffleId
+      toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId }
       logInfo("Shuffle " + shuffleId + " cleaned")
     }
 
     def broadcastCleaned(broadcastId: Long): Unit = {
-      toBeCleanedBroadcstIds -= broadcastId
+      toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId }
       logInfo("Broadcast " + broadcastId + " cleaned")
     }
 
@@ -469,7 +469,7 @@ class CleanerTester(
     }
 
     def checkpointCleaned(rddId: Long): Unit = {
-      toBeCheckpointIds -= rddId
+      toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId }
       logInfo("checkpoint " + rddId + " cleaned")
     }
   }
@@ -578,18 +578,27 @@ class CleanerTester(
   }
 
   private def uncleanedResourcesToString = {
+    val s1 = toBeCleanedRDDIds.synchronized {
+      toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")
+    }
+    val s2 = toBeCleanedShuffleIds.synchronized {
+      toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")
+    }
+    val s3 = toBeCleanedBroadcstIds.synchronized {
+      toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")
+    }
     s"""
-      |\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}
-      |\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}
-      |\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")}
+      |\tRDDs = $s1
+      |\tShuffles = $s2
+      |\tBroadcasts = $s3
     """.stripMargin
   }
 
   private def isAllCleanedUp =
-    toBeCleanedRDDIds.isEmpty &&
-    toBeCleanedShuffleIds.isEmpty &&
-    toBeCleanedBroadcstIds.isEmpty &&
-    toBeCheckpointIds.isEmpty
+    toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } &&
+    toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } &&
+    toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } &&
+    toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty }
 
   private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
     blockManager.master.getMatchingBlockIds( _ match {
22 changes: 14 additions & 8 deletions extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       Seconds(10), StorageLevel.MEMORY_ONLY,
       awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
 
-    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    val collected = new mutable.HashSet[Int]
     stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
-      collected ++= rdd.collect()
-      logInfo("Collected = " + collected.mkString(", "))
+      collected.synchronized {
+        collected ++= rdd.collect()
+        logInfo("Collected = " + collected.mkString(", "))
+      }
     }
     ssc.start()
 
     val testData = 1 to 10
     eventually(timeout(120 seconds), interval(10 second)) {
       testUtils.pushData(testData, aggregateTestData)
-      assert(collected === testData.toSet, "\nData received does not match data sent")
+      assert(collected.synchronized { collected === testData.toSet },
+        "\nData received does not match data sent")
     }
     ssc.stop(stopSparkContext = false)
   }
@@ -205,18 +208,21 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     stream shouldBe a [ReceiverInputDStream[_]]
 
-    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    val collected = new mutable.HashSet[Int]
     stream.foreachRDD { rdd =>
-      collected ++= rdd.collect()
-      logInfo("Collected = " + collected.mkString(", "))
+      collected.synchronized {
+        collected ++= rdd.collect()
+        logInfo("Collected = " + collected.mkString(", "))
+      }
     }
     ssc.start()
 
     val testData = 1 to 10
     eventually(timeout(120 seconds), interval(10 second)) {
       testUtils.pushData(testData, aggregateTestData)
       val modData = testData.map(_ + 5)
-      assert(collected === modData.toSet, "\nData received does not match data sent")
+      assert(collected.synchronized { collected === modData.toSet },
+        "\nData received does not match data sent")
     }
     ssc.stop(stopSparkContext = false)
   }
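
The commit keeps plain HashSets under explicit locks rather than switching to a concurrent collection, presumably because these tests need consistent snapshots across compound operations (sorting, set equality). Where per-call thread safety is enough, the Scala deprecation notice points at the java.util.concurrent collections instead; a minimal sketch of that alternative, not part of this commit:

import java.util.concurrent.ConcurrentHashMap

// A concurrent Set[Int] view backed by ConcurrentHashMap. Individual
// add/remove/contains calls are thread-safe without external locking;
// compound check-then-act sequences would still need coordination.
val collected = ConcurrentHashMap.newKeySet[Int]()

collected.add(1)
collected.add(2)
assert(collected.contains(1) && collected.size == 2)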
