[SPARK-13164][CORE] Replace deprecated synchronized buffer in core
Building with Scala 2.11 results in the warning: "trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative." Investigation shows we are already using ConcurrentLinkedQueue in other locations, so this change switches our uses of SynchronizedBuffer to ConcurrentLinkedQueue.

Author: Holden Karau <holden@us.ibm.com>

Closes #11059 from holdenk/SPARK-13164-replace-deprecated-synchronized-buffer-in-core.
holdenk authored and Andrew Or committed Feb 4, 2016
1 parent 2eaeafe commit 62a7c28
Showing 4 changed files with 40 additions and 39 deletions.
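
As background for the diff below, here is a minimal sketch of the substitution this commit applies at each call site. It is not taken from the diff itself; the object name and the String element type are illustrative only.

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object SynchronizedBufferMigrationSketch {
  // Before (deprecated since Scala 2.11):
  //   val listeners = new ArrayBuffer[String] with SynchronizedBuffer[String]
  // After: a lock-free, thread-safe queue from java.util.concurrent.
  val listeners = new ConcurrentLinkedQueue[String]()

  def main(args: Array[String]): Unit = {
    listeners.add("listener-1")      // was: listeners += "listener-1"
    listeners.remove("listener-1")   // was: listeners -= "listener-1"
    listeners.add("listener-2")
    // Iteration goes through the JavaConverters asScala view, as the changed call sites do.
    listeners.asScala.foreach(l => println(s"notifying $l"))
  }
}

The same add/remove/asScala substitutions recur throughout the four files changed below.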
26 changes: 12 additions & 14 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,9 +18,9 @@
package org.apache.spark

import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.collection.JavaConverters._

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
@@ -57,13 +57,11 @@ private class CleanupTaskWeakReference(
*/
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference]
with SynchronizedBuffer[CleanupTaskWeakReference]
private val referenceBuffer = new ConcurrentLinkedQueue[CleanupTaskWeakReference]()

private val referenceQueue = new ReferenceQueue[AnyRef]

private val listeners = new ArrayBuffer[CleanerListener]
with SynchronizedBuffer[CleanerListener]
private val listeners = new ConcurrentLinkedQueue[CleanerListener]()

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

@@ -111,7 +109,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/** Attach a listener object to get information of when objects are cleaned. */
def attachListener(listener: CleanerListener): Unit = {
listeners += listener
listeners.add(listener)
}

/** Start the cleaner. */
@@ -166,7 +164,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
@@ -179,7 +177,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
synchronized {
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
referenceBuffer.remove(reference.get)
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
@@ -206,7 +204,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning RDD " + rddId)
sc.unpersistRDD(rddId, blocking)
listeners.foreach(_.rddCleaned(rddId))
listeners.asScala.foreach(_.rddCleaned(rddId))
logInfo("Cleaned RDD " + rddId)
} catch {
case e: Exception => logError("Error cleaning RDD " + rddId, e)
@@ -219,7 +217,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
blockManagerMaster.removeShuffle(shuffleId, blocking)
listeners.foreach(_.shuffleCleaned(shuffleId))
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
@@ -231,7 +229,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug(s"Cleaning broadcast $broadcastId")
broadcastManager.unbroadcast(broadcastId, true, blocking)
listeners.foreach(_.broadcastCleaned(broadcastId))
listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
logDebug(s"Cleaned broadcast $broadcastId")
} catch {
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
@@ -243,7 +241,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning accumulator " + accId)
Accumulators.remove(accId)
listeners.foreach(_.accumCleaned(accId))
listeners.asScala.foreach(_.accumCleaned(accId))
logInfo("Cleaned accumulator " + accId)
} catch {
case e: Exception => logError("Error cleaning accumulator " + accId, e)
@@ -258,7 +256,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
listeners.foreach(_.checkpointCleaned(rddId))
listeners.asScala.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
20 changes: 11 additions & 9 deletions core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -17,7 +17,9 @@

package org.apache.spark.deploy.client

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.concurrent.duration._

import org.scalatest.BeforeAndAfterAll
@@ -165,14 +167,14 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd

/** Application Listener to collect events */
private class AppClientCollector extends AppClientListener with Logging {
val connectedIdList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val connectedIdList = new ConcurrentLinkedQueue[String]()
@volatile var disconnectedCount: Int = 0
val deadReasonList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val execRemovedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val deadReasonList = new ConcurrentLinkedQueue[String]()
val execAddedList = new ConcurrentLinkedQueue[String]()
val execRemovedList = new ConcurrentLinkedQueue[String]()

def connected(id: String): Unit = {
connectedIdList += id
connectedIdList.add(id)
}

def disconnected(): Unit = {
@@ -182,7 +184,7 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
}

def dead(reason: String): Unit = {
deadReasonList += reason
deadReasonList.add(reason)
}

def executorAdded(
@@ -191,11 +193,11 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
hostPort: String,
cores: Int,
memory: Int): Unit = {
execAddedList += id
execAddedList.add(id)
}

def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
execRemovedList += id
execRemovedList.add(id)
}
}

23 changes: 12 additions & 11 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.rpc
import java.io.{File, NotSerializableException}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeoutException, TimeUnit}

import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -490,30 +491,30 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {

/**
* Setup an [[RpcEndpoint]] to collect all network events.
* @return the [[RpcEndpointRef]] and an `Seq` that contains network events.
* @return the [[RpcEndpointRef]] and a `ConcurrentLinkedQueue` that contains network events.
*/
private def setupNetworkEndpoint(
_env: RpcEnv,
name: String): (RpcEndpointRef, Seq[(Any, Any)]) = {
val events = new mutable.ArrayBuffer[(Any, Any)] with mutable.SynchronizedBuffer[(Any, Any)]
name: String): (RpcEndpointRef, ConcurrentLinkedQueue[(Any, Any)]) = {
val events = new ConcurrentLinkedQueue[(Any, Any)]
val ref = _env.setupEndpoint("network-events-non-client", new ThreadSafeRpcEndpoint {
override val rpcEnv = _env

override def receive: PartialFunction[Any, Unit] = {
case "hello" =>
case m => events += "receive" -> m
case m => events.add("receive" -> m)
}

override def onConnected(remoteAddress: RpcAddress): Unit = {
events += "onConnected" -> remoteAddress
events.add("onConnected" -> remoteAddress)
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
events += "onDisconnected" -> remoteAddress
events.add("onDisconnected" -> remoteAddress)
}

override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
events += "onNetworkError" -> remoteAddress
events.add("onNetworkError" -> remoteAddress)
}

})
@@ -560,16 +561,16 @@

eventually(timeout(5 seconds), interval(5 millis)) {
// We don't know the exact client address but at least we can verify the message type
assert(events.map(_._1).contains("onConnected"))
assert(events.asScala.map(_._1).exists(_ == "onConnected"))
}

clientEnv.shutdown()
clientEnv.awaitTermination()

eventually(timeout(5 seconds), interval(5 millis)) {
// We don't know the exact client address but at least we can verify the message type
assert(events.map(_._1).contains("onConnected"))
assert(events.map(_._1).contains("onDisconnected"))
assert(events.asScala.map(_._1).exists(_ == "onConnected"))
assert(events.asScala.map(_._1).exists(_ == "onDisconnected"))
}
} finally {
clientEnv.shutdown()
10 changes: 5 additions & 5 deletions core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -17,9 +17,9 @@

package org.apache.spark.util

import java.util.concurrent.CountDownLatch
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}

import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps

@@ -31,19 +31,19 @@ import org.apache.spark.SparkFunSuite
class EventLoopSuite extends SparkFunSuite with Timeouts {

test("EventLoop") {
val buffer = new mutable.ArrayBuffer[Int] with mutable.SynchronizedBuffer[Int]
val buffer = new ConcurrentLinkedQueue[Int]
val eventLoop = new EventLoop[Int]("test") {

override def onReceive(event: Int): Unit = {
buffer += event
buffer.add(event)
}

override def onError(e: Throwable): Unit = {}
}
eventLoop.start()
(1 to 100).foreach(eventLoop.post)
eventually(timeout(5 seconds), interval(5 millis)) {
assert((1 to 100) === buffer.toSeq)
assert((1 to 100) === buffer.asScala.toSeq)
}
eventLoop.stop()
}
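
One detail the EventLoopSuite assertion above relies on, though the diff does not spell it out: ConcurrentLinkedQueue is a FIFO queue, so its asScala view iterates in insertion order and the comparison against the posted range stays valid. A small self-contained sketch with hypothetical values:

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object QueueOrderSketch {
  def main(args: Array[String]): Unit = {
    val buffer = new ConcurrentLinkedQueue[Int]()
    (1 to 100).foreach(buffer.add(_))
    // FIFO ordering: the Scala view compares element-wise equal to the posted range.
    assert((1 to 100) == buffer.asScala.toSeq)
    println("order preserved")
  }
}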
