Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foundations for an operator-based approach for Druid queries #12641

Closed
wants to merge 11 commits into from

Conversation

paul-rogers
Copy link
Contributor

@paul-rogers paul-rogers commented Jun 13, 2022

Issue #11933 proposed using the industry-standard operator DAG structure for Druid queries in place of the existing Sequence-based approach. The issue has a lengthy discussion of the reasons.

Separately, issue #12262 proposes a multi-stage query engine for Druid, focused on long-running report-style queries and ingestion. To extend that idea to the low-latency space would seem to demand we start with what we already have, and which has proven itself to be rock-solid in many production shops.

Putting the two together, to create a multi-stage solution for Druid's low-latency query path, we propose to evolve what we have, step-by-step, to the industry-standard operator DAG approach, which will allow us to introduce multi-stage queries within the existing framework.

This PR is a first step: it provides the foundation structure. The code here has already been used to create a full operator-based solution for scan queries in the context of the historical node and to fully convert the scan query path for the test query stack. That work will be contributed, step-by-step, building on top of this PR.

See the README for more details.

Operators

An operator does one task in a data pipeline. The key operator abstractions include:

  • Operator: an interface for a data pipeline component. An operator can be opened to provide an iterator over results, then closed. An operator can have zero inputs (a leaf operator), one input (a filter, limit or projection operator) or multiple inputs (join, merge, union, etc.)
  • RowIterator: a super-simple iterator over the rows which an operator produces. Uses an exception to signal EOF, which reduces the code needed in a data pipeline relative to the Java iterator protocol.

Multiple variations of operators are provided in this PR. All of these operators are simple in the sense that they only refer to other operators, but not to any of Druid's query infrastructure.

  • LimitOperator: applies a limit to a result set.
  • NullOperator: does nothing, like an empty list or empty iterator.
  • MappingOperator: takes one input and applies some form of mapping as defined by a derived class.
  • ConcatOpreator: performs a union of its inputs, emitting each one after the other.
  • OrderedMergeOperator implements an ordered merge of multiple inputs.
  • WrappingOperator similar to "baggage" on sequences: an operator that does tasks at the start and end, of result set, but imposes no per-row overhead.

Fragments

Operators combine to form a data pipeline. Data pipelines are distributed, as in Druid's scatter/gather architecture. A common terminology is to say that the entire query forms a DAG. The DAG is "sliced" at node boundaries, with exchanges between slices. At runtime, a slice is replicated across many nodes. Each instance of a slice is a fragment.

This PR provides the basics of the fragment structure. In most engines, a planner converts SQL into a logical plan, then into a physical plan that describes the operator DAG. Slices of that plan are sent to nodes which then execute the fragments. Druid, however, already has an existing QueryRunner based structure. QueryRunner are actually "query planners": the QueryRunner.run() method is better thought of as QueryPlanner.plan(): it figures out what sequence is needed at that point in the pipeline and creates that sequence.

Our first step in the path to adopt operators is to reuse the query runners. Instead of creating sequences, we modify QueryRunners to create operators. The fragment-related abstractions in this PR support such an approach.

  • FragmentContext: the state shared by all operators in a fragment. For now, this state includes the ResponseContext and, internally, the collection of all operators that form the fragment.
  • FragmentBuilder: creates a fragment from a collection of operators, and provides an API to run the resulting fragment.
  • FragmentRun: runs the fragment, which means calling open() on the root operator, returning the root operator's iterator, and closing all operators at the completion of the run.
  • FragmentBuilderFactory: a factory to create a fragment builder. This class will be injected via Guice.

We will need a way to pass the FragmentContext to QueryRunners so that they can create operators for a fragment. It turns out that QueryPlus is handy way to accomplish this, so this PR contains the required QueryPlus code. That code isn't used yet: we're just setting things up.

Starter Scan Query Operators

This PR has been marked as needing design review. To help with the review, this PR includes a “starter set” of scan query operators that illustrates how the mechanism works.

Bootstrap

