-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink #21559
Conversation
@jose-torres @brkyvz for review |
ok to test |
Jenkins add to whitelist |
jenkins add to whitelist |
} | ||
|
||
private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = { | ||
if (rows.length > maxRows) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also adding a check here to make sure maxRows >= 0. It shouldn't ever be negative, but doesn't hurt to safeguard.
Test build #91796 has finished for PR 21559 at commit
|
Test build #91799 has finished for PR 21559 at commit
|
numRows = 0 | ||
} | ||
|
||
private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd document that maxRows is the remaining row capacity, not the maximum row limit defined in the options. I got confused for a minute here.
numRows = 0 | ||
} | ||
|
||
private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this go in MemorySinkBase?
lgtm |
* Companion object to MemorySinkBase. | ||
*/ | ||
object MemorySinkBase { | ||
val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxRows
is sufficient
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove extra line
*/ | ||
def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = { | ||
val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT) | ||
if (maxRows >= 0) Some(maxRows) else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to do if (maxRows >= 0) maxRows else Int.MaxValue - 10
We can't exceed runtime array max size anyway
@@ -81,22 +84,35 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB | |||
}.mkString("\n") | |||
} | |||
|
|||
def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = { | |||
def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row], sinkCapacity: Option[Int]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: our style is more like
def write(
batchId: Long,
outputMode: OutputMode,
newRows: Array[Row],
sinkCapacity: Option[Int]): Unit = {
|
||
private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = { | ||
if (rows.length > maxRows) { | ||
logWarning(s"Truncating batch $batchId to $maxRows rows") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does take behave with negative rows? Printing a warning message with negative values may be weird. I would also include the sink limit in the warning.
Test build #91861 has finished for PR 21559 at commit
|
Test build #91864 has finished for PR 21559 at commit
|
Test build #91868 has finished for PR 21559 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Just two minor nits
* Gets the max number of rows a MemorySink should store. This number is based on the memory | ||
* sink row limit if it is set. If not, there is no limit. | ||
* @param options Options for writing from which we get the max rows option | ||
* @return The maximum number of rows a memorySink should store, or None for no limit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to update docs
sinkLimit: Int, | ||
batchId: Long): Array[Row] = { | ||
if (rows.length > batchLimit && batchLimit >= 0) { | ||
logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not sure if these sinks get used by Continuous processing too. If so I would rename batch
to trigger version
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece is shared by MemorySink and MemorySinkV2, and the MemorySinkV2 (continuous processing) sink still calls them batches.
Test build #91926 has finished for PR 21559 at commit
|
Thanks! Merging to master! |
…rySink Provide an option to limit number of rows in a MemorySink. Currently, MemorySink and MemorySinkV2 have unbounded size, meaning that if they're used on big data, they can OOM the stream. This change adds a maxMemorySinkRows option to limit how many rows MemorySink and MemorySinkV2 can hold. By default, they are still unbounded. Added new unit tests. Author: Mukul Murthy <mukul.murthy@databricks.com> Closes apache#21559 from mukulmurthy/SPARK-24525. Ref: LIHADOOP-48531 RB=1852593 G=superfriends-reviewers R=mshen,fli,latang,yezhou,zolin A=
What changes were proposed in this pull request?
Provide an option to limit number of rows in a MemorySink. Currently, MemorySink and MemorySinkV2 have unbounded size, meaning that if they're used on big data, they can OOM the stream. This change adds a maxMemorySinkRows option to limit how many rows MemorySink and MemorySinkV2 can hold. By default, they are still unbounded.
How was this patch tested?
Added new unit tests.