
Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead#10740

Merged
maytasm merged 23 commits into apache:master from maytasm:IMPLY-5567 on Jan 27, 2021
Conversation

@maytasm
Contributor

@maytasm maytasm commented Jan 9, 2021

Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead

Description

Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead.
Assumptions / estimations made:

  • Each Sink object (not including the currHydrant and the list of hydrants) is estimated to have an overhead of 5000 bytes (Sink.ROUGH_OVERHEAD_PER_SINK). This estimate is based on heap dumps from actual ingestions. As a result, one sink with 1000 rows and 1000 sinks with 1 row each (same row values) will now produce different bytes-in-memory calculations.
  • Each hydrant of a memory-mapped segment (after persist) is estimated to have an overhead of Integer.BYTES + (4 * Short.BYTES) + 1000 + (NumColumnsOfHydrant * ColumnHolder.ROUGH_OVERHEAD_PER_COLUMN_HOLDER) bytes, where ColumnHolder.ROUGH_OVERHEAD_PER_COLUMN_HOLDER = 1000 bytes. As a result, one sink with one hydrant of 1000 rows and one sink with 1000 hydrants (each with one row, same row values) will now produce different bytes-in-memory calculations.

Both of the above can cause OOM during ingestion when we have many sinks and a lot of persisting. More specifically, the problem is serious when each persist happens across a huge number of sinks. Hence, the OOM usually occurs with a big time range and/or a small segmentGranularity, coupled with unsorted time data.
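A minimal sketch (illustrative, not the actual Druid code) of the estimates described above. The constant values come from this PR (Sink.ROUGH_OVERHEAD_PER_SINK = 5000 bytes, ColumnHolder.ROUGH_OVERHEAD_PER_COLUMN_HOLDER = 1000 bytes); the class and method names here are hypothetical:

```java
// Hypothetical sketch of the per-sink / per-hydrant overhead estimates.
public class OverheadEstimate
{
  static final int ROUGH_OVERHEAD_PER_SINK = 5000;           // bytes, from heap dumps
  static final int ROUGH_OVERHEAD_PER_COLUMN_HOLDER = 1000;  // bytes, from heap dumps

  // Overhead of one persisted, memory-mapped hydrant (formula from the PR description).
  static long perHydrantOverhead(int numColumns)
  {
    return Integer.BYTES + (4L * Short.BYTES) + 1000
           + (long) numColumns * ROUGH_OVERHEAD_PER_COLUMN_HOLDER;
  }

  // Total estimated overhead for numSinks sinks, each holding the given
  // number of persisted hydrants with the given column count.
  static long totalOverhead(int numSinks, int hydrantsPerSink, int columnsPerHydrant)
  {
    return (long) numSinks
           * (ROUGH_OVERHEAD_PER_SINK + (long) hydrantsPerSink * perHydrantOverhead(columnsPerHydrant));
  }
}
```

Under these estimates, 1000 one-row sinks carry roughly 5 MB of sink overhead that a single 1000-row sink would not, which is why the two cases now yield different bytes-in-memory figures.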

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@suneet-s
Contributor

[ERROR] Failures: 
[ERROR] org.apache.druid.segment.realtime.appenderator.AppenderatorTest.testMaxBytesInMemory(org.apache.druid.segment.realtime.appenderator.AppenderatorTest)
[ERROR]   Run 1: AppenderatorTest.testMaxBytesInMemory:204 expected:<0> but was:<364>

Test failure looks legit

@maytasm
Contributor Author

maytasm commented Jan 12, 2021

[ERROR] Failures: 
[ERROR] org.apache.druid.segment.realtime.appenderator.AppenderatorTest.testMaxBytesInMemory(org.apache.druid.segment.realtime.appenderator.AppenderatorTest)
[ERROR]   Run 1: AppenderatorTest.testMaxBytesInMemory:204 expected:<0> but was:<364>

Test failure looks legit

fixed.

Member

@clintropolis clintropolis left a comment

this seems nice to be able to avoid out of memory exceptions 👍

can we add some more tests around ensuring the calculations and error conditions are well behaved in a wider variety of scenarios (lots of sinks, lots of hydrants, lots of columns, etc)?

public interface ColumnHolder
{
// Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
int ROUGH_OVERHEAD_PER_COLUMN_HOLDER = 1000;
Member

nit: since this is only used by the calculations in AppenderatorImpl, I would maybe be in favor of this just living in AppenderatorImpl along with the other estimation constants.

Contributor Author

Done

Comment on lines +633 to +642
if (bytesInMemoryBeforePersist - bytesPersisted > maxBytesTuningConfig) {
// We are still over maxBytesTuningConfig even after persisting.
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
log.makeAlert("Persist no longer free up memory")
.addData("dataSource", schema.getDataSource())
.addData("numSinks", sinks.size())
.addData("numHydrantsAcrossAllSinks", sinks.values().stream().mapToInt(Iterables::size).sum())
.emit();
throw new RuntimeException("Ingestion run out of available memory as persist can no longer free up any memory.");
}
Member

Maybe this should clarify that the task is using too much heap memory? I also suggest moving numSinks and numHydrantsAcrossAllSinks into the error messaging and sharing the error message between the alert and the exception that is thrown after so that it is a bit more visible. I also wonder if we should include the total row count to aid in determining if perhaps overly aggressive heap memory limits caused a large number of persists, or if it is just a large amount of data.

Thinking out loud (and not something we really need to worry about in this PR), I wonder if we should perhaps consider more targeted error messaging. It seems like there are at least 4 ways we end up here:

  • too many intervals/sinks
  • too many persisted segments in a reasonable number of sinks and rows and columns (too much data)
  • too many columns in a reasonable number of segments and sinks and rows
  • too many persisted segments in a reasonable number of sinks and columns but an unreasonably small number of total rows (aggressive settings problem)

The trick I guess would be deciding what is "reasonable" in order to adjust the error messaging to suggest the source of the problem, though is that actually useful? The cluster operator can infer any of these situations from whether numSinks, numHydrantsAcrossAllSinks, and totalRows are large numbers or not, so maybe it isn't worth the extra messaging, since there isn't a distinct or obvious way to mitigate any of these conditions.

Contributor Author

Done. Added more details to error message. Moved stats into error message. Added totalRow stat.


// bytesCurrentlyInMemory can change while persisting due to concurrent ingestion.
// Hence, we use bytesInMemoryBeforePersist to determine the change of this persist
if (bytesInMemoryBeforePersist - bytesPersisted > maxBytesTuningConfig) {
Member

I wonder if we should cut this off before we go over the limit. The heap overhead of the persisted data is going to cause additional persists to happen ever more frequently, so this seems like it will behave in a thrashing manner as it approaches the limit (though at least there is a limit to cut it off when it reaches the point of persisting every row). I don't know what an ideal cutoff point would be.

This sort of makes me wonder if including this "resident heap" calculation in the bytes-in-memory persist triggers should be opt-in, due to the potential to change behavior, but I really would prefer not to introduce another config option...

It seems ideal for us to gracefully detect conditions that are likely to cause the task to run out of memory and fail instead of just allowing it to occur, especially when using indexers instead of middle managers, so I just want to make sure we can make it well behaved.
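A toy simulation (purely illustrative, not Druid code) of the dynamic described above: each persist leaves behind unrecoverable heap overhead, so the budget available for new rows shrinks after every persist until persisting can no longer free memory, which is the failure condition this check detects.

```java
// Illustrative only: models a fixed unrecoverable overhead left behind by
// each persist, and counts how many persists fit before the budget is gone.
public class PersistThrash
{
  // Returns the number of persists that happen before the leftover overhead
  // alone exceeds the configured byte budget.
  public static int persistsUntilFailure(long maxBytes, long overheadPerPersist)
  {
    long unrecoverable = 0;  // heap held by persisted hydrants / sink objects
    int persists = 0;
    while (maxBytes - unrecoverable > 0) {
      // Rows are freed by the persist, but the overhead stays resident,
      // so each cycle has less room for rows than the previous one.
      unrecoverable += overheadPerPersist;
      persists++;
    }
    return persists;
  }
}
```

In this toy model the task always fails after roughly maxBytes / overheadPerPersist persists; the real behavior also depends on row sizes and ingestion concurrency.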

Contributor Author

@maytasm maytasm Jan 14, 2021

I also don't know what an ideal cutoff point would be. I think the thrashing should only happen for a short duration, assuming the data has the "many intervals/sinks" or "many columns" characteristic, as the overhead added after each persist should increase quickly.

I added more tests to make sure it's well behaved.

I didn't add any config/opt-in, since most likely anything that would fail because of this check would have failed because of OOM too.

Added config in the ingestionSpec to skip this new check
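For reference, a hypothetical tuningConfig fragment using the new flag (the skipBytesInMemoryOverheadCheck field name is from this PR; the other fields and values are illustrative only, and the flag defaults to false):

```json
{
  "tuningConfig": {
    "type": "index_parallel",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 25000000,
    "skipBytesInMemoryOverheadCheck": true
  }
}
```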

@maytasm
Contributor Author

maytasm commented Jan 14, 2021

this seems nice to be able to avoid out of memory exceptions 👍

can we add some more tests around ensuring the calculations and error conditions are well behaved in a wider variety of scenarios (lots of sinks, lots of hydrants, lots of columns, etc)?

Modified the tests from:

  1. one sink -> ingest -> check before persist -> ingest -> causes persist -> check
  2. multiple sinks -> ingest to sink a -> check before persist -> ingest to sink b -> causes persist -> check

to:

  3. one sink -> ingest -> check before persist -> ingest -> causes persist -> check -> ingest more -> check -> ingest more causing persist -> check
  4. multiple sinks -> ingest to sink a -> check before persist -> ingest to sink b -> causes persist -> check -> ingest more -> check -> ingest more causing persist -> check

Now we have ingestion after the first persist, and further ingestion causing a second persist (resulting in multiple memory-mapped hydrants in the same sink).

Member

@clintropolis clintropolis left a comment

lgtm, just for fun I also pulled the branch and tested and it appears to work as advertised 👍

Someday, I hope that some hero will do the incredibly tedious work of making builders for all of these ingestion tuning types and switching the tests to use them, so that these sorts of changes of adding/removing options need not be so disruptive.


# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
SERVICE_DRUID_JAVA_OPTS=-server -Xmx1g -Xms1g -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
Member

since these are integration tests, should we leave Xms lower so that it doesn't use the full 1g unless it needs it?

Contributor Author

I suspect that flaky test failures in the Indexer-based IT are due to low/insufficient memory. I have seen failures where ingestion tasks never finish, with no error/exception message... the ingestion task just disappeared. One reason I can think of is OOM in the ingestion. Hence, bumping this to hopefully eliminate flaky test failures in the Indexer IT. This is not directly related to this PR.

Contributor Author

Changed to -Xmx1g -Xms512m

@@ -721,6 +721,7 @@ is:
|type|Each ingestion method has its own tuning type code. You must specify the type code that matches your ingestion method. Common options are `index`, `hadoop`, `kafka`, and `kinesis`.||
|maxRowsInMemory|The maximum number of records to store in memory before persisting to disk. Note that this is the number of rows post-rollup, and so it may not be equal to the number of input records. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are reached (whichever happens first).|`1000000`|
|maxBytesInMemory|The maximum aggregate size of records, in bytes, to store in the JVM heap before persisting. This is based on a rough estimate of memory usage. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are reached (whichever happens first).<br /><br />Setting maxBytesInMemory to -1 disables this check, meaning Druid will rely entirely on maxRowsInMemory to control memory usage. Setting it to zero means the default value will be used (one-sixth of JVM heap size).<br /><br />Note that the estimate of memory usage is designed to be an overestimate, and can be especially high when using complex ingest-time aggregators, including sketches. If this causes your indexing workloads to persist to disk too often, you can set maxBytesInMemory to -1 and rely on maxRowsInMemory instead.|One-sixth of max JVM heap size|
Member

I think we should probably update maxBytesInMemory to indicate that it also estimates heap usage of the intermediary persisted artifacts, and that the task can fail when this estimated overhead approaches maxBytesInMemory.

Contributor Author

Done.

Comment on lines +646 to +661
String errorMsg = StringUtils.format("Failing task as task uses up too much heap memory. "
+ "Persist can no longer free up memory. Objects that "
+ "cannot be freed up from intermediate persist include Sinks, Memory Mapped Hydrants, "
+ "and other overhead created while ingesting/pervious persistings. "
+ "This check, along with calculation and consideration of overhead objects' "
+ "memory can be disabled by setting skipBytesInMemoryOverheadCheck to true "
+ "in the tuningConfig of the ingestionSpec "
+ "(note that doing so can result in OOME). "
+ "numSinks[%d] numHydrantsAcrossAllSinks[%d] totalRows[%d]",
sinks.size(),
sinks.values().stream().mapToInt(Iterables::size).sum(),
getTotalRowCount());
log.makeAlert(errorMsg)
.addData("dataSource", schema.getDataSource())
.emit();
throw new RuntimeException(errorMsg);
Member

ok, I walk back my previous suggestion about consolidating alert and exception messages, this is maybe a bit wordy for an alert, but I like including the expanded description in the task log. How about making the alert message a subset of the exception message, something like this:

Suggested change
String errorMsg = StringUtils.format("Failing task as task uses up too much heap memory. "
+ "Persist can no longer free up memory. Objects that "
+ "cannot be freed up from intermediate persist include Sinks, Memory Mapped Hydrants, "
+ "and other overhead created while ingesting/pervious persistings. "
+ "This check, along with calculation and consideration of overhead objects' "
+ "memory can be disabled by setting skipBytesInMemoryOverheadCheck to true "
+ "in the tuningConfig of the ingestionSpec "
+ "(note that doing so can result in OOME). "
+ "numSinks[%d] numHydrantsAcrossAllSinks[%d] totalRows[%d]",
sinks.size(),
sinks.values().stream().mapToInt(Iterables::size).sum(),
getTotalRowCount());
log.makeAlert(errorMsg)
.addData("dataSource", schema.getDataSource())
.emit();
throw new RuntimeException(errorMsg);
final String alertMessage = StringUtils.format(
"Task has exceeded safe estimated heap usage limits, failing "
+ "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
sinks.size(),
sinks.values().stream().mapToInt(Iterables::size).sum(),
getTotalRowCount()
);
final String errorMessage = StringUtils.format(
"%s.\nThis can occur when the overhead from too many intermediary segment persists becomes too "
+ "great to have enough space to process additional input rows. This check, along with metering the overhead "
+ "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting "
+ "'skipBytesInMemoryOverheadCheck' to 'true' or increasing 'maxBytesInMemory' (note that doing so might "
+ "allow the task to naturally encounter a 'java.lang.OutOfMemoryError').",
alertMessage
);
log.makeAlert(alertMessage)
.addData("dataSource", schema.getDataSource())
.emit();
throw new RuntimeException(errorMessage);

Contributor Author

LGTM. Reworded a little, since increasing 'maxBytesInMemory' does not disable the new check.

Member

@clintropolis clintropolis left a comment

lgtm 🤘, up to you if you want to further change error messaging or just merge

Comment on lines +658 to +659
+ "a 'java.lang.OutOfMemoryError'). Otherwise, user can increase 'maxBytesInMemory' to allocate more heap "
+ "which will allow more intermediary segment persists.",
Member

@clintropolis clintropolis Jan 26, 2021

nit: technically I guess if you explicitly set maxBytesInMemory it isn't really coupled to how much heap is allocated to the JVM process; maybe something like:

Alternatively, maxBytesInMemory can be increased which will cause an increase in heap footprint, but will allow for more intermediary segment persists to occur before reaching this condition.

Contributor Author

Done

@maytasm maytasm merged commit a46d561 into apache:master Jan 27, 2021
@maytasm maytasm deleted the IMPLY-5567 branch January 27, 2021 08:35
@clintropolis clintropolis added this to the 0.22.0 milestone Aug 12, 2021