
Fix deadlock that can occur while merging group by results #15420

Merged Apr 22, 2024 · 36 commits
Changes from 5 commits
de216d5 init commit (LakshSingla, Nov 23, 2023)
35bf551 test cases fix (LakshSingla, Nov 24, 2023)
09689e9 test cases fix 2 (LakshSingla, Nov 27, 2023)
e85f4b0 test cases fix 3 (LakshSingla, Nov 28, 2023)
ce89fdf test cases fix 4 (LakshSingla, Nov 28, 2023)
a34bcde caching clustered client removes the flag before sending the requests (LakshSingla, Nov 28, 2023)
a4f476d fix test (LakshSingla, Nov 29, 2023)
fe32796 add comments (LakshSingla, Nov 29, 2023)
6ecd555 Merge branch 'master' into merge-buffer-deadlock (LakshSingla, Jan 17, 2024)
b168ce9 init 2 (LakshSingla, Jan 19, 2024)
6cd8aa0 some tests fix (LakshSingla, Jan 29, 2024)
ac44edb merge and fix build (LakshSingla, Feb 20, 2024)
d2886bc tests (LakshSingla, Feb 22, 2024)
ceef4f1 Merge branch 'master' into merge-buffer-deadlock (LakshSingla, Feb 22, 2024)
75aee0a compilation (LakshSingla, Feb 22, 2024)
951dce2 compilation benchmarks (LakshSingla, Feb 22, 2024)
3c1b124 compilation benchmarks, responseContext changes revert (LakshSingla, Feb 22, 2024)
8c72a92 DumpSegment test fix (LakshSingla, Feb 22, 2024)
bcfabf8 compilation and tests (LakshSingla, Feb 22, 2024)
3c7fd81 codeql and test (LakshSingla, Feb 23, 2024)
e110662 histogram test (LakshSingla, Feb 23, 2024)
14f5bab distinctcount test (LakshSingla, Feb 23, 2024)
d956e73 groupByTimeseriesRunnerTest (LakshSingla, Feb 23, 2024)
bf57bcc fix issue with unions (LakshSingla, Feb 24, 2024)
2c3e675 unit test fix (LakshSingla, Feb 27, 2024)
dc2aa53 test fix (LakshSingla, Feb 27, 2024)
430f2e5 fix it util to properly display error message (LakshSingla, Feb 27, 2024)
76bc024 new runner (LakshSingla, Feb 28, 2024)
349e8d9 Merge branch 'master' into merge-buffer-deadlock (LakshSingla, Feb 28, 2024)
d26f7fc comments, test cases (LakshSingla, Feb 29, 2024)
b3850df Merge branch 'master' into merge-buffer-deadlock (LakshSingla, Apr 9, 2024)
8f255d7 checkstyle (LakshSingla, Apr 9, 2024)
6980d77 query resource id (LakshSingla, Apr 9, 2024)
e2da72a review comments (LakshSingla, Apr 10, 2024)
9271043 comments, tests (LakshSingla, Apr 12, 2024)
b8eada8 review final (LakshSingla, Apr 22, 2024)
@@ -90,6 +90,11 @@ public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing
return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
}

/**
* Delegates to {@link QueryToolChest#mergeResults(QueryRunner, boolean)}.
*
* @see QueryToolChest#mergeResults(QueryRunner, boolean)
*/
public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)
Contributor review comment:
Please add javadoc for what willMergeRunner means. I recognize most other stuff in here doesn't have javadocs, but, still.

{
return from(toolChest.mergeResults(baseRunner, willMergeRunner));
@@ -591,9 +591,9 @@ public boolean isWindowingStrictValidation()
);
}

public String getQueryResourceId()
public QueryResourceId getQueryResourceId()
{
return getString(QueryContexts.QUERY_RESOURCE_ID);
return new QueryResourceId(getString(QueryContexts.QUERY_RESOURCE_ID));
}

public String getBrokerServiceName()
121 changes: 121 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryResourceId.java
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.google.common.base.Preconditions;

import java.util.Objects;

/**
* Wrapper class around the queryResourceId string. The object must be addressable in an associative map, therefore it
* implements equals and hashCode.
* <p>
* A query's resource id is used to allocate resources, and to identify the resources allocated to a query in a global pool.
* Queries USUALLY do not share any resources - each query is assigned its own thread and buffer pool. However, some resources
* are shared globally - the GroupBy query's merge buffers being a prime example (and the primary user of the
* query's resource id). Such resources MUST be allocated exactly once to prevent deadlocks, and can then be used throughout the
* query stack until the query releases them or completes. A query holding a global resource must not request more of the same
* resource, or else it becomes a candidate for deadlocks.
* <p>
* Each query has a unique resource id that is assigned to it when it enters the queryable server. This is distinct from
* the existing queryId, subqueryId and sqlQueryId in the following ways:
* 1. It is not assigned by the user; it is assigned internally, for use by the Druid server
* 2. The query's resource id will be unique to the query in the system, while the queryId can be non-unique amongst the queries
* that are running in the system. Druid must ensure that the queryResourceId is unique. If the user (somehow)
* assigns the queryResourceId to the query, it must be overwritten internally
* 3. During the query server <-> data server communication, the queryResourceId assigned to a particular query can (and will)
* differ between the query servers and the data servers. This is particularly helpful in case of union queries, where a
* single query on the broker can be treated as two separate queries and executed simultaneously on the historicals.
* <p>
* The query's resource id is assigned and populated in the query context at the time the query hits the queryable server. In Druid,
* there are three queryable servers (these classes are not linkable from this module):
* 1. {@link org.apache.druid.server.ClientQuerySegmentWalker} - For brokers
* 2. {@link org.apache.druid.server.coordination.ServerManager} - For historicals
* 3. {@link org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} - For peons & indexer's tasks
* <p>
* These three classes are among the first places the query reaches when it begins processing, therefore it is
* guaranteed that if the resource id is allotted at only these places, no one will overwrite the resource id
* during the execution.
* <p>
* Note: Historicals and Peons could have reused the resource id allotted by the brokers, however they assign their own because:
* 1. The user can directly choose to query the data server (while debugging etc.)
* 2. UNIONs are treated as multiple separate queries when the broker sends them to the historicals. Therefore, we
* require a unique id for each part of the union, and hence we need to reassign the resource id to each part of the query,
* or else the parts would end up sharing the same resource id, as mentioned before
* <p>
* Notable places where QueryResourceId is used:
* <p>
* 1. {@link org.apache.druid.query.groupby.GroupByResourcesReservationPool} Primary user of the query resource id.
* <p>
* 2. {@link org.apache.druid.server.ClientQuerySegmentWalker} Allocates the query resource id on the brokers
* <p>
* 3. {@link org.apache.druid.server.coordination.ServerManager} Allocates the query resource id on the historicals
* <p>
* 4. {@link org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} Allocates the query resource id on the peons
* (MMs) and indexers
* <p>
* 5. {@link org.apache.druid.server.ResourceIdPopulatingQueryRunner} Populates the query resource id. ({@link org.apache.druid.server.ClientQuerySegmentWalker}
* allocates the query resource id directly, since it also does a bunch of transforms to the query)
* <p>
* 6. {@link org.apache.druid.query.groupby.GroupByQueryQueryToolChest} Allocates one of the global resources, the merge
* buffers, and associates them with the query's resource id. It also cleans them up once the query completes. Apart from that,
* it is also a consumer of the merge buffers it allocates.
* <p>
* 7. {@link org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunner} One of the consumers of the merge buffers
* allocated at the beginning of the query
*
* @see org.apache.druid.query.groupby.GroupByResourcesReservationPool
*/
public class QueryResourceId
{
private final String queryResourceId;

public QueryResourceId(String queryResourceId)
{
this.queryResourceId = Preconditions.checkNotNull(queryResourceId, "queryResourceId must be present");
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
QueryResourceId that = (QueryResourceId) o;
return Objects.equals(queryResourceId, that.queryResourceId);
}

@Override
public int hashCode()
{
return Objects.hash(queryResourceId);
}

@Override
public String toString()
{
Contributor review comment:
Simpler, and will make interpolations nicer, to just do return queryResourceId; here.

return "QueryResourceId{" +
"queryResourceId='" + queryResourceId + '\'' +
'}';
}
}
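The deadlock that the javadoc above warns about (and that this PR fixes) can be sketched outside Druid. The following hypothetical example uses only `java.util.concurrent` (none of Druid's classes): a global pool holds two merge buffers, two queries each grab one incrementally and then wait forever for a second, whereas an all-at-once reservation, like the one keyed by the query's resource id, cannot wedge this way.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration (not Druid code) of why globally shared resources
// must be reserved all at once, keyed by a per-query resource id.
public class MergeBufferDeadlockSketch
{
    public static void main(String[] args) throws InterruptedException
    {
        Semaphore mergeBuffers = new Semaphore(2); // global pool of 2 buffers

        // Incremental acquisition: two queries each grab one buffer...
        mergeBuffers.acquire(1); // query A
        mergeBuffers.acquire(1); // query B
        // ...and each now waits for a second buffer that can never arrive.
        boolean gotSecond = mergeBuffers.tryAcquire(1, 100, TimeUnit.MILLISECONDS);
        System.out.println("second buffer acquired: " + gotSecond); // false: the deadlock shape

        mergeBuffers.release(2);

        // All-at-once reservation: a query asks for everything it needs up front,
        // so it either runs with a full set of buffers or waits while holding nothing.
        boolean reserved = mergeBuffers.tryAcquire(2);
        System.out.println("upfront reservation succeeded: " + reserved); // true
    }
}
```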
@@ -119,15 +119,23 @@ public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner)
}

/**
* Like {@link #mergeResults(QueryRunner)}, but additional context parameter to determine whether the input runner
* to the method would be the result from the corresponding {@link QueryRunnerFactory#mergeRunners}. Merging can
* require additional resources, like merge buffers for group-by queries, therefore the flag, can help
* determine if the mergeResults should acquire those resources for the merging runners, before beginning execution.
* If not overridden, this method will ignore the {@code willMergeRunner} parameter.
* Like {@link #mergeResults(QueryRunner)}, but with an additional flag that indicates the type of runner that is passed to the call.
*
* willMergeRunner specifies that the input runner to mergeResults is the one created by the corresponding
* {@link QueryRunnerFactory#mergeRunners}.
* While it depends on the input runner, it is usually true, since most of the time the same server is generating the runner
* that it wants to merge. The notable deviation from this norm is when the broker is accumulating the results from the
* data servers and needs to merge them together. In this case willMergeRunner is false.
*
* Currently, the sole consumer of this parameter is {@link org.apache.druid.query.groupby.GroupByQueryQueryToolChest}, where
* it is used to determine whether mergeResults is called with {@link org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunner},
* to estimate the number of merge buffers required for the query to succeed. It is set to false on the brokers, because they
* (mostly) fetch the results from the historicals, while the data servers set it to true (because they call this method
* with the runner created by {@link QueryRunnerFactory#mergeRunners}).
*
* By default, willMergeRunner is ignored and {@link #mergeResults(QueryRunner)} is called. Toolchests
* that override this method must ensure that {@link #mergeResults(QueryRunner)} delegates to it (else callers of
* {@link #mergeResults(QueryRunner)} will get the default implementation, which would be undesirable).
*/
public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner, boolean willMergeRunner)
{
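The delegation contract in the javadoc above can be sketched with minimal stand-ins. In this hypothetical example (`Runner`, `ToolChest`, and `GroupingToolChest` are simplified inventions, not Druid's real interfaces), the two-argument overload defaults to the one-argument form, and an overriding toolchest must route the one-argument form back through the two-argument one, or callers of the one-argument form silently bypass the override.

```java
// Hypothetical, minimal stand-ins for Druid's QueryRunner/QueryToolChest,
// illustrating only the overload-delegation contract.
interface Runner
{
    String run();
}

class ToolChest
{
    // Older one-argument form; pre-existing extensions implement only this.
    public Runner mergeResults(Runner runner)
    {
        return runner;
    }

    // Newer two-argument form; the default ignores willMergeRunner and falls
    // back to the one-argument form.
    public Runner mergeResults(Runner runner, boolean willMergeRunner)
    {
        return mergeResults(runner);
    }
}

class GroupingToolChest extends ToolChest
{
    @Override
    public Runner mergeResults(Runner runner)
    {
        // The contract: delegate to the two-argument overload (here with the
        // common-case default of true), or the override below is skipped.
        return mergeResults(runner, true);
    }

    @Override
    public Runner mergeResults(Runner runner, boolean willMergeRunner)
    {
        // A real toolchest would reserve merge buffers up front here when
        // willMergeRunner indicates a mergeRunners-created input.
        return () -> "merged(willMergeRunner=" + willMergeRunner + ", " + runner.run() + ")";
    }
}
```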
@@ -59,6 +59,7 @@
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryResourceId;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SubqueryQueryRunner;
@@ -174,7 +175,7 @@ private Sequence<ResultRow> initAndMergeGroupByResults(
)
{
// Reserve the group by resources (merge buffers) required for executing the query
final String queryResourceId = query.context().getQueryResourceId();
final QueryResourceId queryResourceId = query.context().getQueryResourceId();
groupByResourcesReservationPool.reserve(queryResourceId, query, willMergeRunner);

final GroupByQueryResources resource = groupByResourcesReservationPool.fetch(queryResourceId);
@@ -22,6 +22,7 @@
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.query.QueryResourceId;

import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -30,13 +31,57 @@

/**
* Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
* The merge buffers can be required for the group by query in a couple of places:
* 1. {@link GroupByQueryQueryToolChest}
* 2. {@link org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunner}
* However, acquiring them separately can lead to deadlocks when multiple queries are fired. Therefore, instead of
* acquiring them separately, we acquire them once during the query execution, in {@link GroupByQueryQueryToolChest}, and
* use those resources for as long as the query is active.
* <p>
* ALLOCATION
* The merge buffers are allocated and associated with a given resource id in this pool. Multiple attempts to insert the same resource id will fail,
* therefore we know that the resources will be allocated only once, as long as the query's resource id doesn't change during the execution of the query.
* The pool entry is cleaned once close() is called on the reserved resources, and the mapping is removed, thus ensuring that the mapping doesn't keep growing
* during the execution of the queries.
* The call to allocate the merge buffers in the pool is made by mergeResults, which allocates the resources required for its own execution as well as for the
* execution of the GroupByMergingQueryRunner if willMergeRunner=true. The GroupByMergingQueryRunner doesn't allocate any resources; it assumes that the resources
* have been preallocated, and just takes them from the pool.
* Once the required merge buffers are allocated from the pool, they cannot be used by other queries till the close() method is called on the GroupByQueryResources.
* This is usually done with a call to GroupByResourcesReservationPool#clean(), which does this and also cleans up the mapping.
* While the GroupByQueryResources is unclosed, the merge buffers can be taken from it and given back to it as needed during the execution of the query. As such, the resources are not
* released back to the global pool; giving them back only signifies that the work of that execution unit is complete, and they can be reused (or closed safely). Closing the GroupByQueryResources
* when not all the merge buffers have been returned by the individual execution units logs a warning, but doesn't throw.
* The resources get freed up, and if an execution unit was actually still using them, it can error out.
* The resources get freed up, and if the execution unit was actually using the resources for something, it can error out.
* <p>
* ASSUMPTIONS
* There's an attempt to link the places where the merge buffers are acquired ({@link org.apache.druid.query.QueryToolChest#mergeResults})
* and where they are utilized ({@link org.apache.druid.query.QueryToolChest#mergeResults} and {@link GroupByQueryRunnerFactory.GroupByQueryRunner#mergeRunners}).
* However, Druid's code doesn't provide any explicit contract between the arguments of these methods: the input to {@code mergeResults} can be any runner,
* and it should function the same. While this provides flexibility and reusability, it also means that the code makes some implicit
* assumptions about the type of runner passed to mergeResults, so that mergeResults can allocate
* the merge buffers required for that runner appropriately.
* <p>
* 1. For a given query on a given server, only a single top-level mergeResults call will be made, which will collect the results from the various runners.
* The code will break down if multiple nested mergeResults calls are made (un-nested calls are fine, though they don't happen in practice)
* 2. There can be multiple mergeRunners calls, because the GroupByMergingQueryRunner only needs the merge buffers for the top-level query runner;
* nested ones execute via an unoptimized path.
* 3. mergeResults implicitly assumes that the query runner passed to it is the one created by the corresponding toolchest's
* mergeRunners (which is the typical use case). This is encoded in the argument {@code willMergeRunner}, and it is to be set by the callers.
* The only production use case where this isn't true is when the broker is merging the results gathered from the historicals
* <p>
* TESTING
* Unit tests mimic the broker-historical interaction in many places, which can lead to the code not working as intended because the assumptions don't hold.
* In many test cases there are two nested mergeResults calls: the outer call mimics what the broker does, while the inner one mimics what the historical does,
* and assumption (1) fails. Therefore, the testing code should assign a unique resource id between the mergeResults calls, and also make sure that the top-level
* mergeResults has willMergeRunner = false, since it's being called on top of another mergeResults runner, while the inner one has willMergeRunner = true because
* it's being called on actual runners (as happens on the brokers and the historicals)
*/
public class GroupByResourcesReservationPool
{
/**
* Map of query's resource id -> group by resources reserved for the query to execute
*/
final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();
final ConcurrentHashMap<QueryResourceId, GroupByQueryResources> pool = new ConcurrentHashMap<>();

/**
* Buffer pool from where the merge buffers are picked and reserved
@@ -59,9 +104,9 @@ public GroupByResourcesReservationPool(
}

/**
* Reserves appropariate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map
* Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map
*/
public void reserve(String queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
{
if (queryResourceId == null) {
throw DruidException.defensive("Query resource id must be populated");
@@ -78,15 +123,15 @@ public void reserve(String queryResourceId, GroupByQuery groupByQuery, boolean w
* Fetches resources corresponding to the given resource id
*/
@Nullable
public GroupByQueryResources fetch(String queryResourceId)
public GroupByQueryResources fetch(QueryResourceId queryResourceId)
{
return pool.get(queryResourceId);
}

/**
* Removes the entry corresponding to the unique id from the map, and cleans up the resources.
*/
public void clean(String queryResourceId)
public void clean(QueryResourceId queryResourceId)
{
GroupByQueryResources resources = pool.remove(queryResourceId);
if (resources != null) {
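The allocate-once semantics described in the pool's javadoc above can be sketched in a few lines. This is a hypothetical, much-simplified model (an `Integer` stands in for a merge buffer; this is not Druid's `GroupByResourcesReservationPool`): `putIfAbsent` makes a second `reserve()` for the same resource id fail, and `clean()` removes the mapping so the pool cannot grow without bound.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the reservation pool's allocate-once behavior.
class ReservationPoolSketch
{
    final ConcurrentHashMap<String, List<Integer>> pool = new ConcurrentHashMap<>();

    void reserve(String queryResourceId, List<Integer> mergeBuffers)
    {
        Objects.requireNonNull(queryResourceId, "queryResourceId must be populated");
        // putIfAbsent is atomic: concurrent reserve() calls for the same id
        // cannot both succeed, so a query can never allocate twice.
        if (pool.putIfAbsent(queryResourceId, mergeBuffers) != null) {
            throw new IllegalStateException("Resources already reserved for " + queryResourceId);
        }
    }

    List<Integer> fetch(String queryResourceId)
    {
        return pool.get(queryResourceId);
    }

    void clean(String queryResourceId)
    {
        // A real pool would also close the removed resources here.
        pool.remove(queryResourceId);
    }
}
```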
Expand Up @@ -49,6 +49,7 @@
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryResourceId;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryWatcher;
@@ -313,7 +314,7 @@ public void cleanup(CloseableGrouperIterator<RowBasedKey, ResultRow> iterFromMak

private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(GroupByQuery query, int numBuffers)
{
String queryResourceId = query.context().getQueryResourceId();
QueryResourceId queryResourceId = query.context().getQueryResourceId();
GroupByQueryResources resource = groupByResourcesReservationPool.fetch(queryResourceId);
if (resource == null) {
throw DruidException.defensive(
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class QueryResourceIdTest
{

@Test
public void testConstructorWithNullString()
{
Assert.assertThrows(NullPointerException.class, () -> new QueryResourceId(null));
}

@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(QueryResourceId.class)
.withNonnullFields("queryResourceId")
.usingGetClass()
.verify();
}

@Test
public void testAddressableOnAssociativeMap()
{
Map<QueryResourceId, Integer> map = new HashMap<>();
map.put(new QueryResourceId("abc"), 1);
Assert.assertEquals(1, (int) map.get(new QueryResourceId("abc")));
}
}