The QueryNGModule defines a FragmentBuilderFactory which is injected into QueryLifecycleFactory. That factory then creates a QueryLifecycle. The query lifecycle checks if the query is enabled (as described above) using the config attached to FragmentBuilderFactory`. If so, it then uses FragmentBuilderFactory`` to create a FragmentBuilder which is then attached to the `QueryPlus` for that query.

From then on, each QueryRunner checks if there is a FragmentBuilder instance attached to the QueryPlus. If so, the query runner creates an operator (if that query runner has been converted to do so), else it executes the "classic" code path to create a sequence.

Note that, if the mechanism is not enabled, there will never be a FragmentBuilder attached to the QueryPlus, and so execution will ignore the operator path.

Operator and Sequence Interoperability

In this PR, a set of shims allows operators to read from sequences, and to masquerade as sequences. This allows operators to seamlessly insert themselves into a sequence-based execution pipeline.

When two operators are adjacent, the intervening sequence is optimized away, leaving the the two operators to talk directly.

As the conversion continues, there will be more operators and fewer in the execution pipeline.

Operator "Planning"

The QueryRunner.run() method can actually be seen as being a QueryPlanner.plan() method: the method decides which sequences are needed to run a query. The sequences do the actually running. In this light, it is easy to see how we convert to operators: the QueryRunner.run() method creates an operator instead.

Looking ahead, the "planning" code can be gathered up in a "native query planner". In anticipation, each converted QueryRunner calls to ScanQueryPlanner to decide which operator(s) to create. In some cases, the operator is a 1:1 replacement for a sequence. In other cases, it turns out we can optimize the query by omitting unneeded operators, or by using one of several finely tuned operators in place of the generalized sequence in the existing code.

The general pattern is:

  • Accept a QueryPlus and an upstream QueryRunner.
  • Create an operator which runs the QueryRunner.
  • Convert the resulting sequence to an "input operator."
  • Decide which operator(s) we need to add to perform the task at hand.
  • Wrap the result in a sequence compatible with the return value of QueryRunner.run().

Again, the code is such that the "shim" sequences and operators melt away when operators are adjacent.

Scan Operators

This PR includes the following scan query operators:

  • CursorReader reads from one or more Cursors.
  • ScanQueryOperator replaces the ScanQueryEngine, which pretty much reads from a set of cursors.
  • GroupedScanResultLimitOperator, UngroupedScanResultLimitOperator: limit operators for scans.
  • ScanResultOffsetOperator: offset operator for scans.
  • ScanListToArrayOperator, ScanCompactListToArrayOperator: unpacks scan "batches" to individual rows.

Surprisingly, these are the only scan-specific operators required. A later PR will include the set of generic operators needed to completely replace sequences in the scan query path, at least for tests.

Configuration

At present, the operator path is experimental, and thus disabled by default. The operator path is enabled in this PR only for test queries, and only using the flag described below. The mechanism is completely disabled for production code. We'll enable it later after we convert more operators.

For test queries, the mechanism is enabled (only for tests) by setting a system property: -druid.queryng.enabled=true. This is a temporary approach, good enough for testing. Tests use the "classic" mechanism unless this flag is set for the test. Even with enabled with the flag, the mechanism is enabled only for scan queries; disabled for all other query types. Later PRs will widen the path.

A later PR will provide a config variable to enable the operator path in production code. At that time, if the druid.queryng.enable config property is set to true, then the mechanism is optional. By default, the mechanism also requires that the queryng variable be set to true in the query context. A second config variable, druid.queryng.requireContext can be set to false to enable use of the mechanism without setting a per-query context variable, which is handy for running tests.

Tests

One of the very handy things about operators is that they are highly modular and thus extremely easy to unit test. Tests exist for all the basic abstractions defined above. Further, all SQL CalciteQueryTest queries were run with the mechanism enabled.

Next Steps

The goal of this PR is for reviewers to focus on the core abstractions. The next PR will begin to create the parallel operator path for scan queries. Those PRs will provide operators converted from the existing sequences, along with the "planner" code that query runners use to define the operator. That whole path an be seen in this branch.


This PR has:

  • been self-reviewed.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster. (N/A, since the code is not yet integrated into Druid.)

@lgtm-com
Copy link

lgtm-com bot commented Jun 16, 2022

This pull request introduces 1 alert when merging 66c416c into 94564b6 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@lgtm-com
Copy link

lgtm-com bot commented Jun 16, 2022

This pull request introduces 1 alert when merging c2794e5 into f050069 - view on LGTM.com

new alerts:

  • 1 for Useless null check

@lgtm-com
Copy link

lgtm-com bot commented Jun 17, 2022

This pull request introduces 1 alert when merging 7aa2d11 into 893759d - view on LGTM.com

new alerts:

  • 1 for Useless null check

@paul-rogers
Copy link
Contributor Author

Clean build except for a flaky IT. Ready for review.

@paul-rogers
Copy link
Contributor Author

Since this PR has been marked as requiring design review, it seemed worthwhile to expand the scope a bit to include some actual operators and query integration. See the main description for the set of "starter" scan query operators now included in the PR.

Extended ScanQueryRunnerTest to run with operators.
Fixed a few bugs and build issues.
@paul-rogers
Copy link
Contributor Author

Build is clean despite failures. Looks like all Java 15 stages failed to even start running. They are not relevant to this work and can be ignored.

@paul-rogers
Copy link
Contributor Author

Seems to be too big a shift for now. Will continue on a private branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants