Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13746][Tests] Stop using deprecated SynchronizedSet #11580

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark

import java.lang.ref.WeakReference

import scala.collection.mutable.{HashSet, SynchronizedSet}
import scala.collection.mutable.HashSet
import scala.language.existentials
import scala.util.Random

Expand Down Expand Up @@ -442,25 +442,25 @@ class CleanerTester(
checkpointIds: Seq[Long] = Seq.empty)
extends Logging {

val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this just be = HashSet(rddIds) and similarly for the next 4?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen
I tried changing it to `val toBeCleanedRDDIds = HashSet(rddIds)`, but I got an error at
`toBeCleanedRDDIds -= rddId`,
so I will keep `val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right: Scala doesn't have a constructor that builds a set from a list/seq, so `HashSet(rddIds)` makes a set containing a single Seq[Int]. Yes, this is fine.

val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal

val cleanerListener = new CleanerListener {
// Listener callback for a cleaned RDD: removes rddId from toBeCleanedRDDIds and logs it.
def rddCleaned(rddId: Int): Unit = {
// NOTE(review): this page is a flattened diff. The bare removal below appears to be the
// pre-change line and the synchronized form after it its replacement; confirm against the PR.
toBeCleanedRDDIds -= rddId
// Same removal, performed while holding the set's own monitor.
toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId }
logInfo("RDD " + rddId + " cleaned")
}

// Listener callback for a cleaned shuffle: removes shuffleId from toBeCleanedShuffleIds and logs it.
def shuffleCleaned(shuffleId: Int): Unit = {
// NOTE(review): this page is a flattened diff. The bare removal below appears to be the
// pre-change line and the synchronized form after it its replacement; confirm against the PR.
toBeCleanedShuffleIds -= shuffleId
// Same removal, performed while holding the set's own monitor.
toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId }
logInfo("Shuffle " + shuffleId + " cleaned")
}

// Listener callback for a cleaned broadcast: removes broadcastId from toBeCleanedBroadcstIds and logs it.
def broadcastCleaned(broadcastId: Long): Unit = {
// NOTE(review): this page is a flattened diff. The bare removal below appears to be the
// pre-change line and the synchronized form after it its replacement; confirm against the PR.
toBeCleanedBroadcstIds -= broadcastId
// Same removal, performed while holding the set's own monitor.
toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId }
logInfo("Broadcast " + broadcastId + " cleaned")
}

Expand All @@ -469,7 +469,7 @@ class CleanerTester(
}

// Listener callback for a cleaned checkpoint: removes rddId from toBeCheckpointIds and logs it.
def checkpointCleaned(rddId: Long): Unit = {
// NOTE(review): this page is a flattened diff. The bare removal below appears to be the
// pre-change line and the synchronized form after it its replacement; confirm against the PR.
toBeCheckpointIds -= rddId
// Same removal, performed while holding the set's own monitor.
toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId }
logInfo("checkpoint " + rddId + " cleaned")
}
}
Expand Down Expand Up @@ -578,18 +578,27 @@ class CleanerTester(
}

private def uncleanedResourcesToString = {
val s1 = {toBeCleanedRDDIds.synchronized {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two statements have extra braces

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen
Sorry, I overlooked this. Will change now. Thanks!

toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}
}
val s2 = {toBeCleanedShuffleIds.synchronized {
toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}
}
val s3 = toBeCleanedBroadcstIds.synchronized {
toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")
}
s"""
|\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}
|\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}
|\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")}
|\tRDDs = $s1
|\tShuffles = $s2
|\tBroadcasts = $s3
""".stripMargin
}

// Reports whether all four tracking sets (RDD, shuffle, broadcast and checkpoint ids) are empty.
// NOTE(review): this page is a flattened diff. The first four condition lines appear to be the
// pre-change unsynchronized checks; the four after them appear to be their replacements, each
// reading isEmpty while holding that set's own monitor. Confirm against the PR.
private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
toBeCleanedBroadcstIds.isEmpty &&
toBeCheckpointIds.isEmpty
toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } &&
toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } &&
toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } &&
toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty }

private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
val collected = new mutable.HashSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
collected.synchronized {
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
}
}
ssc.start()

val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
assert(collected === testData.toSet, "\nData received does not match data sent")
assert(collected.synchronized { collected === testData.toSet },
"\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
Expand All @@ -205,18 +208,21 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun

stream shouldBe a [ReceiverInputDStream[_]]

val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
val collected = new mutable.HashSet[Int]
stream.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
collected.synchronized {
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
}
}
ssc.start()

val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
assert(collected === modData.toSet, "\nData received does not match data sent")
assert(collected.synchronized { collected === modData.toSet },
"\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
Expand Down