Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock that can occur while merging group by results #15420

Merged
merged 36 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
de216d5
init commit
LakshSingla Nov 23, 2023
35bf551
test cases fix
LakshSingla Nov 24, 2023
09689e9
test cases fix 2
LakshSingla Nov 27, 2023
e85f4b0
test cases fix 3
LakshSingla Nov 28, 2023
ce89fdf
test cases fix 4
LakshSingla Nov 28, 2023
a34bcde
caching clustered client removes the flag before sending the requests
LakshSingla Nov 28, 2023
a4f476d
fix test
LakshSingla Nov 29, 2023
fe32796
add comments
LakshSingla Nov 29, 2023
6ecd555
Merge branch 'master' into merge-buffer-deadlock
LakshSingla Jan 17, 2024
b168ce9
init 2
LakshSingla Jan 19, 2024
6cd8aa0
some tests fix
LakshSingla Jan 29, 2024
ac44edb
merge and fix build
LakshSingla Feb 20, 2024
d2886bc
tests
LakshSingla Feb 22, 2024
ceef4f1
Merge branch 'master' into merge-buffer-deadlock
LakshSingla Feb 22, 2024
75aee0a
compilation
LakshSingla Feb 22, 2024
951dce2
compilation benchmarks
LakshSingla Feb 22, 2024
3c1b124
compilation benchmarks, responseContext changes revert
LakshSingla Feb 22, 2024
8c72a92
DumpSegment test fix
LakshSingla Feb 22, 2024
bcfabf8
compilation and tests
LakshSingla Feb 22, 2024
3c7fd81
codeql and test
LakshSingla Feb 23, 2024
e110662
histogram test
LakshSingla Feb 23, 2024
14f5bab
distinctcount test
LakshSingla Feb 23, 2024
d956e73
groupByTimeseriesRunnerTest
LakshSingla Feb 23, 2024
bf57bcc
fix issue with unions
LakshSingla Feb 24, 2024
2c3e675
unit test fix
LakshSingla Feb 27, 2024
dc2aa53
test fix
LakshSingla Feb 27, 2024
430f2e5
fix it util to properly display error message
LakshSingla Feb 27, 2024
76bc024
new runner
LakshSingla Feb 28, 2024
349e8d9
Merge branch 'master' into merge-buffer-deadlock
LakshSingla Feb 28, 2024
d26f7fc
comments, test cases
LakshSingla Feb 29, 2024
b3850df
Merge branch 'master' into merge-buffer-deadlock
LakshSingla Apr 9, 2024
8f255d7
checkstyle
LakshSingla Apr 9, 2024
6980d77
query resource id
LakshSingla Apr 9, 2024
e2da72a
review comments
LakshSingla Apr 10, 2024
9271043
comments, tests
LakshSingla Apr 12, 2024
b8eada8
review final
LakshSingla Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
Expand Down Expand Up @@ -372,19 +373,21 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);

factory = new GroupByQueryRunnerFactory(
groupingEngine,
new GroupByQueryQueryToolChest(groupingEngine)
new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.planning.DataSourceAnalysis;
Expand Down Expand Up @@ -356,16 +357,18 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
bufferSupplier,
processingConfig.getNumMergeBuffers()
);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergeBufferPool, config);
final GroupingEngine groupingEngine = new GroupingEngine(
processingConfig,
configSupplier,
bufferPool,
mergeBufferPool,
groupByResourcesReservationPool,
mapper,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
return new GroupByQueryRunnerFactory(groupingEngine, toolChest);
}

Expand Down Expand Up @@ -469,7 +472,7 @@ private <T> List<T> runQuery()
toolChestWarehouse.getToolChest(query)
)
.applyPreMergeDecoration()
.mergeResults()
.mergeResults(true)
.applyPostMergeDecoration();

//noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
Expand Down Expand Up @@ -487,19 +488,21 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);

