Patch for SPARK-942 #50

Closed
wants to merge 29 commits into
from


5 participants

@kellrott
kellrott commented Mar 1, 2014

This is a port of a pull request originally targeted at incubator-spark: https://github.com/apache/incubator-spark/pull/180

Essentially, if a user returns a generative iterator (for example, from a flatMap operation), then when trying to persist the data, Spark would first unroll the iterator into an ArrayBuffer and only then try to figure out whether it could store the data. If the user-provided iterator generated more data than available memory, this caused a crash. With this patch, if the user requests a persist with 'StorageLevel.DISK_ONLY', the iterator is unrolled as it is fed into the serializer (a minimal sketch of the triggering pattern follows the change list below).

To do this, two changes were made:

  1. The type of the 'values' argument in the putValues method of the BlockStore interface was changed from ArrayBuffer to Iterator (and all code interfacing with this method was updated to match).
  2. The JavaSerializer now calls the ObjectOutputStream 'reset' method every 1000 objects. This was done because the ObjectOutputStream caches objects (thus preventing them from being GC'd) in order to write more compact serialization. If reset is never called, memory eventually fills up; if it is called too often, the serialization streams become much larger because of redundant class descriptions.
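For context, here is a minimal sketch of the pattern that triggers the problem: a flatMap that lazily expands each input element into many records, persisted with DISK_ONLY. The object name, app name, master setting, and expansion size are illustrative and not taken from the patch itself.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.StorageLevel

  object FlatMapToDiskSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[1]").setAppName("flatmap-to-disk-sketch")
      val sc = new SparkContext(conf)
      try {
        // Each input element lazily expands into many records; user code never
        // materializes the full sequence itself.
        val expanded = sc.parallelize(1 to 5).flatMap(x => Stream.range(0, 100))
        // Before this patch, persisting unrolled the whole iterator into an
        // ArrayBuffer first; with DISK_ONLY it is now streamed to the serializer.
        val persisted = expanded.persist(StorageLevel.DISK_ONLY)
        println(persisted.count())  // 500
      } finally {
        sc.stop()
      }
    }
  }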
kellrott added some commits Nov 13, 2013
@kellrott kellrott Changing CacheManager and BlockManager to pass iterators directly to …
…the serializer when a 'DISK_ONLY' persist is called.

This is in response to SPARK-942.
efe1102
@kellrott kellrott Fixing MemoryStore, so that it converts incoming iterators to ArrayBu…
…ffer objects. This was previously done higher up the stack.
cac1fad
@kellrott kellrott Merge remote-tracking branch 'origin/master' into iterator-to-disk d32992f
@kellrott kellrott Adding unit test for straight to disk iterator methods. 81d670c
@kellrott kellrott Merge branch 'master' into iterator-to-disk f403826
@kellrott kellrott Changing the JavaSerializer reset to occur every 1000 objects. 5eb2b7e
@kellrott kellrott Adding some comments. 44ec35a
@kellrott kellrott Merge branch 'master' into iterator-to-disk
Conflicts:
	core/src/main/scala/org/apache/spark/CacheManager.scala
56f71cd
@kellrott kellrott Simplifying StorageLevel checks 95c7f67
@kellrott kellrott Deleting temp output directory when done 0e6f808
@kellrott kellrott Fixing dumb mistake ("||" instead of "&&") 2eeda75
@kellrott kellrott Wrapping long line a6424ba
@kellrott kellrott Added check to make sure that streamed-to-disk RDD actually returns g…
…ood data in the LargeIteratorSuite
9df0276
@kellrott kellrott Removing un-needed semi-colons 31fe08e
@kellrott kellrott Removing rogue space 40fe1d7
@kellrott kellrott Making the Java ObjectStreamSerializer reset rate configurable by the…
… system variable 'spark.serializer.objectStreamReset', default is now 10000.
00c98e0
@kellrott kellrott Merge branch 'master' into iterator-to-disk 8644ee8
@kellrott kellrott Fixing the JavaSerializer to read from the SparkConf rather than the …
…System property.
656c33e
@kellrott kellrott Adding second putValues to BlockStore interface that accepts an Array…
…Buffer (rather than an Iterator).

This will allow BlockStores to have slightly different behaviors dependent on whether they get an
Iterator or ArrayBuffer. In the case of the MemoryStore, it needs to duplicate and cache an Iterator
into an ArrayBuffer, but if handed an ArrayBuffer, it can skip the duplication.
0f28ec7
@kellrott kellrott Wrapping a few long lines 627a8b7
@kellrott kellrott Removing more un-needed array-buffer to iterator conversions c2fb430
@kellrott kellrott Streamlined the LargeIteratorSuite unit test. It should now run in ~2…
…5 seconds. Confirmed that it still crashes an unpatched copy of Spark.
16a4cea
@kellrott kellrott Moving the 'LargeIteratorSuite' to simply test persistence of iterato…
…rs. It doesn't try to invoke an OOM error any more
7ccc74b
@kellrott kellrott Adding docs for spark.serializer.objectStreamReset configuration f70d069
@kellrott kellrott Refactoring the BlockManager to replace the Either[Either[A,B]] usage…
…. Now using trait 'Values'. Also modified BlockStore.putBytes call to return PutResult, so that it behaves like putValues.
2f684ea
@kellrott kellrott Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spar…
…k into iterator-to-disk

Conflicts:
	core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala
33ac390
@kellrott kellrott Merge ../incubator-spark into iterator-to-disk 8aa31cd
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12935/

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...a/org/apache/spark/storage/FlatmapIteratorSuite.scala
+ * Java serialization system, the serializer caches objects to prevent writing redundant
+ * data, however that stops GC of those objects. By calling 'reset' you flush that
+ * info from the serializer, and allow old objects to be GC'd
+ */
+ test("Flatmap Iterator to Disk") {
+ val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
+ .setAppName("iterator_to_disk_test")
+ sc = new SparkContext(sconf)
+ try {
+ val expand_size = 100
+ val data = sc.parallelize( (1 to 5).toSeq ).
+ flatMap( x => Stream.range(0, expand_size) )
+ var persisted = data.persist(StorageLevel.DISK_ONLY)
+ println(persisted.count())
+ assert( persisted.count() == 500)
+ assert( persisted.filter( _==1 ).count() == 5 )
@mateiz
mateiz Mar 3, 2014

Code style is a little weird here and in other parts of the file -- don't leave spaces inside the parens, but do leave them around operators. So this should be:

assert(persisted.filter(_ == 1).count() == 5)

for example.

@mateiz
mateiz Mar 3, 2014

Also in ScalaTest, you can use === instead of == to get nicer error messages (e.g. "expected 5 but got 4" instead of "assertion failed: false").
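For example, the assertions above could be written as follows (a sketch using the test's existing names):

  // `===` comes from ScalaTest's Assertions and reports both values on failure,
  // e.g. "500 did not equal 501" rather than just "assertion failed".
  assert(persisted.count() === 500)
  assert(persisted.filter(_ == 1).count() === 5)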

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...a/org/apache/spark/storage/FlatmapIteratorSuite.scala
+ * info from the serializer, and allow old objects to be GC'd
+ */
+ test("Flatmap Iterator to Disk") {
+ val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
+ .setAppName("iterator_to_disk_test")
+ sc = new SparkContext(sconf)
+ try {
+ val expand_size = 100
+ val data = sc.parallelize( (1 to 5).toSeq ).
+ flatMap( x => Stream.range(0, expand_size) )
+ var persisted = data.persist(StorageLevel.DISK_ONLY)
+ println(persisted.count())
+ assert( persisted.count() == 500)
+ assert( persisted.filter( _==1 ).count() == 5 )
+ } catch {
+ case _ : OutOfMemoryError => assert(false)
@mateiz
mateiz Mar 3, 2014

Why are you catching OutOfMemoryError instead of just letting the test suite fail? I don't think the test runner will easily recover from this. Maybe use a println or something if you want to leave a message saying this is an expected failure mode.

@mateiz mateiz commented on an outdated diff Mar 3, 2014
core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -71,10 +71,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
- val elements = new ArrayBuffer[Any]
- elements ++= computedValues
- blockManager.put(key, elements, storageLevel, tellMaster = true)
- elements.iterator.asInstanceOf[Iterator[T]]
+ if (storageLevel.useDisk && !storageLevel.useMemory) {
+ blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+ return blockManager.get(key) match {
+ case Some(values) =>
+ return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ case None =>
+ logInfo("Failure to store %s".format(key))
+ return null
@mateiz
mateiz Mar 3, 2014

Why return null here instead of throwing an exception that explains the problem? It's going to create a NullPointerException later on, which will be confusing.
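A hedged sketch of the alternative being suggested, covering just the `case None` arm of the match shown in the diff (the exception type and message are illustrative, not necessarily what the final patch uses):

  case None =>
    // Fail loudly instead of returning null, which would only surface later
    // as a confusing NullPointerException in the caller.
    throw new org.apache.spark.SparkException(
      "Block manager failed to return persisted value for block " + key)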

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...cala/org/apache/spark/serializer/JavaSerializer.scala
val objOut = new ObjectOutputStream(out)
- def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+ var counter = 0
+ val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000)
+
+ /* Calling reset to avoid memory leak:
+ * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+ * But only call it every 1000th time to avoid bloated serialization streams (when
@mateiz
mateiz Mar 3, 2014

Should say 10000th
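For reference, the periodic-reset logic under discussion looks roughly like this (a sketch following the names in the diff; the exact guard in the merged code may differ):

  def writeObject[T](t: T): SerializationStream = {
    objOut.writeObject(t)
    counter += 1
    // reset() clears the ObjectOutputStream's handle table so previously written
    // objects can be GC'd, at the cost of re-writing class descriptors afterwards.
    if (counterReset > 0 && counter >= counterReset) {
      objOut.reset()
      counter = 0
    }
    this
  }

The reset interval comes from the SparkConf key spark.serializer.objectStreamReset, letting users tune the trade-off between memory held by the stream and the size of the serialized output.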

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...ain/scala/org/apache/spark/storage/BlockManager.scala
@@ -534,8 +539,9 @@ private[spark] class BlockManager(
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
- val replicationFuture = if (data.isRight && level.replication > 1) {
- val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
+ val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
+ //Duplicate doesn't copy the bytes, just creates a wrapper
@mateiz
mateiz Mar 3, 2014

Add a space after //

@mateiz mateiz commented on the diff Mar 3, 2014
core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -71,10 +71,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
- val elements = new ArrayBuffer[Any]
- elements ++= computedValues
- blockManager.put(key, elements, storageLevel, tellMaster = true)
- elements.iterator.asInstanceOf[Iterator[T]]
+ if (storageLevel.useDisk && !storageLevel.useMemory) {
@mateiz
mateiz Mar 3, 2014

This is not the only condition where we want to do this. For example we might also want it for MEMORY_ONLY_SER, where the serialized data might fit in RAM but the ArrayBuffer of raw objects might not. (Especially if you set spark.rdd.compress to compress the serialized data.)

@mateiz
mateiz commented Mar 3, 2014

Hey Kyle, thanks for bringing this to the new repo. I looked through it and made a few comments. Another concern though is that it would be good to make this work for MEMORY_ONLY_SER storage as well, but note that that could be a little trickier. In particular, for in-memory storage, the block store might drop the block before you get to read it, which would then lead to an exception here. It might be possible to take the return value of MemoryStore.put() and pass that back from put() even if the memory store later decides to drop the block, but you'll have to deal with replication and stuff like that. If you want, we can open a separate issue for that and leave this for just the disk case. But in that case you should add a comment to the code explaining why this is only for disk.
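To make the reviewer's point concrete, here is a hedged sketch of the broader check being described (this is not what the patch implements; the patch only streams for the pure-disk case):

  import org.apache.spark.storage.StorageLevel

  // Stream values through the serializer whenever the raw objects will not be
  // kept deserialized in memory; this also covers levels like MEMORY_ONLY_SER.
  def shouldStreamToStore(level: StorageLevel): Boolean =
    (level.useDisk && !level.useMemory) || (level.useMemory && !level.deserialized)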

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...ain/scala/org/apache/spark/storage/BlockManager.scala
- }
- } else {
- // Save directly to disk.
- // Don't get back the bytes unless we replicate them.
- val askForBytes = level.replication > 1
- val res = diskStore.putValues(blockId, values, level, askForBytes)
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case _ =>
- }
+ if (level.useMemory) {
+ // Save it just to memory first, even if it also has useDisk set to true; we will
+ // drop it to disk later if the memory store can't hold it.
+ val res = data match {
+ case IteratorValues(values_i) =>
@mateiz
mateiz Mar 3, 2014

Rename values_i to just "iterator"; same with the ones below; same with values_a => "arrayBuffer" and value_bytes => "bytes".

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...ain/scala/org/apache/spark/storage/BlockManager.scala
+ value_bytes.rewind();
+ memoryStore.putBytes(blockId, value_bytes, level)
+ }
+ }
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
+ case Left(newIterator) => valuesAfterPut = newIterator
+ }
+ } else {
+ // Save directly to disk.
+ // Don't get back the bytes unless we replicate them.
+ val askForBytes = level.replication > 1
+
+ val res = data match {
+ case IteratorValues(values_i) =>
@mateiz
mateiz Mar 3, 2014

Ditto on the variable names here

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...c/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -52,11 +52,21 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
+ return PutResult(bytes.limit(), Right(bytes.duplicate()))
+ }
+
+ override def putValues(
+ blockId: BlockId,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ returnValues: Boolean)
+ : PutResult = {
@mateiz
mateiz Mar 3, 2014

The formatting of this parameter list is wrong (sorry, not sure why Jenkins is not catching this); the proper formatting should be

  override def putValues(
      blockId: BlockId,
      values: ArrayBuffer[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    ...
  }

See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide for some examples

@mateiz mateiz commented on an outdated diff Mar 3, 2014
...main/scala/org/apache/spark/storage/MemoryStore.scala
} else {
tryToPut(blockId, bytes, bytes.limit, false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ }
+ }
+
+ override def putValues(
+ blockId: BlockId,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ returnValues: Boolean)
+ : PutResult = {
@mateiz
mateiz Mar 3, 2014

Same thing on formatting

@kellrott
kellrott commented Mar 3, 2014

Thank you for the notes. I'll start working on fixing things.
I'd like to keep this patch 'simple', and limit the scope to DISK_ONLY, and get it accepted before thinking about MEMORY_ONLY_SER.

@mateiz
mateiz commented Mar 4, 2014

Alright, sounds good. Looking forward to it.

@andrewor14 andrewor14 commented on the diff Mar 4, 2014
core/src/main/scala/org/apache/spark/CacheManager.scala
- elements.iterator.asInstanceOf[Iterator[T]]
+ if (storageLevel.useDisk && !storageLevel.useMemory) {
+ blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+ return blockManager.get(key) match {
+ case Some(values) =>
+ return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ case None =>
+ logInfo("Failure to store %s".format(key))
+ return null
+ }
+ } else {
+ val elements = new ArrayBuffer[Any]
+ elements ++= computedValues
+ blockManager.put(key, elements, storageLevel, tellMaster = true)
+ return elements.iterator.asInstanceOf[Iterator[T]]
+ }
@andrewor14
andrewor14 Mar 4, 2014

Is there any reason why we still want to keep around the legacy case? It seems that the IteratorValues and ArrayBufferValues are handled in very similar ways downstream (e.g. in BlockManager.doPut, and memoryStore.putValues), and it would be nice if we could somehow merge these two code paths into a simpler one.

The other thing is that computedValues in L71 is already an iterator anyway, so we can just feed it directly into BlockManager. The existing way of arbitrarily turning it into an ArrayBuffer and back into an Iterator seems a little unnecessary.

(That said, there might be a performance reason that I haven't thought of.)

@mateiz
mateiz Mar 4, 2014

Yes, performance and correctness are actually both reasons. The code path for disk first writes the data to disk and then has to read and deserialize it from there, which is slow. Also, if you used the memory store in the same way, the store might drop it before you have a chance to call get(). See my comments on the main discussion.

@andrewor14
andrewor14 Mar 4, 2014

Even if we use the same code path for say MEM_ONLY, we won't actually write the data to disk since we also pass the storage level to BlockManager along with the data.

But I see your second point about possibly dropping a block before we read it. That does seem to prevent us from merging the two cases.

@andrewor14 andrewor14 commented on an outdated diff Mar 4, 2014
...cala/org/apache/spark/serializer/JavaSerializer.scala
@@ -23,9 +23,27 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.util.ByteBufferInputStream
-private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
+private[spark] class JavaSerializationStream(out: OutputStream,
+ conf: SparkConf) extends SerializationStream {
@andrewor14
andrewor14 Mar 4, 2014

nit: break line on "extends" instead

@andrewor14 andrewor14 commented on an outdated diff Mar 4, 2014
...cala/org/apache/spark/serializer/JavaSerializer.scala
val objOut = new ObjectOutputStream(out)
- def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+ var counter = 0
+ val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000)
+
+ /* Calling reset to avoid memory leak:
@andrewor14
andrewor14 Mar 4, 2014

nit: use java doc format /** comment */ rather than scala doc

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12975/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12985/

@kellrott
kellrott commented Mar 6, 2014

I think I've covered all the formatting requests. Any other issues?

@pwendell
pwendell commented Mar 6, 2014

Thanks @kellrott for this patch - sorry it took us a long time to review it. I'm going to merge this now.

@pwendell
pwendell commented Mar 6, 2014

I've created SPARK-1201 (https://spark-project.atlassian.net/browse/SPARK-1201) to cover optimizations in cases other than DISK_ONLY.

@asfgit asfgit pushed a commit that closed this pull request Mar 6, 2014
@kellrott kellrott SPARK-942: Do not materialize partitions when DISK_ONLY storage level…
… is used

This is a port of a pull request originally targeted at incubator-spark: https://github.com/apache/incubator-spark/pull/180

Essentially, if a user returns a generative iterator (for example, from a flatMap operation), then when trying to persist the data, Spark would first unroll the iterator into an ArrayBuffer and only then try to figure out whether it could store the data. If the user-provided iterator generated more data than available memory, this caused a crash. With this patch, if the user requests a persist with 'StorageLevel.DISK_ONLY', the iterator is unrolled as it is fed into the serializer.

To do this, two changes were made:
1) The type of the 'values' argument in the putValues method of the BlockStore interface was changed from ArrayBuffer to Iterator (and all code interfacing with this method was updated to match).
2) The JavaSerializer now calls the ObjectOutputStream 'reset' method every 1000 objects. This was done because the ObjectOutputStream caches objects (thus preventing them from being GC'd) in order to write more compact serialization. If reset is never called, memory eventually fills up; if it is called too often, the serialization streams become much larger because of redundant class descriptions.

Author: Kyle Ellrott <kellrott@gmail.com>

Closes #50 from kellrott/iterator-to-disk and squashes the following commits:

9ef7cb8 [Kyle Ellrott] Fixing formatting issues.
60e0c57 [Kyle Ellrott] Fixing issues (formatting, variable names, etc.) from review comments
8aa31cd [Kyle Ellrott] Merge ../incubator-spark into iterator-to-disk
33ac390 [Kyle Ellrott] Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spark into iterator-to-disk
2f684ea [Kyle Ellrott] Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now using trait 'Values'. Also modified BlockStore.putBytes call to return PutResult, so that it behaves like putValues.
f70d069 [Kyle Ellrott] Adding docs for spark.serializer.objectStreamReset configuration
7ccc74b [Kyle Ellrott] Moving the 'LargeIteratorSuite' to simply test persistence of iterators. It doesn't try to invoke an OOM error any more
16a4cea [Kyle Ellrott] Streamlined the LargeIteratorSuite unit test. It should now run in ~25 seconds. Confirmed that it still crashes an unpatched copy of Spark.
c2fb430 [Kyle Ellrott] Removing more un-needed array-buffer to iterator conversions
627a8b7 [Kyle Ellrott] Wrapping a few long lines
0f28ec7 [Kyle Ellrott] Adding second putValues to BlockStore interface that accepts an ArrayBuffer (rather than an Iterator). This will allow BlockStores to have slightly different behaviors dependent on whether they get an Iterator or ArrayBuffer. In the case of the MemoryStore, it needs to duplicate and cache an Iterator into an ArrayBuffer, but if handed an ArrayBuffer, it can skip the duplication.
656c33e [Kyle Ellrott] Fixing the JavaSerializer to read from the SparkConf rather than the System property.
8644ee8 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
00c98e0 [Kyle Ellrott] Making the Java ObjectStreamSerializer reset rate configurable by the system variable 'spark.serializer.objectStreamReset', default is now 10000.
40fe1d7 [Kyle Ellrott] Removing rogue space
31fe08e [Kyle Ellrott] Removing un-needed semi-colons
9df0276 [Kyle Ellrott] Added check to make sure that streamed-to-disk RDD actually returns good data in the LargeIteratorSuite
a6424ba [Kyle Ellrott] Wrapping long line
2eeda75 [Kyle Ellrott] Fixing dumb mistake ("||" instead of "&&")
0e6f808 [Kyle Ellrott] Deleting temp output directory when done
95c7f67 [Kyle Ellrott] Simplifying StorageLevel checks
56f71cd [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
44ec35a [Kyle Ellrott] Adding some comments.
5eb2b7e [Kyle Ellrott] Changing the JavaSerializer reset to occur every 1000 objects.
f403826 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
81d670c [Kyle Ellrott] Adding unit test for straight to disk iterator methods.
d32992f [Kyle Ellrott] Merge remote-tracking branch 'origin/master' into iterator-to-disk
cac1fad [Kyle Ellrott] Fixing MemoryStore, so that it converts incoming iterators to ArrayBuffer objects. This was previously done higher up the stack.
efe1102 [Kyle Ellrott] Changing CacheManager and BlockManager to pass iterators directly to the serializer when a 'DISK_ONLY' persist is called. This is in response to SPARK-942.
40566e1
@asfgit asfgit closed this in 40566e1 Mar 6, 2014
@CrazyJvm CrazyJvm added a commit to CrazyJvm/spark that referenced this pull request Jun 1, 2014
@kellrott kellrott SPARK-942: Do not materialize partitions when DISK_ONLY storage level…
… is used

972ecf8
@vlad17 vlad17 pushed a commit to vlad17/spark that referenced this pull request Aug 23, 2016
@yhuai yhuai Add SPARK_DIST_CLASSPATH to LAUNCH_CLASSPATH
## What changes were proposed in this pull request?
In Databricks, `SPARK_DIST_CLASSPATH` is used for the driver classpath and `SPARK_JARS_DIR` is empty. So we need to add `SPARK_DIST_CLASSPATH` to the `LAUNCH_CLASSPATH`. We cannot remove `SPARK_JARS_DIR` because Spark unit tests are actually using it.

Author: Yin Huai <yhuai@databricks.com>

Closes #50 from yhuai/Add-SPARK_DIST_CLASSPATH-toLAUNCH_CLASSPATH.
6cd1cd1
@clockfly clockfly pushed a commit to clockfly/spark that referenced this pull request Aug 30, 2016
@yhuai yhuai Add SPARK_DIST_CLASSPATH to LAUNCH_CLASSPATH
224fffe