
Fix deadlock that can occur while merging group by results #15420

Merged · 36 commits merged into apache:master on Apr 22, 2024

Conversation

LakshSingla (Contributor, Author)

Description

When merging the higher-order group-by results on the broker (primarily), and sometimes on the historicals (under the special condition where we push the subquery down to the historical), a deadlock can occur depending on the structure of the query.
The primary cause of this deadlock is that we currently acquire the merge buffer resources in two places: GroupByQueryQueryToolchest (where we merge the higher-level results from the runners) and GroupByMergingQueryRunnerV2 (where we merge the results from the runners of the multiple segments). Until now, these two were incorrectly assumed to be mutually exclusive:

  1. GroupByQueryQueryToolchest - Acquires the merge buffers on the brokers only, since the historicals receive the query with the subqueries and the grouping sets set to null, and therefore don't acquire any merge buffers here. However, this might not be true: when we push the subquery itself down, the historicals will also acquire merge buffers at this place.
  2. GroupByMergingQueryRunnerV2 - Acquires merge buffers on the historicals only, since it primarily merges the results from the query runners that run on the segments. This is also incorrect, because the broker can act as a data server itself when the data source is an inline data source and we attempt to run nested group-bys on it.

Therefore, there are conditions where GroupByQueryQueryToolchest holds some merge buffers to merge the results of the returned runners, while the runner itself is a GroupByMergingQueryRunnerV2 (or a decorated runner on top of it) that also requires merge buffers to merge the "segment-level" runners it feeds to the toolchest's mergeResults. Since we do not acquire the resources in a single go, the following situation can happen:

Total merge buffers on the broker: 2
QueryA = QueryB = a query that needs 1 merge buffer to merge results (in the toolchest) and 1 merge buffer for GroupByMergingQueryRunnerV2 on the broker
QueryA and QueryB are running simultaneously on the cluster

| Time | QueryA actions | QueryB actions | Buffers in system |
|------|----------------|----------------|-------------------|
| 0 | | | 2 |
| 1 | toolchest acquires 1 buffer | | 1 |
| 2 | | toolchest acquires 1 buffer | 0 |
| 3 | mergingRunner acquires 1 buffer (blocked) | | 0 |
| 4 | | mergingRunner acquires 1 buffer (blocked) | 0 |

Either query could have completed in isolation; now, however, each is waiting on the other to release the single merge buffer it holds before it can proceed.

This PR prevents such a deadlock by acquiring the merge buffers in a single place and passing them down to the runners that might need them.
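To make the failure mode concrete, here is a minimal standalone sketch of the same two-phase acquisition pattern, using plain java.util.concurrent (illustrative only, not Druid code; the class name and timing are made up):

import java.util.concurrent.Semaphore;

public class TwoPhaseDeadlockSketch
{
  public static void main(String[] args)
  {
    // Global pool of 2 merge buffers, modeled as semaphore permits.
    final Semaphore mergeBuffers = new Semaphore(2);

    final Runnable query = () -> {
      try {
        mergeBuffers.acquire();  // toolchest-level acquisition (mergeResults)
        Thread.sleep(100);       // lets the other query grab its first buffer too
        mergeBuffers.acquire();  // runner-level acquisition: blocks forever
        // ... merge, then release both permits ...
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    new Thread(query, "QueryA").start();
    new Thread(query, "QueryB").start();
    // Each thread ends up holding one permit while waiting for a second:
    // the deadlock described in the table above.
  }
}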

Release note


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions bot added the "Area - Batch Ingestion" and "Area - MSQ" (for multi-stage queries - https://github.com/apache/druid/issues/12262) labels on Nov 27, 2023
@kgyrtkirk (Member) left a comment:

let me try to sketch an alternate approach/idea I had while looking through this PR:

what we have now:

  • pool of global resources
  • allocated 1-by-1

probable alternative:

  • ask for a sub-pool from the global pool
  • ask for buffers from that pool (it will return non-committed holders)
  • 1-time allocation:
    • have a method on the sub-pool to make all buffers live (like earlier)
      • can be called multiple times (at every place the stuff is used) but only the first call allocates (and locks)
    • on the 1st call to a dormant holder, all resources are allocated
  • either way: once the sub-pool has gone live it should no longer return usable holders

something like:

  • the numMergeBuffers in GroupByMergingQueryRunnerV2 can be calculated earlier in run(), before the return
  • the merge buffer holders can be requested there (no backing commitment)
  • before usage of the actual merge buffers, take a lock just before the work starts
  • the GroupingEngine part could be similar... not sure
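Paraphrasing that idea as a hedged interface sketch (SubPool, BufferHolder, reserve, and lock are hypothetical names, not existing Druid classes):

// Hypothetical shape of the sub-pool idea: holders are handed out without any
// backing commitment, and the first lock() (or first use of a dormant holder)
// allocates everything from the global pool in one go.
interface SubPool extends AutoCloseable
{
  BufferHolder reserve();  // returns a dormant, non-committed holder
  void lock();             // idempotent: only the first call allocates (and locks)
}

interface BufferHolder
{
  java.nio.ByteBuffer get();  // using a dormant holder triggers lock() on its sub-pool
}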

Diff excerpt:

      mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
    }
    return mergeBufferHolder;

Diff excerpt:

    GroupByQueryResources resource = (GroupByQueryResources) responseContext.get(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS);
Member:

seems like responseContext is used as a backplane here...

Member:

@LakshSingla explained that this is a role this class could fill in the future (thx :) )

Comment on lines +137 to +140
final int requiredMergeBufferNumForMergingQueryRunner =
usesGroupByMergingQueryRunner
? GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(groupByQueryConfig, query)
: 0;
Member:

I find it a pretty odd contract that it gets counted here and used somewhere else

I think decoupling things like this could increase complexity significantly

Contributor Author:

Yes, it is pretty bad. What's also unsettling is that we now want to make sure that GroupByMergingQueryRunnerV2 expects the merge buffer from someone above it in the chain, which enforces a contract between two different query runners, with only a generic responseContext holding them.

Merge buffers do need a redesign, and I hope the changes introduced in the patch become obsolete.

@kgyrtkirk (Member):

I've experimented a bit with the above stuff - and although it could work, it's not that much different; it needed a different refactor to be done...


gianm commented Jan 5, 2024

@LakshSingla TY for the patch! Would you mind leaving an explanation in a comment here about how the fix works? Like:

  • what code is responsible for figuring out how many merge buffers to allocate
  • what code is responsible for actually allocating them
  • where they get stored once allocated from the server-wide pool
  • how they get released back to the server-wide pool

It'll aid in reviewing, and help anyone who comes back later trying to read about the logic.


gianm commented Jan 5, 2024

One thing I'm wondering is what we need GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2 for. It shows up in places that should not need to be aware of the groupBy query, which isn't desirable.

@abhishekagarwal87 (Contributor):

Curious, when do we push down a subquery to a historical? Where does that logic sit?


gianm commented Jan 5, 2024

Curious when do we push down a subquery to historical? Where does that logic sit?

I think @LakshSingla is talking about forcePushDownNestedQuery, added in #5471, an undocumented feature.

@LakshSingla (Contributor, Author):

Yes @abhishekagarwal87, if the flag is set, then the outer query along with the subquery is sent to the historicals. Otherwise, only the subquery is sent to the historicals, and the broker does the processing on top of the subquery's results.

@gianm I'll try to add the comments in the code itself, making it more self-explanatory.

@LakshSingla (Contributor, Author):

Adding a comment here as well to aid in the code walkthrough, and for anyone revisiting the PR:

The number of merge buffers required to execute a group by query is calculated based on the structure of the query. There are many levels of merging that can happen. Per my understanding, a raw segment gets aggregated after passing through the following structure:

  1. Each individual segment gets aggregated using GroupByQueryEngineV2#process. This doesn't use the shared merge buffers and, by definition, is not nested, as it operates on the individual segments (on the data server).
  2. The aggregated results from the multiple segments get partially merged and combined into a single runner using GroupByMergingQueryRunnerV2, which is sent to the broker. This utilises the shared merge buffers, taking one or two depending on the value of the config numParallelCombineThreads. Nested calls to this code don't use additional merge buffers; they go through a much more expensive ChainedExecutionQueryRunner.
  3. The server calls an additional QueryToolchest#mergeResults on the resulting runner to further aggregate the data. This doesn't use the merge buffers, because the historical doesn't receive subqueries (see caveat below).
  4. The broker fetches the results from the multiple data servers and "merges" them using CachingClusteredClient.SpecificQueryRunnable#run. This "merge" doesn't aggregate the result objects; it orders the different sequence objects sequentially.
  5. The broker then calls the final QueryToolChest#mergeResults, the result of which is then further decorated.

Steps 1-3 happen on the data servers, while steps 4-5 happen on the broker.
It is worth noting that GroupByQueryRunnerFactory#mergeRunners can take up 1-2 merge buffers, depending on the value of the config numParallelCombineThreads, and GroupByQueryQueryToolChest can take up 0-3 merge buffers, depending on the query structure (subqueries and subtotals).

The above describes an idealised world, in which there is no nested call between mergeResults and mergeRunners, and therefore a single place where the merge buffers can be acquired. However, there are two esoteric cases where this does not hold:

  1. The historicals get subqueries - Historicals normally get only the innermost query, and the broker does further processing on the returned results of the innermost query. However, if the flag forcePushDownNestedQuery is set to true, then the historical can receive a nested query, and steps 2 & 3 in the flow above would both acquire merge buffers.
  2. The broker operates on an inlined data source - The broker then emulates a part of the historical's stack and has a call stack like mergeResults(mergeRunners(...)) (see `LocalQuerySegmentWalker`).

Therefore, in places where there's a nested call stack like mergeResults(.....mergeRunners(....)), the code acquires merge buffers in two places. This happens on:

  1. Data servers, in all cases (mergeResults can acquire 0 buffers if the subquery & subtotals are null, as in most cases; the subquery can however be non-null if it was pushed down along with the outer query. Subtotals are always null.)
  2. The broker, when the query is to be run on the historicals (again, mergeResults can acquire exactly 0 buffers, depending on the structure, but the nested call stack is still there)

The only place where we don't have a nested call stack is when the broker merges the results from the historicals: mergeRunners is called by the historicals, and mergeResults is called on the "combined" version of the runner returned by them.
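Schematically, the two shapes look like this (a hedged fragment: the method names match the toolchest/factory methods discussed above, but the variables exec, segmentRunners, and cachingClusteredClientRunner are placeholders, and the surrounding declarations are elided):

// Nested case: a historical with forcePushDownNestedQuery, or the broker over
// an inline datasource. Merge buffers are needed at both levels of this stack.
QueryRunner<ResultRow> nested =
    toolChest.mergeResults(                          // may need 0-3 merge buffers
        factory.mergeRunners(exec, segmentRunners)   // needs 1-2 merge buffers
    );

// Non-nested case: the broker merges historical responses. mergeRunners runs
// remotely on the historicals, so only one level needs buffers locally.
QueryRunner<ResultRow> brokerOnly =
    toolChest.mergeResults(cachingClusteredClientRunner);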

@LakshSingla (Contributor, Author):

To disambiguate between these cases, there's a flag called CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2 which determines whether the query will use the mergeRunner inside the mergeResults call stack. It defaults to true and is only false when the broker is merging the results from the data servers.

In the new code, the mergeResults will acquire all the merge buffers based on the following properties:

  1. Depending on the query structure, it will acquire the merge buffers (as was done in the original code)
  2. If the mergeRunners call is nested inside, it will also acquire the merge buffers required by the GroupByMergingQueryRunnerV2 (1 or 2, depending on the config value).

@gianm I can probably change the context parameter to be more generic and true for all the queries (CTX_KEY_USES_MERGE_RUNNERS), however actionable only in the group-by toolchest.
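As a hedged sketch of that two-part acquisition (countRequiredMergeBufferNumForMergingQueryRunner and takeBatch appear in the diff excerpts above; countRequiredMergeBufferNum and the surrounding variables are assumed names, not exact signatures):

// Sketch only: sizing a single atomic reservation inside mergeResults.
final int toolchestBuffers =
    GroupByQueryResources.countRequiredMergeBufferNum(query);  // 0-3, from subqueries/subtotals
final int runnerBuffers =
    willMergeRunners
    ? GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(groupByQueryConfig, query)  // 1-2
    : 0;
// One takeBatch() call for the combined need closes the window in which two
// queries each hold a partial set of buffers and wait on each other.
mergeBufferHolders = mergeBufferPool.takeBatch(toolchestBuffers + runnerBuffers);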

@LakshSingla (Contributor, Author):

Regarding acquisition and freeing of the merge buffers: the GroupByQueryResources instance acquires the appropriate number of merge buffers when created. It internally acquires all the merge buffers in a single take and separates them into two pools. The individual call sites can acquire and release merge buffers from their respective pools as they require them. When closed, the GroupByQueryResources instance releases all the merge buffers back to the Druid byte buffer pool.
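A minimal lifecycle sketch of what's described above (the constructor and the get/return accessors are illustrative assumptions, not the actual Druid signatures):

// Illustrative only: one atomic take, two internal sub-pools, one close().
try (GroupByQueryResources resources =
         new GroupByQueryResources(mergeBufferPool.takeBatch(toolchestBuffers + runnerBuffers))) {
  ByteBuffer toolchestBuffer = resources.getToolchestBuffer();   // from the toolchest sub-pool
  ByteBuffer runnerBuffer = resources.getMergingRunnerBuffer();  // from the runner sub-pool
  // ... merge segment-level results, then the higher-level results ...
  resources.returnToolchestBuffer(toolchestBuffer);    // returned to this query's sub-pool,
  resources.returnMergingRunnerBuffer(runnerBuffer);   // not yet to the global pool
}
// close() releases every buffer back to the Druid byte buffer pool.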

@clintropolis (Member) left a comment:

still thinking on this PR, will have another pass soon

Comment on lines 34 to 39
public static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";

/**
*
*/
public static final String CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2 = "runnerMergesUsingGroupByMergingQueryRunnerV2";
Member:

there are a bunch of group-by query context keys in GroupByQuery; these should probably live there too, instead of adding a new class to hold them. Alternatively, I guess they could have just stayed in GroupByMergingQueryRunnerV2 since they seem kind of specific to its function.

Also please drop the "V2" from this flag name since there is only v2 now

/**
* Reason for using this is to ensure that we do not set the merge buffers multiple times on the same response context
*/
public static final ResponseContext.Key RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS =
Member:

this could just live in GroupingEngine or somewhere else if we move the query context parameters to GroupByQuery (or I suppose could also live with the query context parameters in GroupByQuery)

Comment on lines 205 to 210
QueryPlus<T> queryPlus1 = queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
);
final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus1, responseContext)
Member:

it seems off to apply this to all query types, when it only affects group-by queries. It also doesn't really seem quite correct: if CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION is set, or if 'by segment' is set, mergeRunners does not take a merge buffer.

Comment on lines 242 to 244
Query<T> queryToRun = newQuery.withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false)
);
Member:

why is this set to false here, and then inside of the cluster walker (which is CachingClusteredClient) it is immediately set to true?

Also, shouldn't there be something for the local walker? The local walker is the only thing on the broker that calls 'mergeRunners' to use the group by merging query runner... otherwise the broker does not use it for any other query shape afaict.


gianm commented Jan 11, 2024

I'm hoping we can fix the bug without making the code too much more complex. There are some things that contribute to the complexity, and I'm hoping we can find a simpler alternative:

  1. The new context keys add to complexity because there's no mechanism that ensures they are set at the appropriate places, so it's tough to understand how and when they're supposed to be set. They're kind of "magical".
  2. The fact that we are now cloning the response context, and are using these cloned response contexts to store lifecycled objects. It makes it harder to follow the lifecycle of the objects and ensure they are properly passed around and closed.

As an alternative to the context keys, maybe we can either add a parameter to mergeResults like willMergeRunners, or maybe we can add a new method on the toolchest entirely like acquireResources(boolean willMergeRunners). These are less "magical" than the context keys since Java itself will ensure that the parameters are passed in at the appropriate times. Makes it a lot easier to follow what's going on.

As to the response context, I don't really understand why we're cloning it. What's the reason for that? In any case, a possible alternative could be to put a unique key in the query context (just a string) and then have the various parts of the query use that key to get their resources from something that's injected (like the merge buffer pool is currently injected). Or, an alternative could be to eliminate the cloning and have a requirement that only one mergeRunners stack be running at once. I think that's in fact true…

Lastly, for testing, maybe I missed it but I didn't see a test case for the Broker-side scenario. The following query is the simplest one I could come up with that exhibits the problem. Perhaps ClientQuerySegmentWalkerTest would be a good place to put a test using a query like this.

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "query",
    "query": {
      "queryType": "groupBy",
      "dataSource": {
        "type": "inline",
        "columnNames": []
        "rows": []
      },
      "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
      "granularity": "all"
    }
  },
  "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
  "granularity": "all"
}

@cryptoe added this to the Druid 29.0.0 milestone on Jan 17, 2024

LakshSingla commented Jan 18, 2024

@gianm

The new context keys add to complexity because there's no mechanism that ensures they are set at the appropriate places, so it's tough to understand how and when they're supposed to be set. They're kind of "magical".

I think the use of keys has been exaggerated in my code, and can probably be minimized if we have a unique key set on the context per query, which can be used to reference the already reserved resources from a globally available pool (with some reservation built in).

With regards to cloning, I don't think there's a real need for it, apart from ensuring that callers up the stack of query runners don't get a contaminated context containing keys set by the children. I was envisioning this would be useful when multiple mergeResults->mergeRunners stacks are called by a parent runner in parallel: the child runner won't have a chance to clean up the response context object before it gets passed to the other runner.

Regarding the above comments, I have come up with the following alternative:

  1. mergeResults will have a willMergeRunners parameter, which will get populated by the callers; in production code it will only be set to false by the CachingClusteredClient. Note: can this be done by the caching clustered client as well, in case segment caching is enabled? (Probably so, but GroupByQuery ignores segment caching, so the semantics of the flag would be a bit different from actually demonstrating "will it merge runners".)
  2. Having a reservation pool like:

public interface GroupByResourcesReservationPool
{
  void reserve(String uniqueId, int numMergeBuffers);
  void reserve(String uniqueId, int numMergeBuffers, long timeoutMs);
  GroupByQueryResources take(String uniqueId);
  GroupByQueryResources cleanup(String uniqueId);
}


gianm commented Feb 24, 2024

@LakshSingla is this ready for another review or are some other changes pending?

@LakshSingla (Contributor, Author):

@gianm I am working on fixing up the ITs and the UTs due to the changed semantics of resource passing.
A few issues I ran into were: sharing of the same resource ID on the historicals for union queries; query runners not taking kindly to modified queries; and UTs not populating the query ID due to extensive mocking of the mergeResults + mergeRunners stack. I should be able to correct the ITs shortly.

@LakshSingla (Contributor, Author):

@gianm With the recent changes made to the PR, I wanted to revisit the questions posed here, and more, that will help in reviewing.

Assumptions
There's an attempt to link the various places where the merge buffers are acquired (mergeResults) and where they are utilized (mergeResults and mergeRunners). However, Druid's code doesn't provide any explicit contract between the arguments of these methods: the input to mergeResults can be any runner, and it should function the same. While this gives the methods flexibility and reusability, it also means the code must make some implicit assumptions to know what type of runner is passed to mergeResults:

  1. For a given query and a given server, only a single top-level mergeResults call will be made, which will collect the results from the various runners. The code will break if there are multiple, nested mergeResults calls (unnested calls are fine, though they don't happen).
  2. There can be multiple mergeRunners calls, because GroupByMergingQueryRunner only needs the merge buffers for the top-level query runner; nested ones execute via an unoptimized path.
  3. mergeResults has some knowledge that the query runner passed to it is the one created by the corresponding toolchest's mergeRunners (which is the typical use case). This is encoded in the argument willMergeRunner and is to be set by the callers. The only production use case where this isn't true is when the broker is merging the results gathered from the historicals.

These are true to my knowledge at the time of the PR (and they should remain true unless there's some whacky change in the query stack). Also, these assumptions only need to hold for group-by queries, because only they require shared resources; the rest of the queries don't rely on any of these assumptions being correct.

Resource ID
Each query has a unique resource id that is assigned to it when it enters the queryable server:

  • For brokers: it's the ClientQuerySegmentWalker
  • For historicals: it's the ServerManager
  • For peons: it's the SinkQuerySegmentWalker

These three classes are among the first places a query reaches when it begins processing, so if the resource id is allotted only at these places, it is guaranteed that no one will overwrite it during execution.
Note: Historicals and peons could have reused the query id allotted by the broker; however, they assign their own because:
a) The user can choose to query the data server directly (while debugging, etc.)
b) UNIONs are treated as multiple separate queries when the broker sends them to the historicals. We therefore require a unique id for each part of the union, and hence we need to reassign the resource id to each part, or else they'd end up sharing the same resource ID.

Test modifications
This section lays out the modifications made to the test cases.
With the assumptions laid out, we need to modify the tests, because they mimic the broker-historical interaction in many places, which can lead to the code not working as intended when the assumptions don't hold. For example, in many test cases there are two nested mergeResults calls: the outer call mimics what the broker does, while the inner one mimics what the historical does.
Therefore, we need to assign a unique resource id between the mergeResults calls, and also make sure that the top-level mergeResults has willMergeRunner = false, since it's being called on top of a mergeResults runner, while the inner one has willMergeRunner = true, because it's being called on actual runners.

Merge buffer allocation
The merge buffers are allocated and associated with a given resource id in the global pool GroupByResourcesReservationPool. Multiple attempts to insert the same resource id will fail, so we know that resources are allocated only once, as long as the query id doesn't change during the execution of the query. The pool is cleaned once close() is called on the reserved resources, and the mapping is removed, ensuring that the mapping doesn't keep growing while queries execute.

The call to allocate the merge buffers in the pool is made by mergeResults, which allocates the resources required for its own execution as well as for the execution of the GroupByMergingQueryRunner if willMergeRunners=true. The GroupByMergingQueryRunner doesn't allocate any resources; it assumes the resources have been preallocated and just takes them from the pool.

Once the required merge buffers are allocated from the pool, they cannot be used by other queries until the close() method is called on the GroupByQueryResources. This is usually done via GroupByResourcesReservationPool#clean(), which closes the resources and also cleans up the mapping.
While the GroupByQueryResources is unclosed, merge buffers can be taken from it and given back to it as needed during the execution of the query. At that point the resources are not released back to the global pool; giving a buffer back only signifies that the work of that execution unit is complete and the buffer can be reused (or closed safely). Closing the GroupByQueryResources while not all the merge buffers have been returned by the individual execution units logs a warning but doesn't throw; the resources get freed up, and if an execution unit was actually still using them, it can error out.
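Putting these semantics together, a minimal sketch of the reservation pool (grounded in the ConcurrentHashMap field visible later in the review; the method bodies and names are approximations, not the merged code):

import java.util.concurrent.ConcurrentHashMap;

public class GroupByResourcesReservationPoolSketch
{
  // Map of query's resource id -> group-by resources reserved for the query.
  private final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();

  public void reserve(String queryResourceId, GroupByQueryResources resources)
  {
    // A second reservation for the same id fails loudly, so resources are
    // allocated exactly once per query as long as the id doesn't change.
    if (pool.putIfAbsent(queryResourceId, resources) != null) {
      throw new IllegalStateException("Resources already reserved for id " + queryResourceId);
    }
  }

  public GroupByQueryResources fetch(String queryResourceId)
  {
    // GroupByMergingQueryRunner only takes from here; it never allocates.
    return pool.get(queryResourceId);
  }

  public void clean(String queryResourceId)
  {
    // Removing the mapping keeps the pool from growing across queries;
    // closing returns all merge buffers to the global pool.
    GroupByQueryResources resources = pool.remove(queryResourceId);
    if (resources != null) {
      resources.close();
    }
  }
}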

@LakshSingla (Contributor, Author):

@gianm The PR is ready for review now. There are a few code coverage failures, but I think they are due to changes made to unrelated classes that use the GroupByQueryQueryToolchest.

@gianm (Contributor) left a comment:

This is definitely cleaner than the prior approach. In the review comments I had some suggestions about making things clearer.

@@ -90,9 +90,9 @@ public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing
return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
}

public FluentQueryRunner<T> mergeResults()
public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)
Contributor:

Please add javadoc for what willMergeRunner means. I recognize most other stuff in here doesn't have javadocs, but, still.

@@ -591,6 +591,10 @@ public boolean isWindowingStrictValidation()
);
}

public String getQueryResourceId()
Contributor:

IMO it'd be good to make this a wrapper around String like ResourceId. That provides us a central place to put some javadocs that explain how resources work, and link with @link and @see to other relevant files. It also makes it easier to find usages in an IDE.
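For illustration, a minimal wrapper along those lines might look like this (a hedged sketch; the merged code may differ in naming and placement):

import java.util.Objects;

public final class QueryResourceId
{
  private final String id;

  public QueryResourceId(String id)
  {
    this.id = Objects.requireNonNull(id, "id");
  }

  @Override
  public boolean equals(Object o)
  {
    return o instanceof QueryResourceId && id.equals(((QueryResourceId) o).id);
  }

  @Override
  public int hashCode()
  {
    return id.hashCode();
  }

  @Override
  public String toString()
  {
    // Returning the bare id keeps log interpolations readable,
    // per the review note further down.
    return id;
  }
}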

* determine if the mergeResults should acquire those resources for the merging runners, before beginning execution.
* If not overridden, this method will ignore the {@code willMergeRunner} parameter.
*
* Ideally {@link #mergeResults(QueryRunner)} should have delegated to this method after setting the default value of
Contributor:

Couple questions about this:

  • what should the "default value of willMergeRunner" be? It seems like a bunch of places use true, but why is that?
  • what should new toolchests do? Is it ok to override just the new 2-arg mergeResults call, or do both need to be overridden?

I'm hoping we can make this willMergeRunner piece more clear, since IMO it's the main unclear thing left in the patch after the last round of changes.

}

@Inject
public GroupByQueryQueryToolChest(
GroupingEngine groupingEngine,
Supplier<GroupByQueryConfig> queryConfigSupplier,
GroupByQueryMetricsFactory queryMetricsFactory
GroupByQueryMetricsFactory queryMetricsFactory,
@Merging GroupByResourcesReservationPool groupByResourcesReservationPool
Contributor:

As I understand it— there is no reason to use @Merging here, since there's only one kind of GroupByResourcesReservationPool. (The annotations are used to disambiguate when there's multiple kinds of some injectable key.)

Contributor Author:

True. I reasoned that @Merging was used to annotate the global resources associated with group-by merging, therefore I annotated the resource pool with it; it doesn't have any functional relevance. I am fine with removing it as well.

/**
* Map of query's resource id -> group by resources reserved for the query to execute
*/
final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();
Contributor:

Use ResourceId as key once it exists

@LakshSingla (Contributor, Author):

@gianm
Thanks for taking another look. I have addressed the review comments. Regarding the relevance and the use case of willMergeRunner, I have reworded the docs; LMK in case they are still cryptic. The idea is to signal to mergeResults that the runner it is being called with will be the GroupByMergingQueryRunner, so that it can allocate the resources for it (hence the name "willMergeRunners", since it is not limited to the group-by toolchest).

@gianm (Contributor) left a comment:

Had a minor note but otherwise LGTM. Feel free to merge when tests are passing.


@Override
public String toString()
{
Contributor:

Simpler, and will make interpolations nicer, to just do return queryResourceId; here.

@LakshSingla (Contributor, Author):

The coverage checks are failing, but due to the refactoring done on unrelated classes - DumpSegment, MaterializedViewQueryToolchest. There's also one Jacoco failure, which I think is due to the code coverage check itself.
Merging the PR, and ensuring that nothing breaks on master.

@LakshSingla merged commit b9bbde5 into apache:master on Apr 22, 2024
80 of 85 checks passed
@LakshSingla deleted the merge-buffer-deadlock branch on April 22, 2024 at 08:40
@LakshSingla (Contributor, Author):

Thanks for the reviews @gianm @kgyrtkirk @clintropolis

Labels: Area - Batch Ingestion, Area - MSQ (for multi-stage queries - https://github.com/apache/druid/issues/12262), Area - Querying

7 participants