Fix deadlock that can occur while merging group by results #15420

Merged · 36 commits · Apr 22, 2024
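For context on the title: group-by merging draws merge buffers from a bounded blocking pool, and, judging from the fix in the diff below, the deadlock appears to arise when buffers for the same query are taken from that pool at more than one level, so that concurrent queries can each hold part of the pool while blocking on the rest. The snippet below is a minimal stand-alone illustration of that failure shape; it is not code from this PR or from Druid, the class and variable names are made up, and a plain java.util.concurrent.Semaphore stands in for the @Merging BlockingPool<ByteBuffer> used by the real code.

import java.util.concurrent.Semaphore;

/**
 * Minimal illustration (not Druid code) of a two-stage merge-buffer deadlock:
 * a bounded pool of buffers, and queries that take one buffer for an outer
 * merge and later need another for an inner merge. Once every query holds its
 * outer buffer, the pool is exhausted and all of them block forever.
 */
public class MergeBufferDeadlockSketch
{
  public static void main(String[] args)
  {
    // Stand-in for a merge buffer pool with two buffers.
    final Semaphore mergeBufferPool = new Semaphore(2);

    Runnable query = () -> {
      try {
        mergeBufferPool.acquire();   // outer merge takes one buffer
        Thread.sleep(100);           // both queries reach this point; the pool is now empty
        mergeBufferPool.acquire();   // inner merge waits for a buffer that is never released
        mergeBufferPool.release(2);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    new Thread(query, "query-1").start();
    new Thread(query, "query-2").start();  // with two queries and two buffers, neither can finish
  }
}

Running it hangs: each thread holds one of the two buffers and waits indefinitely for a second one. The GroupByQueryQueryToolChest changes further down appear to avoid this by reserving everything the query needs in one step and sharing that reservation with the downstream runner.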
Commits (36, all by LakshSingla):

de216d5  init commit (Nov 23, 2023)
35bf551  test cases fix (Nov 24, 2023)
09689e9  test cases fix 2 (Nov 27, 2023)
e85f4b0  test cases fix 3 (Nov 28, 2023)
ce89fdf  test cases fix 4 (Nov 28, 2023)
a34bcde  caching clustered client removes the flag before sending the requests (Nov 28, 2023)
a4f476d  fix test (Nov 29, 2023)
fe32796  add comments (Nov 29, 2023)
6ecd555  Merge branch 'master' into merge-buffer-deadlock (Jan 17, 2024)
b168ce9  init 2 (Jan 19, 2024)
6cd8aa0  some tests fix (Jan 29, 2024)
ac44edb  merge and fix build (Feb 20, 2024)
d2886bc  tests (Feb 22, 2024)
ceef4f1  Merge branch 'master' into merge-buffer-deadlock (Feb 22, 2024)
75aee0a  compilation (Feb 22, 2024)
951dce2  compilation benchmarks (Feb 22, 2024)
3c1b124  compilation benchmarks, responseContext changes revert (Feb 22, 2024)
8c72a92  DumpSegment test fix (Feb 22, 2024)
bcfabf8  compilation and tests (Feb 22, 2024)
3c7fd81  codeql and test (Feb 23, 2024)
e110662  histogram test (Feb 23, 2024)
14f5bab  distinctcount test (Feb 23, 2024)
d956e73  groupByTimeseriesRunnerTest (Feb 23, 2024)
bf57bcc  fix issue with unions (Feb 24, 2024)
2c3e675  unit test fix (Feb 27, 2024)
dc2aa53  test fix (Feb 27, 2024)
430f2e5  fix it util to properly display error message (Feb 27, 2024)
76bc024  new runner (Feb 28, 2024)
349e8d9  Merge branch 'master' into merge-buffer-deadlock (Feb 28, 2024)
d26f7fc  comments, test cases (Feb 29, 2024)
b3850df  Merge branch 'master' into merge-buffer-deadlock (Apr 9, 2024)
8f255d7  checkstyle (Apr 9, 2024)
6980d77  query resource id (Apr 9, 2024)
e2da72a  review comments (Apr 10, 2024)
9271043  comments, tests (Apr 12, 2024)
b8eada8  review final (Apr 22, 2024)
Files changed (showing changes from 3 of the 36 commits):
@@ -376,15 +376,14 @@ public String getFormatString()
         druidProcessingConfig,
         configSupplier,
         bufferPool,
-        mergePool,
         TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         QueryBenchmarkUtil.NOOP_QUERYWATCHER
     );

     factory = new GroupByQueryRunnerFactory(
         groupingEngine,
-        new GroupByQueryQueryToolChest(groupingEngine)
+        new GroupByQueryQueryToolChest(groupingEngine, mergePool)
     );
   }

@@ -360,12 +360,11 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
         processingConfig,
         configSupplier,
         bufferPool,
-        mergeBufferPool,
         mapper,
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     );
-    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine);
+    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, mergeBufferPool);
     return new GroupByQueryRunnerFactory(groupingEngine, toolChest);
   }

@@ -491,15 +491,14 @@ public String getFormatString()
         druidProcessingConfig,
         configSupplier,
         bufferPool,
-        mergePool,
         TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         QueryBenchmarkUtil.NOOP_QUERYWATCHER
     );

     factory = new GroupByQueryRunnerFactory(
         groupingEngine,
-        new GroupByQueryQueryToolChest(groupingEngine)
+        new GroupByQueryQueryToolChest(groupingEngine, mergePool)
     );
   }

@@ -127,7 +127,7 @@ public void testDecorateObjectMapper() throws IOException
     QueryToolChest queryToolChest =
         new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
             ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
-                .put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
+                .put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
                 .build()
         ));

@@ -186,7 +186,7 @@ public void testDecorateObjectMapperMaterializedViewQuery() throws IOException
     QueryToolChest materializedViewQueryQueryToolChest =
         new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
             ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
-                .put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
+                .put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
                 .build()
         ));

@@ -245,7 +245,7 @@ public void testGetRealQuery()
     MaterializedViewQueryQueryToolChest materializedViewQueryQueryToolChest =
         new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
             ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
-                .put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
+                .put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
                 .build()
         ));

@@ -96,15 +96,14 @@ public int getNumThreads()
         },
         GroupByQueryConfig::new,
         new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
-        new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
         TestHelper.makeJsonMapper(),
         new DefaultObjectMapper(),
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     );

     final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
         groupingEngine,
-        new GroupByQueryQueryToolChest(groupingEngine)
+        new GroupByQueryQueryToolChest(groupingEngine, new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1))
     );

     runner = QueryRunnerTestHelper.makeQueryRunner(
@@ -144,7 +144,7 @@ public <RowType, QueryType> Pair<DataServerQueryStatus, Yielder<RowType>> fetchR
     final int numRetriesOnMissingSegments = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES);

     log.debug("Querying severs[%s] for segment[%s], retries:[%d]", servers, segmentDescriptor, numRetriesOnMissingSegments);
-    final ResponseContext responseContext = new DefaultResponseContext();
+    final ResponseContext responseContext = DefaultResponseContext.createEmpty();

     Pair<DataServerQueryStatus, Yielder<RowType>> statusSequencePair;
     try {
@@ -21,6 +21,7 @@

 import org.apache.druid.guice.annotations.PublicApi;

+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;

@@ -30,16 +31,34 @@
 @PublicApi
 public class ConcurrentResponseContext extends ResponseContext
 {
+
+  private final ConcurrentHashMap<Key, Object> delegate;
+
+  private ConcurrentResponseContext()
+  {
+    this(Collections.emptyMap());
+  }
+
+  private ConcurrentResponseContext(final Map<Key, Object> delegate)
+  {
+    this.delegate = new ConcurrentHashMap<>(delegate);
+  }
+
   public static ConcurrentResponseContext createEmpty()
   {
     return new ConcurrentResponseContext();
   }

-  private final ConcurrentHashMap<Key, Object> delegate = new ConcurrentHashMap<>();

   @Override
   protected Map<Key, Object> getDelegate()
   {
     return delegate;
   }
+
+  @Override
+  public ResponseContext clone()
+  {
+    return new ConcurrentResponseContext(delegate);
+  }
 }
@@ -21,6 +21,7 @@

 import org.apache.druid.guice.annotations.PublicApi;

+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;

@@ -30,16 +31,33 @@
 @PublicApi
 public class DefaultResponseContext extends ResponseContext
 {
+
+  private final HashMap<Key, Object> delegate;
+
+  private DefaultResponseContext()
+  {
+    this(Collections.emptyMap());
+  }
+
+  private DefaultResponseContext(final Map<Key, Object> delegate)
+  {
+    this.delegate = new HashMap<>(delegate);
+  }
+
   public static DefaultResponseContext createEmpty()
   {
     return new DefaultResponseContext();
   }

-  private final HashMap<Key, Object> delegate = new HashMap<>();
-
   @Override
   protected Map<Key, Object> getDelegate()
   {
     return delegate;
   }
+
+  @Override
+  public ResponseContext clone()
+  {
+    return new DefaultResponseContext(delegate);
+  }
 }
@@ -143,7 +143,7 @@ public abstract static class AbstractKey implements Key
     private final boolean canDrop;
     private final Function<JsonParser, Object> parseFunction;

-    AbstractKey(String name, boolean inHeader, boolean canDrop, Class<?> serializedClass)
+    public AbstractKey(String name, boolean inHeader, boolean canDrop, Class<?> serializedClass)
     {
       this.name = name;
       this.inHeader = inHeader;
@@ -158,7 +158,7 @@ public abstract static class AbstractKey implements Key
       };
     }

-    AbstractKey(String name, boolean inHeader, boolean canDrop, TypeReference<?> serializedTypeReference)
+    public AbstractKey(String name, boolean inHeader, boolean canDrop, TypeReference<?> serializedTypeReference)
     {
       this.name = name;
       this.inHeader = inHeader;
@@ -557,6 +557,9 @@ public Key find(String name)

   protected abstract Map<Key, Object> getDelegate();

+  @Override
+  public abstract ResponseContext clone();
+
   public Map<String, Object> toMap()
   {
     return CollectionUtils.mapKeys(getDelegate(), k -> k.getName());
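The ResponseContext changes above make the two concrete implementations copyable: clone() builds a new context around a copy of the delegate map. Judging from the GroupByQueryQueryToolChest diff below, the point is that the toolchest can add a per-query entry (the reserved merge buffers) to a cloned context and drop it when the merge finishes, without ever mutating the context handed in by the caller. A toy stand-in, not the Druid classes, showing why the copy in clone() matters:

import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the ResponseContext pattern: clone() copies the delegate map,
// so entries added for one query's merge never leak into the shared context.
class ToyContext
{
  private final Map<String, Object> delegate;

  ToyContext()
  {
    this(new HashMap<>());
  }

  private ToyContext(Map<String, Object> delegate)
  {
    this.delegate = new HashMap<>(delegate);
  }

  void add(String key, Object value)
  {
    delegate.put(key, value);
  }

  boolean has(String key)
  {
    return delegate.containsKey(key);
  }

  @Override
  public ToyContext clone()
  {
    return new ToyContext(delegate);  // copy, so later add() calls do not affect the original
  }

  public static void main(String[] args)
  {
    ToyContext shared = new ToyContext();
    ToyContext perQuery = shared.clone();
    perQuery.add("mergeBuffers", new Object());
    System.out.println(shared.has("mergeBuffers"));    // false: the shared context is untouched
    System.out.println(perQuery.has("mergeBuffers"));  // true: only the per-query copy sees the entry
  }
}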
@@ -34,6 +34,7 @@
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
+import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
@@ -42,12 +43,14 @@
 import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.frame.write.FrameWriterUtils;
 import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.MappedSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.DataSource;
@@ -75,6 +78,7 @@

 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Comparator;
@@ -102,23 +106,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
   private final GroupingEngine groupingEngine;
   private final GroupByQueryConfig queryConfig;
   private final GroupByQueryMetricsFactory queryMetricsFactory;
+  private final BlockingPool<ByteBuffer> mergeBufferPool;

   @VisibleForTesting
-  public GroupByQueryQueryToolChest(GroupingEngine groupingEngine)
+  public GroupByQueryQueryToolChest(GroupingEngine groupingEngine, BlockingPool<ByteBuffer> mergeBufferPool)
   {
-    this(groupingEngine, GroupByQueryConfig::new, DefaultGroupByQueryMetricsFactory.instance());
+    this(groupingEngine, GroupByQueryConfig::new, DefaultGroupByQueryMetricsFactory.instance(), mergeBufferPool);
   }

   @Inject
   public GroupByQueryQueryToolChest(
       GroupingEngine groupingEngine,
       Supplier<GroupByQueryConfig> queryConfigSupplier,
-      GroupByQueryMetricsFactory queryMetricsFactory
+      GroupByQueryMetricsFactory queryMetricsFactory,
+      @Merging BlockingPool<ByteBuffer> mergeBufferPool
   )
   {
     this.groupingEngine = groupingEngine;
     this.queryConfig = queryConfigSupplier.get();
     this.queryMetricsFactory = queryMetricsFactory;
+    this.mergeBufferPool = mergeBufferPool;
   }

   @Override
@@ -152,16 +159,34 @@ private Sequence<ResultRow> initAndMergeGroupByResults(
       ResponseContext context
   )
   {
-    final GroupByQueryResources resource = groupingEngine.prepareResource(query);
+    ResponseContext clonedContext = context.clone();
+    // Whether the downstream runner (e.g. on Historicals, or the Broker when it uses the local
+    // segment walker) merges results using GroupByMergingQueryRunnerV2.
+    Boolean usesGroupByMergingQueryRunner = (Boolean) query
+        .getContext()
+        .getOrDefault(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true);
+    final GroupByQueryResources resource = GroupingEngine.prepareResource(
+        query,
+        mergeBufferPool,
+        usesGroupByMergingQueryRunner,
+        queryConfig
+    );
+    if (usesGroupByMergingQueryRunner) {
+      clonedContext.add(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS, resource);
+    }
     try {
       final Sequence<ResultRow> mergedSequence = mergeGroupByResults(
           query,
           resource,
           runner,
-          context
+          clonedContext
       );

-      return Sequences.withBaggage(mergedSequence, resource);
+      Closer closer = Closer.create();
+      closer.register(resource);
+      closer.register(() ->
+          clonedContext.remove(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS)
+      );
+      return Sequences.withBaggage(mergedSequence, closer);
     }
     catch (Exception e) {
       // Error creating the Sequence; release resources.
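The initAndMergeGroupByResults() hunk above is where the deadlock fix seems to come together: the toolchest now reserves the merge buffers once, up front, through GroupingEngine.prepareResource(query, mergeBufferPool, ...), publishes the reservation in the cloned response context under RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS, and releases both the buffers and the context entry when the merged sequence closes. Reusing the Semaphore stand-in from the sketch near the top of this page (again, illustrative names only, not Druid code), the deadlock-free shape looks roughly like this:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/**
 * Stand-in sketch (not Druid code) of the "reserve once, share via context" pattern:
 * the outer merge acquires every buffer the query will need in a single call, publishes
 * the reservation in a per-query context, and the inner merge reuses it instead of
 * going back to the pool.
 */
public class ReserveOnceSketch
{
  private static final String BUFFERS_KEY = "groupByMergingQueryRunnerBuffers"; // hypothetical key name

  public static void main(String[] args)
  {
    final Semaphore mergeBufferPool = new Semaphore(2);

    Runnable query = () -> {
      final ConcurrentHashMap<String, Object> responseContext = new ConcurrentHashMap<>();
      try {
        // Outer merge: reserve both buffers this query will need, atomically.
        mergeBufferPool.acquire(2);
        responseContext.put(BUFFERS_KEY, "reserved buffers");
        try {
          // Inner merge: reuse the published reservation; no second trip to the pool.
          Object reserved = responseContext.get(BUFFERS_KEY);
          System.out.println(Thread.currentThread().getName() + " merging with " + reserved);
        }
        finally {
          responseContext.remove(BUFFERS_KEY);
          mergeBufferPool.release(2);
        }
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    new Thread(query, "query-1").start();
    new Thread(query, "query-2").start();  // queries now run one after the other instead of deadlocking
  }
}

Because each query takes everything it needs in one blocking call and the inner merge only reads the published reservation, no query can hold a partial allocation while waiting for another, which is exactly the hold-and-wait condition that the earlier sketch deadlocks on.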