factory = new GroupByQueryRunnerFactory(
groupingEngine,
new GroupByQueryQueryToolChest(groupingEngine)
new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ public void testGroupByWithDistinctCountAgg() throws Exception
Iterable<ResultRow> results = FluentQueryRunner
.create(factory.createRunner(incrementalIndexSegment), factory.getToolchest())
.applyPreMergeDecoration()
.mergeResults()
.mergeResults(true)
.applyPostMergeDecoration()
.run(QueryPlus.wrap(query))
.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();

List<ResultRow> expectedResults = Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,35 @@ public MaterializedViewQueryQueryToolChest(
{
this.warehouse = warehouse;
}

@Override
public QueryRunner mergeResults(QueryRunner runner)
{
return new QueryRunner() {
return new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
return warehouse.getToolChest(realQuery)
.mergeResults(runner)
.run(queryPlus.withQuery(realQuery), responseContext);
}
};
}

@Override
public QueryRunner mergeResults(QueryRunner runner, boolean willMergeRunner)
{
return new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
return warehouse.getToolChest(realQuery).mergeResults(runner).run(queryPlus.withQuery(realQuery), responseContext);
return warehouse.getToolChest(realQuery)
.mergeResults(runner, willMergeRunner)
.run(queryPlus.withQuery(realQuery), responseContext);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testDecorateObjectMapper() throws IOException
QueryToolChest queryToolChest =
new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
.build()
));

Expand Down Expand Up @@ -186,7 +186,7 @@ public void testDecorateObjectMapperMaterializedViewQuery() throws IOException
QueryToolChest materializedViewQueryQueryToolChest =
new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
.build()
));

Expand Down Expand Up @@ -245,7 +245,7 @@ public void testGetRealQuery()
MaterializedViewQueryQueryToolChest materializedViewQueryQueryToolChest =
new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
.build()
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
Expand All @@ -67,6 +68,9 @@ public class MapVirtualColumnGroupByTest extends InitializedNullHandlingTest
public void setup() throws IOException
{
final IncrementalIndex incrementalIndex = MapVirtualColumnTestBase.generateIndex();
final GroupByQueryConfig config = new GroupByQueryConfig();
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1), config);
final GroupingEngine groupingEngine = new GroupingEngine(
new DruidProcessingConfig()
{
Expand Down Expand Up @@ -94,17 +98,17 @@ public int getNumThreads()
return 1;
}
},
GroupByQueryConfig::new,
() -> config,
new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new DefaultObjectMapper(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);

final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
groupingEngine,
new GroupByQueryQueryToolChest(groupingEngine)
new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
);

runner = QueryRunnerTestHelper.makeQueryRunner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public void testGroupByWithApproximateHistogramAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "approx-histo");
}

Expand Down Expand Up @@ -231,7 +232,8 @@ public void testGroupByWithSameNameComplexPostAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "approx-histo");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ public void testGroupByWithFixedHistogramAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}

Expand Down Expand Up @@ -233,7 +234,8 @@ public void testGroupByWithSameNameComplexPostAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public static void retryUntil(

LOG.info(
"Attempt[%d/%d] did not pass: Task %s still not complete. Next retry in %d ms",
currentTry, retryCount, taskMessage, delayInMillis
currentTry,
retryCount,
taskMessage,
delayInMillis
);
try {
Thread.sleep(delayInMillis);
Expand All @@ -83,10 +86,10 @@ public static void retryUntil(
if (currentTry > retryCount) {
if (lastException != null) {
throw new ISE(
lastException,
"Max number of retries[%d] exceeded for Task[%s]. Failing.",
retryCount,
taskMessage,
lastException
taskMessage
);
} else {
throw new ISE(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing
return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
}

public FluentQueryRunner<T> mergeResults()
/**
* Delegates to {@link QueryToolChest#mergeResults(QueryRunner, boolean)}.
*
* @see QueryToolChest#mergeResults(QueryRunner, boolean)
*/
public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadoc for what willMergeRunner means. I recognize most other stuff in here doesn't have javadocs, but, still.

{
return from(toolChest.mergeResults(baseRunner));
return from(toolChest.mergeResults(baseRunner, willMergeRunner));
}

public FluentQueryRunner<T> map(final Function<QueryRunner<T>, QueryRunner<T>> mapFn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,10 @@ public boolean isWindowingStrictValidation()
);
}

public QueryResourceId getQueryResourceId()
{
return new QueryResourceId(getString(QueryContexts.QUERY_RESOURCE_ID));
}

public String getBrokerServiceName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String WINDOWING_STRICT_VALIDATION = "windowingStrictValidation";

// Unique identifier for the query, that is used to map the global shared resources (specifically merge buffers) to the
// query's runtime
public static final String QUERY_RESOURCE_ID = "queryResourceId";

// SQL query context keys
public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID;
Expand Down