Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13746][Tests]stop using deprecated SynchronizedSet #11580

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark

import java.lang.ref.WeakReference

import scala.collection.mutable.{HashSet, SynchronizedSet}
import scala.collection.mutable.HashSet
import scala.language.existentials
import scala.util.Random

Expand Down Expand Up @@ -442,25 +442,25 @@ class CleanerTester(
checkpointIds: Seq[Long] = Seq.empty)
extends Logging {

val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this just be = HashSet(rddIds) and similarly for the next 4?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen
I tried to change to val toBeCleanedRDDIds = HashSet(rddIds), but I got error at
toBeCleanedRDDIds -= rddId
so I will keep val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right Scala doesn't have a constructor that builds a set from a list/seq, so this makes a set of a Seq[Int]. Yes this is fine.

val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal

val cleanerListener = new CleanerListener {
def rddCleaned(rddId: Int): Unit = {
toBeCleanedRDDIds -= rddId
toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId }
logInfo("RDD " + rddId + " cleaned")
}

def shuffleCleaned(shuffleId: Int): Unit = {
toBeCleanedShuffleIds -= shuffleId
toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId }
logInfo("Shuffle " + shuffleId + " cleaned")
}

def broadcastCleaned(broadcastId: Long): Unit = {
toBeCleanedBroadcstIds -= broadcastId
toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId }
logInfo("Broadcast " + broadcastId + " cleaned")
}

Expand All @@ -469,7 +469,7 @@ class CleanerTester(
}

def checkpointCleaned(rddId: Long): Unit = {
toBeCheckpointIds -= rddId
toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId }
logInfo("checkpoint " + rddId + " cleaned")
}
}
Expand Down Expand Up @@ -579,17 +579,20 @@ class CleanerTester(

private def uncleanedResourcesToString = {
s"""
|\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}
|\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}
|\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")}
|\tRDDs = ${toBeCleanedRDDIds.synchronized
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point it's getting hard to read this in the interpolated string. I'd just pull out the calls to mkString and save them as vals.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen
I will change this. Thanks!

{toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}}
|\tShuffles = ${toBeCleanedShuffleIds.synchronized
{toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}}
|\tBroadcasts = ${toBeCleanedBroadcstIds.synchronized
{toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")}}
""".stripMargin
}

private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
toBeCleanedBroadcstIds.isEmpty &&
toBeCheckpointIds.isEmpty
toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } &&
toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } &&
toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } &&
toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty }

private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
val collected = new mutable.HashSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
collected.synchronized {
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
}
}
ssc.start()

val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
assert(collected === testData.toSet, "\nData received does not match data sent")
assert(collected.synchronized { collected === testData.toSet },
"\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
Expand All @@ -205,18 +208,21 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun

stream shouldBe a [ReceiverInputDStream[_]]

val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
val collected = new mutable.HashSet[Int]
stream.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
collected.synchronized {
collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
}
}
ssc.start()

val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
assert(collected === modData.toSet, "\nData received does not match data sent")
assert(collected.synchronized { collected === modData.toSet },
"\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
Expand Down