Bound memory utilization for dynamic partitioning (i.e. memory growth is constant) #11294
Conversation
Force-pushed from 77f4c49 to a3bd5dd
…had to put the sink back in sinks in mergeandpush since the persistent data needs to be dropped and the sink is required for that
…ally after sink has been merged
Force-pushed from e423a99 to 0312763
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
}

final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
metrics.incrementPersistBackPressureMillis(startDelay);

This will report a wrong metric because there is no start delay now. I think we don't have to report it since we don't use the executor anymore. You can remove runExecStopwatch too.

Done.
@@ -764,7 +691,7 @@ private DataSegment mergeAndPush(
SinkMetadata sm = sinksMetadata.get(identifier);
if (sm == null) {
  log.warn("Sink metadata not found just before merge for identifier [%s]", identifier);
} else if (numHydrants != sm.getNumHydrants()) {
  throw new ISE("Number of restored hydrants[%d] for identifier[%s] does not match expected value[%d]",
                numHydrants, identifier, sinksMetadata.get(identifier).getNumHydrants());

This should use sm too, because sinksMetadata.get(identifier) can return null if drop() is called for some reason after you get sm above.

Suggested change:
  numHydrants, identifier, sinksMetadata.get(identifier).getNumHydrants());
→ numHydrants, identifier, sm.getNumHydrants());

Done
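The capture-once pattern the reviewer is asking for can be sketched in isolation. This is a standalone illustration, not Druid code: SinkMetadata and sinksMetadata are simplified stand-ins mirroring the PR's names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Read the map once into a local, then use only that captured reference.
// A concurrent drop() removing the entry between two get() calls can no
// longer cause an NPE, because the second get() never happens.
public class CaptureOnce {
    public static class SinkMetadata {
        private final int numHydrants;
        public SinkMetadata(int n) { numHydrants = n; }
        public int getNumHydrants() { return numHydrants; }
    }

    public static final Map<String, SinkMetadata> sinksMetadata = new ConcurrentHashMap<>();

    public static String check(String identifier, int numHydrants) {
        SinkMetadata sm = sinksMetadata.get(identifier); // single read
        if (sm == null) {
            return "warn: no metadata for " + identifier;
        } else if (numHydrants != sm.getNumHydrants()) {
            // use sm, not a second sinksMetadata.get(identifier), which may now be null
            return "error: expected " + sm.getNumHydrants() + " but got " + numHydrants;
        }
        return "ok";
    }

    public static void main(String[] args) {
        sinksMetadata.put("seg1", new SinkMetadata(1));
        System.out.println(check("seg1", 1));
        System.out.println(check("missing", 1));
    }
}
```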
@@ -17,7 +17,7 @@
 * under the License.
 */

-package org.apache.druid.segment.realtime.appenderator;
+package org.apache.druid.indexing.appenderator;

The way the coverage bot currently works is by running all tests and finding the lines and branches in the classes corresponding to those tests. One requirement is that the class under test and its test class must be in the same package. So I would suggest not moving this class if possible, because you would need to move lots of other classes along with it.

Class BatchAppenderatorTester, which must also be moved if we want to move the test classes to the server module, is very difficult to move because it uses the constructor of IndexTask.IndexTuningConfig, which is not available in the server module. Moving that class would demand moving a lot of classes that do not make sense in the server module.

I am excluding the class again. I see no easy way to move them all to the same package. We are fighting primarily against a tool (coverage) and secondarily against module design (i.e. "server" has some batch stuff and it should only have "realtime" stuff). Resolving the latter, which takes non-trivial effort and should go in another ticket, will also resolve the former.

Removed the class from the exclusion after finding a way to move the test classes to the server module.
@@ -172,7 +172,7 @@ SegmentWithState getAppendingSegment()
/**
 * Allocated segments for a sequence
 */
-static class SegmentsForSequence
+public static class SegmentsForSequence

BatchAppenderatorDriver is in the same package as BaseAppenderatorDriver. I assume you meant BatchAppenderatorDriverTest, which is the class you moved to another package. As I said in my other comment, the class under test and its corresponding test class should be in the same package to help the test coverage bot. I suggested not moving the package of BatchAppenderatorDriverTest, and thus you would not need to change this access modifier either. Same for the other access-modifier changes in this class.
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,

This parameter must always be null per the argument checker below. Can we just remove it?

Done
* This constructor allows the caller to provide its own SinkQuerySegmentWalker.
* <p>
* The sinkTimeline is set to the sink timeline of the provided SinkQuerySegmentWalker.
* If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
* <p>
* It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple
* Appenderators.

This javadoc is no longer correct. I think you can simply delete it.

Done
identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
}
catch (IOException e) {
  throw new ISE(e, "Failed to retrieve sinks for identifier", identifier);

Suggested change:
  throw new ISE(e, "Failed to retrieve sinks for identifier", identifier);
→ throw new ISE(e, "Failed to retrieve sinks for identifier[%s]", identifier);

Done
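The missing [%s] placeholder matters because Druid's ISE formats its message with the trailing varargs, so without a placeholder the identifier argument is silently dropped from the message. A minimal stand-in (using String.format, which under the assumption here mirrors that behavior for well-formed format strings) illustrates the difference:

```java
// Hypothetical stand-in for Druid's ISE: formats the message with the varargs.
public class IseDemo {
    public static class ISE extends IllegalStateException {
        public ISE(String format, Object... args) {
            super(String.format(format, args)); // extra args without placeholders are ignored
        }
    }

    public static void main(String[] args) {
        ISE bad = new ISE("Failed to retrieve sinks for identifier", "seg1");
        ISE good = new ISE("Failed to retrieve sinks for identifier[%s]", "seg1");
        System.out.println(bad.getMessage());  // identifier lost from the message
        System.out.println(good.getMessage()); // identifier included
    }
}
```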
(dir, fileName) -> !(Ints.tryParse(fileName) == null)
);
if (sinkFiles == null) {
  throw new ISE("Problem reading persisted sinks in path", identifierPath);

Suggested change:
  throw new ISE("Problem reading persisted sinks in path", identifierPath);
→ throw new ISE("Problem reading persisted sinks in path[%s]", identifierPath);

Done
// to pull it from there....
SinkMetadata sm = sinksMetadata.get(identifier);
if (sm == null) {
  throw new ISE("Sink must not be null for identifier when persisting hydrant", identifier);

Suggested change:
  throw new ISE("Sink must not be null for identifier when persisting hydrant", identifier);
→ throw new ISE("Sink must not be null for identifier[%s] when persisting hydrant", identifier);

Done
This pull request introduces 2 alerts when merging 62e4ac3 into a9c4b47 - view on LGTM.com new alerts:

LGTM overall, but please remove unnecessary code and fix the typo before this PR is merged.
int numHydrants = 0;
for (FireHydrant hydrant : sink) {
  synchronized (hydrant) {

No need for synchronization.

Done
*/
private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec identifier)
{
  synchronized (indexToPersist) {

No need for synchronization.

Done
// the invariant of exactly one, always swappable, sink with exactly one unpersisted hydrant must hold
int totalHydrantsForSink = hydrants.size();
if (totalHydrantsForSink != 1) {
  throw new ISE("There should be only onw hydrant for identifier[%s] but there are[%s]",

Suggested change:
  throw new ISE("There should be only onw hydrant for identifier[%s] but there are[%s]",
→ throw new ISE("There should be only one hydrant for identifier[%s] but there are[%s]",

Done
private DataSegment mergeAndPush(
  final SegmentIdWithShardSpec identifier,
  final Sink sink,
  final boolean useUniquePath

useUniquePath is always false so you can remove it.

done
// copied from druid-indexing as is for testing since it is not accessible from server module,
// we could simplify since not all its functionality is being used
// but leaving as is, it could be useful later
private static class IndexTuningConfig implements AppenderatorConfig

Sorry I missed this change in my last review. Please clean up all the unused code in this class because there is no reason to keep it. It should really be a simple POJO. We can add some code back if we really need it later.

Cleaned up, but most of the code is setting defaults (when nulls are passed to the constructor) that are necessary for the appenderator to function.

I don't agree that it's already simple enough. Why is it Jackson-serializable? It doesn't seem to be used in any test. Besides this, I also see lots of methods and parameters that are deprecated, not in use at all, or not used in any test. I can leave comments on them to help you identify them if you want. I don't think we should keep this code if the reason is just that we might need it in the future. It will be easy to add it back if we need it, or we can do even better than now.

this change seems to be really held back by the baggage of using the Appenderator interface, which just really doesn't seem appropriate in the long term if we are going to have dedicated batch processing. The obvious things off the top of my head:
- don't need to be a QuerySegmentWalker
- maybe we don't need to persist everything when any hydrant overflows since there is no checkpointing, but could either only persist the full one, or use some sort of threshold-based persist to prevent tiny segments

But, since i think it should still be an improvement for batch processing, and doesn't seem to strictly make doing the more dramatic change any harder, i think it lgtm overall 👍
* for functionality, depending on the field that is used. More info about the
* fields is annotated as comments in the class
*/
private static class SinkMetadata

nit: maybe put the inner class at the end of the file so it's not in the middle of the private field declarations

Done
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class BatchAppenderator implements Appenderator

could you please add javadocs clearly documenting the expected concurrency model of this thing? Comparing with the stream appenderator, things like persistHydrant are synchronized there, but that method is not synchronized here, which makes me think it should only be called by a single thread. However, there are things like concurrent maps and atomic integer counters in use, which makes me curious if there is some concurrency, and what might be affected. If concurrency is never expected, please remove the concurrent types because they are confusing.

Such a javadoc would save myself and any others from having to dig deep to try and trace this out for ourselves, so it would be helpful to clarify the concurrency model and whether this diverges anywhere from the base Appenderator contract or not.

Also, comparing side by side with StreamAppenderator, it looks like this class shares some lineage with it; it might be worth describing the differences in the javadocs here as well and linking to it (javadocs for StreamAppenderator would be nice too, but it didn't previously have any, so those are probably ok to add later...).

Added javadoc and removed remaining concurrency constructs
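A hypothetical shape for such a class-level javadoc (the wording below is purely illustrative and not taken from the actual PR):

```java
/**
 * An {@link Appenderator} tailored for native batch ingestion.
 *
 * Concurrency model (illustrative sketch): unlike StreamAppenderator, all
 * operations -- add, persist, merge, and push -- are expected to be invoked
 * from a single thread. No internal locking is performed and plain
 * (non-concurrent) collections are used. Methods that return futures complete
 * synchronously and exist only to satisfy the Appenderator contract.
 */
public class BatchAppenderator implements Appenderator
```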
docs/configuration/index.md (Outdated)

@@ -1334,7 +1334,7 @@ Additional peon configs include:
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
-|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new batch ingestion code that avoids memory mapping indices. If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`|
+|`druid.indexer.task.batchFallback`|If false, native batch ingestion will use memory optimized code. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new optimized batch ingestion code. If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`|

I think this isn't a very intuitive name. How about something like druid.indexer.task.useDedicatedBatchProcessing, defaulting to true (which is a hassle because it inverts the config):

"If true, native batch ingestion will use dedicated, memory optimized processing. When set to false, native batch indexing will revert to its legacy mode, which shares the same code-path as streaming ingestion but has a higher memory footprint."

If you would rather not invert usages, because I admit it might be sort of painful to change, I guess something like useLegacyBatchProcessing or similar could also work; it would still allow defaulting to false and would be more clear about the role of the config when encountered in the properties file.

I intentionally left that variable with a vague name... if the variable that I just removed had had a vague name like that, then I could have just re-used it and maybe edited the description in the docs. I feel this name is fine (I accept it is vague, which is intentional) since it is really there for an exceptional situation. We want to get rid of this asap, potentially before the next open source release.

I'm making the 0.22 branch likely in the next week, so it seems to me that the flag must exist to provide a way to revert to 0.21 behavior for at least 1 release cycle. batchMemoryMappedIndex is not in 0.21, so it has never been released. Since this flag will be released, I still assert that it should have a better name, inverted or not; I don't see a good argument for leaving it intentionally vague.

Renamed to useLegacyBatchProcessing
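For operators, the resulting fallback configuration would look something like the sketch below. The flag name is confirmed by the rename above; the assumption that it goes in the peon/MiddleManager runtime.properties follows from the surrounding docs table.

```properties
# Revert native batch ingestion to the legacy (streaming-shared) code path.
# The default is false, i.e. use the new memory-optimized batch processing.
druid.indexer.task.useLegacyBatchProcessing=true
```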
public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
// Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;

nit: these seem to be dupes of constants in StreamAppenderator; should it just use them directly?

I don't think we want to use stuff from StreamAppenderator but I could move them to the Appenderator interface

Left them as is since I don't think it is a good idea to start coupling back either to the interface or StreamAppenderator

It seems better to me to leave them separate since the overhead per sink and hydrant can be different from stream ingestion in the future. Maybe even for now, since the sink can have up to only one hydrant before it is persisted, so memory pressure per sink could be different.
* physical segments (i.e. hydrants) do not need to memory map their persisted
* files. In this case, the code will avoid memory mapping them thus ameliorating the occurrence
* of OOMs.
*/
private final boolean isRealTime;

it seems like this should just be removed? Since this functionality hasn't been released, should it still be here? I guess it could be removed in a follow-up, but I guess it also means there is no way to turn this functionality off for batch tasks in case closing the segments in StreamAppenderator itself has a bug.

It is used by the fall back flag.

It is not really though; when the fallback is set to true, StreamAppenderator is made with isRealtime hard-coded to false, instead of controlled by a flag as was introduced in #11123. This means there is no way to revert the behavior of that PR since it isn't operator controllable anymore. Rather than introduce a 2nd flag, I strongly think we should consider removing isRealtime from StreamAppenderator before 0.22, since the behavior was previously unreleased and now there is no way to not use it (I don't think there is enough time to have no setting at all and always use BatchAppenderator, so that flag will still need to exist to choose between them).
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

public class BatchAppenderatorTester implements AutoCloseable

what is the difference between this and StreamAppenderatorTester, besides Appenderators.createRealtime vs Appenderators.createOffline and a different tuning config? Also, does it need these json annotations?

I decided to create different tester classes for stream & batch so as not to couple them together. I will remove the json annotations.
// Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment,
// Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.)
int total;
total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;

nit: seems like a lot of lines for a constant. If I understand correctly there is only 1 hydrant per sink, so would it just make sense to factor this into the sink calculation? Also the comment doesn't seem relevant and should be fixed or removed.
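For reference, the per-hydrant estimate in the snippet above works out as follows; this is a standalone recomputation assuming ROUGH_OVERHEAD_PER_HYDRANT = 1000 as shown there.

```java
// Standalone recomputation of the per-hydrant footprint estimate:
// a 4-byte count int, four 2-byte shorts, plus the rough overhead constant.
public class FootprintDemo {
    static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;

    static int perHydrantFootprint() {
        return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
    }

    public static void main(String[] args) {
        System.out.println(perHydrantFootprint()); // 4 + 8 + 1000 = 1012
    }
}
```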
…cts in batch appenderator, rename feature flag, remove real time flag stuff from stream appenderator, etc.)
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;

@Nullable
private static PartitionsSpec getPartitionsSpec(

PartitionsSpec in this class is not used by any test. Please stop copying the business logic into the test. All tests must pass a proper partitionsSpec if they test partitioning-related behavior, unless they verify the default partitionsSpec.

Removed
Long maxBytesInMemory,
Boolean skipBytesInMemoryOverheadCheck,
Long maxTotalRows,
Integer rowFlushBoundary_forBackCompatibility,

Many parameters including this one are deprecated in IndexTuningConfig. They only exist in IndexTuningConfig for compatibility. It doesn't seem reasonable to copy them here.

Cleaned up the config as much as I could, take a look.
@Deprecated
@Nullable
public Integer getNumShards()

Why do you want to add a deprecated method?

Removed
*/
@Nullable
@Override
@Deprecated

This method is not deprecated.

Fixed
*/
@Override
@Nullable
@Deprecated

This method is not deprecated.

Fixed
}

public long getAwaitSegmentAvailabilityTimeoutMillis()

This method is not in use.

Removed
public boolean isLogParseExceptions()
{
  return logParseExceptions;
}

public int getMaxParseExceptions()
{
  return maxParseExceptions;
}

public int getMaxSavedParseExceptions()
{
  return maxSavedParseExceptions;
}

These methods are not in use.

Removed
return maxPendingPersists;
}

public boolean isForceGuaranteedRollup()

This method is not in use.

Removed
return partitionsSpec;
}

public PartitionsSpec getGivenOrDefaultPartitionsSpec()

This method is not in use.

removed
}

@Deprecated
public List<String> getPartitionDimensions()

Same here. Why do you want to add a deprecated method?

removed
lgtm after you get squared up with @jihoonson

Thanks for the cleanup. +1 after CI.
Fixes Issue #11231

Description

Refactor the segment creation (and merge) phase of Appenderator for batch ingestion in such a way as to decouple it from streaming data structures and make memory growth bounded regardless of input file size. This PR contains the changes that implement the proposal "Minimize memory utilization in Sinks/Hydrants for native batch ingestion".

The main changes were to separate AppenderatorImpl into StreamAppenderator (which is AppenderatorImpl unchanged) and BatchAppenderator. The latter is where the persistence of Sink and FireHydrant as described in the proposal is implemented. The rest of the changes were in associated interfaces and client code, mainly to make the concurrency in the appenderator synchronous. The batch appenderator requires that push, persist, and merge be synchronous in order to facilitate the handling of Sink and FireHydrant when they are persisted and restored. As the proposal indicates, future work is suggested to remove the concurrent APIs from BatchAppenderator (i.e. any API that returns a ListenableFuture or a Future).

One area that we did not explore was the memory-mapping behavior impact when maxColumnsToMerge is set. This is also left for future work.

Feature flag

The middle manager (i.e. Peon) configuration flag druid.indexer.task.useLegacyBatchProcessing, when set to true, will fall back to the older, known-to-be-working code path. This is to have a fallback in place just in case there is some issue with the code in this PR after it is merged. We will remove this flag in the future. The default for this flag, false, is to use the new code (i.e. do not memory map FireHydrant indices, and persist and clear from memory -- but not from disk -- all Sink and FireHydrant instances at each intermediate persist).

Future work

Refactor the Appenderator interface: see the proposal for more about this.

Study the impact when maxColumnsToMerge is set: this setting also impacts the memory mapping of FireHydrant. Even though the references to the mappings are only kept in local variables that can be garbage collected, we should look into it at some point.

This PR has: