
[SPARK-13980] Incrementally serialize blocks while unrolling them in MemoryStore #11791

Closed
wants to merge 13 commits

Conversation

JoshRosen
Contributor

When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code unrolls the entire iterator as Java objects in memory, then serializes an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements.

Instead, I think that we should incrementally serialize blocks while unrolling them.

A downside of incremental serialization is that we will need to deserialize the partially-unrolled data if there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory-efficiency improvements will outweigh any performance loss from the extra serialization in that hopefully-rare case.
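The idea can be sketched as follows. This is a minimal, hypothetical sketch using plain Java serialization into a byte buffer; the real patch goes through Spark's serializer and a chunked output stream, and `serializeIncrementally` / `memoryLimit` are illustrative names, not the actual API:

```scala
// Hypothetical sketch of incremental serialization during unroll. Plain Java
// serialization into a byte buffer stands in for Spark's serializer and
// chunked output stream; serializeIncrementally / memoryLimit are made-up names.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

def serializeIncrementally(values: Iterator[AnyRef], memoryLimit: Long): Option[Array[Byte]] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  var keepUnrolling = true
  while (values.hasNext && keepUnrolling) {
    out.writeObject(values.next())
    out.flush()
    // In the real code this is a periodic check that requests more unroll
    // memory from the MemoryManager; here we just compare against a fixed cap.
    if (bytes.size() > memoryLimit) {
      keepUnrolling = false
    }
  }
  out.close()
  // On failure the caller must fall back (e.g. deserialize the partial data
  // or drop the block to disk), which is the downside discussed above.
  if (keepUnrolling) Some(bytes.toByteArray) else None
}
```

The key property is that only serialized bytes are retained while the iterator is consumed, so the peak footprint is roughly the serialized size rather than serialized plus deserialized.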


SparkQA commented Mar 17, 2016

Test build #53456 has finished for PR 11791 at commit 7dc3623.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 18, 2016

Test build #53496 has finished for PR 11791 at commit 5489748.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

/cc @rxin @andrewor14, this is the next most important patch to review on the way towards off-heap caching. After these changes get in, we'll be able to use off-heap memory for the unroll memory in off-heap caching, greatly simplifying things. Without this change, the on-heap unroll array needs to be accounted for properly even if the final cache destination is off-heap, making caching more OOM-prone and complicating the accounting logic (since it then differs between the two modes).

@rxin
Contributor

rxin commented Mar 19, 2016

Still WIP?

@JoshRosen JoshRosen changed the title [SPARK-13980][WIP] Incrementally serialize blocks while unrolling them in MemoryStore [SPARK-13980] Incrementally serialize blocks while unrolling them in MemoryStore Mar 21, 2016
@JoshRosen
Contributor Author

This is no longer WIP and should be ready for review now.

@@ -129,10 +136,9 @@ private[spark] class MemoryStore(
* iterator or call `close()` on it in order to free the storage memory consumed by the
* partially-unrolled block.
*/
-  private[storage] def putIterator(
+  private[storage] def putIteratorAsValues(
Contributor Author

In case it isn't obvious from the diff, the main change in this file is to split putIterator into two separate methods, putIteratorAsValues and putIteratorAsBytes.

It's possible that there's some opportunity to reduce code duplication here, but unless we can come up with an obvious and simple approach I would prefer to defer that cleanup to follow-up patches.
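Schematically, the split looks something like the sketch below. Note this is a simplification: the `Either[Iterator[T], Long]` result type here is illustrative only; in the actual patch the failure side carries richer state (partially-unrolled values or partially-serialized bytes) so the caller can recover when the block doesn't fit.

```scala
import scala.reflect.ClassTag

// Sketch of the split interface. Either[Iterator[T], Long] is a stand-in for
// the real result types, which wrap partially-unrolled / partially-serialized
// state so callers can recover on failure.
trait MemoryStoreSketch {
  type BlockId
  // Old putIterator() path: unroll as deserialized Java objects.
  def putIteratorAsValues[T](blockId: BlockId, values: Iterator[T]): Either[Iterator[T], Long]
  // New path: serialize records into bytes while unrolling.
  def putIteratorAsBytes[T](blockId: BlockId, values: Iterator[T],
      classTag: ClassTag[T]): Either[Iterator[T], Long]
}
```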


SparkQA commented Mar 21, 2016

Test build #53700 has finished for PR 11791 at commit a336c17.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.


SparkQA commented Mar 21, 2016

Test build #53704 has finished for PR 11791 at commit a336c17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (keepUnrolling) {
  unrollMemoryUsedByThisBlock += amountToRequest
}
unrollMemoryUsedByThisBlock += amountToRequest
Contributor

I don't understand why you add this twice in some cases.

Contributor Author

Ah, I think this case is a mistake which might have been introduced while repairing a merge conflict. We should only increment this if keepUnrolling == true.

Contributor Author

This has now been fixed.
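For reference, the fixed pattern only counts the requested memory when the acquisition succeeds. A minimal stand-alone sketch (`UnrollAccounting` and `acquireUnrollMemory` are stand-ins for the real MemoryStore/MemoryManager interaction, not Spark classes):

```scala
// Stand-alone sketch of the corrected accounting: unrollMemoryUsedByThisBlock
// is bumped only when the memory request succeeds. UnrollAccounting and
// acquireUnrollMemory are illustrative stand-ins.
class UnrollAccounting(private var freeMemory: Long) {
  var unrollMemoryUsedByThisBlock: Long = 0L

  private def acquireUnrollMemory(bytes: Long): Boolean =
    if (bytes <= freeMemory) { freeMemory -= bytes; true } else false

  def requestMore(amountToRequest: Long): Boolean = {
    val keepUnrolling = acquireUnrollMemory(amountToRequest)
    if (keepUnrolling) {
      // Incrementing unconditionally (the bug discussed above) would over-count
      // the memory owned by this block and leak unroll memory on release.
      unrollMemoryUsedByThisBlock += amountToRequest
    }
    keepUnrolling
  }
}
```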

@rxin
Contributor

rxin commented Mar 22, 2016

cc @sameeragarwal


SparkQA commented Mar 22, 2016

Test build #53742 has finished for PR 11791 at commit 4976b74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 22, 2016

Test build #53741 has finished for PR 11791 at commit cec1f02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Doing a bit of refactoring now in order to make it easier to write proper unit tests for this. Therefore I'd hold off on the final review pass here for a little bit and review my SPARK-3000 patch instead.


SparkQA commented Mar 23, 2016

Test build #53957 has finished for PR 11791 at commit 768a8d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LogisticRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):
    • class NaiveBayesModel(JavaModel, JavaMLWritable, JavaMLReadable):
    • class KMeansModel(JavaModel, JavaMLWritable, JavaMLReadable):
    • class Binarizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class CountVectorizerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class DCT(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class ElementwiseProduct(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable,
    • class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures, JavaMLReadable,
    • class IDF(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class IDFModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class MaxAbsScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class MaxAbsScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class MinMaxScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class NGram(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class Normalizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class PolynomialExpansion(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable,
    • class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol, HasSeed, JavaMLReadable,
    • class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class SQLTransformer(JavaTransformer, JavaMLReadable, JavaMLWritable):
    • class StandardScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class StandardScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class StringIndexer(JavaEstimator, HasInputCol, HasOutputCol, HasHandleInvalid, JavaMLReadable,
    • class StringIndexerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class IndexToString(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class StopWordsRemover(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class VectorIndexer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class VectorIndexerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class VectorSlicer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class Word2VecModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class PCA(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    • class PCAModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaMLWritable):
    • class RFormulaModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class ChiSqSelector(JavaEstimator, HasFeaturesCol, HasOutputCol, HasLabelCol, JavaMLReadable,
    • class ChiSqSelectorModel(JavaModel, JavaMLReadable, JavaMLWritable):
    • class PipelineMLWriter(JavaMLWriter):
    • class Pipeline(Estimator, MLReadable, MLWritable):
    • class PipelineModelMLWriter(JavaMLWriter):
    • class PipelineModel(Model, MLReadable, MLWritable):
    • class ALSModel(JavaModel, JavaMLWritable, JavaMLReadable):
    • class LinearRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):
    • class IsotonicRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):
    • class AFTSurvivalRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):
    • class MLWriter(object):
    • class JavaMLWriter(MLWriter):
    • class JavaMLWritable(MLWritable):
    • class MLReader(object):
    • class JavaMLReader(MLReader):
    • class JavaMLReadable(MLReadable):
    • implicit class StringToColumn(val sc: StringContext)
    • class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
    • // the type in next() and we get a class cast exception. If we make that function return
    • class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
    • class StreamProgress(

@JoshRosen
Contributor Author

Alright, just added a few more tests to MemoryStoreSuite to bump up the coverage of putIteratorAsBytes() and fixed a problem leading to leaked unroll memory in PartiallySerializedResult.finishWritingToStream.


SparkQA commented Mar 24, 2016

Test build #54057 has finished for PR 11791 at commit 749df73.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.


SparkQA commented Mar 24, 2016

Test build #54070 has finished for PR 11791 at commit 749df73.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
Contributor

This is because of the call to close? That can use more memory?

Contributor Author

Yes.
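In other words, close() can flush buffered serializer state and grow the serialized size slightly past the unroll memory reserved during the loop, so one last top-up request covers the shortfall. A hedged sketch of that reconciliation (`reserveFinalMemory` and `tryAcquire` are illustrative names, not the actual Spark API):

```scala
// Sketch of the final reconciliation after close(): the flushed serializer
// output may be slightly larger than the unroll memory reserved so far, so we
// request just the shortfall. Names are illustrative.
def reserveFinalMemory(actualSize: Long, reservedSoFar: Long,
    tryAcquire: Long => Boolean): Boolean = {
  if (actualSize <= reservedSoFar) {
    true // the unroll memory acquired while unrolling already covers the block
  } else {
    tryAcquire(actualSize - reservedSoFar) // top up by the difference only
  }
}
```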

@JoshRosen
Contributor Author

Jenkins, retest this please.

@nongli
Contributor

nongli commented Mar 24, 2016

LGTM

} else {
iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
}
}
Contributor

About my previous comment on duplicate code: never mind, it can't actually be abstracted cleanly.

@andrewor14
Contributor

Looks good.


SparkQA commented Mar 24, 2016

Test build #54094 has finished for PR 11791 at commit 749df73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Merging to master.

@asfgit asfgit closed this in fdd460f Mar 25, 2016
@JoshRosen JoshRosen deleted the serialize-incrementally branch August 29, 2016 19:20