From 9c21aca0ea24836259a5c94931ed9c7831607192 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Mon, 7 Mar 2016 09:37:37 -0500
Subject: [PATCH 1/3] improve the doc for "spark.memory.offHeap.size"

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index f4bec589208be..23564a0ab8146 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1001,7 +1001,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.memory.offHeap.size</code></td>
   <td>0</td>
   <td>
-    The absolute amount of memory in bytes which can be used for off-heap allocation.
+    The absolute amount of memory (in terms of bytes) which can be used for off-heap allocation.
     This setting has no impact on heap memory usage, so if your executors' total memory consumption
     must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
     This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.

From c82e3822751a461550906a6d78117633e7db4d1f Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Mon, 7 Mar 2016 14:00:16 -0500
Subject: [PATCH 2/3] fix

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 23564a0ab8146..f4bec589208be 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1001,7 +1001,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.memory.offHeap.size</code></td>
   <td>0</td>
   <td>
-    The absolute amount of memory (in terms of bytes) which can be used for off-heap allocation.
+    The absolute amount of memory in bytes which can be used for off-heap allocation.
     This setting has no impact on heap memory usage, so if your executors' total memory consumption
     must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
     This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.

From b37ec112e01880e3d67d81972bae33487763c742 Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Fri, 23 Jun 2017 15:28:31 -0700
Subject: [PATCH 3/3] purge log in metadata log of FileStreamSource

---
 .../spark/sql/execution/streaming/FileStreamSource.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a9e64c640042a..06161d8cd5df7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -63,6 +63,9 @@ class FileStreamSource(
     new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
   private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
+  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
+
   /** Maximum number of new files to be considered in each batch */
   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
 
@@ -256,8 +259,9 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    if (currentLogOffset > minBatchesToRetain) {
+      metadataLog.purge(currentLogOffset - minBatchesToRetain)
+    }
   }
 
   override def stop() {}
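
The commit() hook in patch 3 trims the FileStreamSourceLog so that only the
most recent minBatchesToRetain entries survive. The retention count is read
from sparkSession.sessionState.conf.minBatchesToRetain, which SQLConf exposes
as the spark.sql.streaming.minBatchesToRetain key. Below is a minimal sketch
of a streaming job that would exercise the new purge path; the object name
and directories are illustrative and not taken from the patches:

    import org.apache.spark.sql.SparkSession

    object FileStreamPurgeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("FileStreamPurgeSketch")
          .master("local[2]")
          // Retain only the latest 10 batches in the source metadata log;
          // with patch 3 applied, commit() purges entries older than that.
          .config("spark.sql.streaming.minBatchesToRetain", 10L)
          .getOrCreate()

        // Illustrative directories; any text-file drop location works.
        val lines = spark.readStream.format("text").load("/tmp/stream-in")

        val query = lines.writeStream
          .format("parquet")
          .option("path", "/tmp/stream-out")
          .option("checkpointLocation", "/tmp/stream-checkpoint")
          .start()

        query.awaitTermination()
      }
    }

Each committed batch then advances the log offset, and once more than
minBatchesToRetain batches have run, older FileStreamSourceLog entries are
purged instead of accumulating for the lifetime of the query.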