Skip to content

Commit

Permalink
SPARK-12729 PhantomReferences to replace Finalize()- Fixed styling is…
Browse files Browse the repository at this point in the history
…sues
  • Loading branch information
GayathriMurali committed Feb 11, 2016
1 parent 1b3f732 commit 837252a
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.language.existentials
import scala.language.postfixOps
import scala.ref
import scala.util.control.NonFatal

import com.google.common.base.Charsets.UTF_8
Expand Down Expand Up @@ -893,11 +892,11 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
val queue = new ReferenceQueue[File]()
val phantomReferences = new ListBuffer[FilePhantomReference]()
val threadName = "WeakReference"
/**

/**
* Read data from disks, then copy it to `out`
*/

private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val in = new FileInputStream(new File(path))
try {
Utils.copyStream(in, out)
Expand All @@ -909,7 +908,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
/**
* Write data into disk, using randomly generated name.
*/
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val dir = new File(Utils.getLocalDir(SparkEnv.get.conf))
val file = File.createTempFile("broadcast", "", dir)
phantomReferences += new FilePhantomReference(file, queue)
Expand All @@ -921,6 +920,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
in.close();
}
}

/** Create a seperate daemon thread
* to remove phantomreferences from queue and invoke cleanup
*/
Expand Down

0 comments on commit 837252a

Please sign in to comment.