Skip to content

Commit

Permalink
Changing the JavaSerializer reset to occur every 1000 objects.
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Nov 17, 2013
1 parent f403826 commit 5eb2b7e
Showing 1 changed file with 11 additions and 1 deletion.
Expand Up @@ -24,8 +24,18 @@ import org.apache.spark.util.ByteBufferInputStream

private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
var counter = 0;
//Calling reset to avoid memory leak: http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); objOut.reset(); this }
def writeObject[T](t: T): SerializationStream = {
objOut.writeObject(t);
if (counter >= 1000) {
objOut.reset();
counter = 0;
} else {
counter+=1;
}
this
}
def flush() { objOut.flush() }
def close() { objOut.close() }
}
Expand Down

0 comments on commit 5eb2b7e

Please sign in to comment.