Initial join support #8728

Open
gianm opened this issue Oct 24, 2019 · 15 comments

@gianm
Contributor

gianm commented Oct 24, 2019

Motivation

Druid aims to be a more powerful analytical database, and implementing joins is a very common ask from the user community. Druid does support some related features today:

  • Lookups enable certain star-schema use cases, and are similar to joins in that regard. But they are limited in that they can only support LEFT joins of fact-to-dimension (not right/inner/outer) and that they cannot support more than one value per key.
  • Druid SQL supports semijoins through both the WHERE x IN (SELECT ...) syntax and traditional JOIN syntax. But it only supports one subquery per SQL query, and does not support negation ("NOT IN").

Real JOIN support would be more powerful, enabling even more star-schema and subquery-based use cases:

  • More flexibility for join types (LEFT/RIGHT/INNER/FULL OUTER).
  • More flexibility for join conditions.
  • Joining against more than one subquery at once.
  • Easier integration with third-party applications that generate JOIN queries for star schemas.
  • Open the door for joining two datasources together.

Proposed changes

The idea is to add a "join" datasource, expose it through SQL, and add machinery to brokers and data servers to allow hash-based equijoins of zero or one "table" datasource and any number of "lookup", "inline", or "query" datasources. As a side effect, these proposed changes will add the ability to query lookups directly, using a datasource of type "lookup".

I think this subset of functionality is a good start, because it adds meaningful new capabilities and helps unify existing ones. Things that are not in scope for this initial proposal include: joining two "table" datasources to each other, non-equijoins, non-hash-based joins. I think these should all be implemented at some point, as future work.

There are four main areas to this proposal:

  1. SQL: add rules to plan joins, adding "lookup" schema.
  2. Native queries: add "join", "lookup", and "inline" datasources.
  3. Broker: must be able to translate anything that data servers can’t do into something they can do (e.g. evaluate a subquery and replace it with an "inline" datasource). Must also be able to fully evaluate queries that only use local datasources like "lookup" and "inline".
  4. Data servers (historical, etc): must be able to evaluate joins of one table onto any number of lookup or inline datasources, or queries on top of those.

The next few sections expand on these areas.

See also https://gist.github.com/gianm/39548daef74f0373b3c87056e3db4627 for an (incomplete) implementation-oriented doc.

SQL

  1. Add lookups to SQL in a "lookup" schema.
  2. Allow any equi-joins that involve zero or one normal datasource and any number of lookups or subqueries, through a new join-planning system.
  3. Remove semi-join specific planning code, since that should now be handled by the generic join planning code in (2) above.

An example SQL query might be:

SELECT products.value AS product_name, SUM(sales.revenue)
FROM sales
LEFT JOIN lookup.products ON sales.product_id = products.key
GROUP BY products.value

This query takes advantage of the fact that unqualified tables like sales are assumed to be normal datasources. Lookups are referenced as part of the lookup schema, like lookup.products.
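To make the semantics of this query concrete, here is a minimal Python sketch of what it computes. The sample rows and lookup contents are made up for illustration, and this is only a model of the result, not of how Druid would execute the query.

```python
from collections import defaultdict

# Hypothetical sample data; not taken from the proposal.
sales = [
    {"product_id": "p1", "revenue": 10.0},
    {"product_id": "p2", "revenue": 5.0},
    {"product_id": "p1", "revenue": 2.5},
    {"product_id": "p9", "revenue": 1.0},  # no matching lookup entry
]
# A lookup maps each key to exactly one value.
products = {"p1": "Widget", "p2": "Gadget"}


def revenue_by_product_name(sales_rows, lookup):
    """LEFT JOIN sales to the lookup on product_id = key, then GROUP BY the lookup value."""
    totals = defaultdict(float)
    for row in sales_rows:
        # LEFT JOIN: unmatched rows are kept, with a null (None) product name.
        product_name = lookup.get(row["product_id"])
        totals[product_name] += row["revenue"]
    return dict(totals)


result = revenue_by_product_name(sales, products)
```

Because the join is a LEFT join, the revenue for `p9` is not dropped; it is grouped under a null product name.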

Multiple join queries can be specified per SQL query. We will need to guide Calcite’s cost-based optimizer towards reordering them optimally. This may require more statistics than we currently possess, so adding these and improving join plans may end up being an area of future work.

Native queries

  1. Add a "join" datasource type that represents a join of two other datasources. It’d include a left datasource, right datasource, condition (which I am thinking will be restricted to equality at first), and type (left, right, full outer, inner).
  2. Add "lookup" and "inline" datasources to provide things to join onto. These can be specified as inputs to a join, or they can be directly queried (a new capability for lookups!).
  3. Allow joining on to "query" datasources as well. To make this work, we’ll need to add a sense of a ‘standard translation’ of results from certain query types into flat schemas that we can offer column selectors on top of. There may be more than one way to do this, since certain query types (notably, topN and scan) return nested results in some cases. We could do this by adding a new QueryToolChest method.
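As an illustration of what a "standard translation" could look like for topN, the Python sketch below flattens the nested topN result shape (one object per time bucket, each with a list of entries under "result") into flat rows. The function name and the choice of `__time` as the flattened timestamp column are assumptions made for this example; the proposal leaves the actual mechanism open.

```python
def flatten_topn(results, time_column="__time"):
    """Turn nested topN results into a flat list of rows, one per (bucket, entry)."""
    rows = []
    for bucket in results:
        for entry in bucket["result"]:
            # Copy the bucket timestamp onto every entry so each row stands alone.
            row = {time_column: bucket["timestamp"]}
            row.update(entry)
            rows.append(row)
    return rows


# Hypothetical topN output with two time buckets.
topn_results = [
    {
        "timestamp": "2019-10-24T00:00:00.000Z",
        "result": [
            {"product_id": "p1", "revenue": 12.5},
            {"product_id": "p2", "revenue": 5.0},
        ],
    },
    {
        "timestamp": "2019-10-25T00:00:00.000Z",
        "result": [{"product_id": "p1", "revenue": 3.0}],
    },
]

flat_rows = flatten_topn(topn_results)
```

Once results are in this flat shape, column selectors can treat them like any other row-oriented input.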

The rows coming out of a join datasource would be the result of the join. Any query type could use a join datasource without being aware of the fact that joins exist.

Join datasources can be nested within each other. Unlike SQL, native query evaluation will not reorder joins. It will execute them in the order that the join tree is provided.

We will probably not allow joining on "table" datasources, except as the extreme left-hand side of a join.

In order to protect against column name ambiguity (what if the left and right side have a column of the same name?), I propose adding a "rightPrefix" parameter to the join datasource. This would be prefixed to every column name coming from the right side, and should be chosen by the caller to be something that won’t conflict with any left-side columns. Druid SQL will choose one automatically when planning a SQL join.
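A small Python sketch of how a prefix could be applied and chosen. Both helper names and the `j0.`, `j1.`, ... naming scheme are hypothetical; the only part taken from the proposal is that the prefix is prepended to every right-side column and must not produce a conflict with left-side columns.

```python
def apply_right_prefix(right_row, right_prefix):
    """Prefix every column name coming from the right side of the join."""
    return {right_prefix + name: value for name, value in right_row.items()}


def choose_right_prefix(left_columns, right_columns):
    """Pick the first prefix of the form 'j<N>.' that causes no name collisions."""
    n = 0
    while True:
        prefix = "j%d." % n
        if not any(prefix + column in left_columns for column in right_columns):
            return prefix
        n += 1


# Hypothetical column sets; the left side already has a column named "j0.key".
left_columns = {"product_id", "revenue", "j0.key"}
right_columns = ["key", "value"]

prefix = choose_right_prefix(left_columns, right_columns)
prefixed = apply_right_prefix({"key": "p1", "value": "Widget"}, prefix)
```

A caller writing native queries by hand would skip the automatic choice and simply pass a prefix it knows to be safe.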

The join datasource used by the earlier SQL query, above, would be:

"dataSource": {
  "type": "join",
  "left": "sales",
  "right": {
    "type": "lookup",
    "lookupName": "products"
  },
  "rightPrefix": "foobar",
  "joinType": "left",
  "condition": {
    "leftColumn": "product_id",
    "rightColumn": "key",
    "rightPrefix": "something"
  }
}
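Since join datasources can be nested, it may help to see one built programmatically. The Python sketch below assembles a two-level join using the field names from the example above, with "rightPrefix" placed at the top level of each join as described in the text. The helper functions, the "regions" lookup, the "region_id" column, and the prefixes are all hypothetical, and the final JSON format may differ once implemented.

```python
import json


def lookup_datasource(name):
    return {"type": "lookup", "lookupName": name}


def join_datasource(left, right, right_prefix, join_type, left_column, right_column):
    """Build a "join" datasource dict using the field names from the proposal's example."""
    return {
        "type": "join",
        "left": left,
        "right": right,
        "rightPrefix": right_prefix,
        "joinType": join_type,
        "condition": {"leftColumn": left_column, "rightColumn": right_column},
    }


# Inner join: the table "sales" is the extreme left-hand side.
sales_with_products = join_datasource(
    "sales", lookup_datasource("products"), "p.", "left", "product_id", "key"
)
# Outer join: nests the first join as its left input. Native queries are not
# reordered, so this tree is also the execution order.
nested = join_datasource(
    sales_with_products, lookup_datasource("regions"), "r.", "inner", "region_id", "key"
)

print(json.dumps(nested, indent=2))
```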

Broker behavior

The following technique should be implemented by CachingClusteredClient. The idea is to either evaluate the query locally, or else get it into a format that data servers can handle (see next section).

  1. Analyze the datasource to find all tables and subqueries on tables.
  2. Validate that there is at most one table datasource that is not wrapped in a subquery, and if there is one, it is in the leftmost leaf position. If this validation fails, return an error.
  3. Evaluate all subqueries on tables ("query" datasource with "table" child), and transform the originally-provided datasource by replacing those subquery datasources with "inline" datasources. Do not do this on "table" datasources that are not wrapped in a query. Keep a running counter of how many rows have been materialized in this way, and return an error if it grows too large.
  4. If there is a table in the leftmost leaf position, send the query down to data servers without further modifications beyond those done in step (3). Use the single table datasource to determine which data servers and which segments to query.
  5. If there are no table datasources remaining after the transformations in (3), evaluate the query locally, on the Broker. Create a virtual Segment on top of the local data, and run the appropriate query engine on that Segment.
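The steps above can be sketched in Python roughly as follows. This is only an illustration of the control flow, not the CachingClusteredClient code: datasources are modeled as plain dicts (with a bare string standing for a table, as in the JSON example), `run_subquery` is a hypothetical callback that evaluates a subquery and returns its rows, and the `max_rows` default is an arbitrary placeholder. Subqueries whose child is something other than a table are left untouched here.

```python
class PlanningError(Exception):
    """Raised when the datasource cannot be planned under the rules above."""


def is_table(ds):
    # A bare string names a table datasource in this sketch.
    return isinstance(ds, str) or ds.get("type") == "table"


def leaves(ds):
    """Yield leaf datasources left to right; "query" datasources count as leaves."""
    if not is_table(ds) and ds["type"] == "join":
        yield from leaves(ds["left"])
        yield from leaves(ds["right"])
    else:
        yield ds


def validate(ds):
    """Step 2: at most one bare table, and only in the leftmost leaf position."""
    positions = [i for i, leaf in enumerate(leaves(ds)) if is_table(leaf)]
    if len(positions) > 1 or (positions and positions[0] != 0):
        raise PlanningError("a bare table must be the single, leftmost leaf")


def inline_subqueries(ds, run_subquery, counter, max_rows):
    """Step 3: replace subqueries on tables with "inline" datasources."""
    if is_table(ds):
        return ds
    if ds["type"] == "join":
        left = inline_subqueries(ds["left"], run_subquery, counter, max_rows)
        right = inline_subqueries(ds["right"], run_subquery, counter, max_rows)
        return {**ds, "left": left, "right": right}
    if ds["type"] == "query" and is_table(ds["query"]["dataSource"]):
        rows = run_subquery(ds["query"])
        counter["rows"] += len(rows)  # running total across all subqueries
        if counter["rows"] > max_rows:
            raise PlanningError("too many rows materialized from subqueries")
        return {"type": "inline", "rows": rows}
    return ds


def plan(ds, run_subquery, max_rows=100_000):
    """Return where to run the query and the rewritten datasource."""
    validate(ds)
    rewritten = inline_subqueries(ds, run_subquery, {"rows": 0}, max_rows)
    if is_table(next(leaves(rewritten))):
        return "data_servers", rewritten  # step 4
    return "broker", rewritten  # step 5
```

For example, a join of the table "sales" onto a subquery over another table would be rewritten so that the right side becomes an "inline" datasource, and the result would be routed to data servers using "sales" to pick segments.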

Data server behavior

The following technique should be implemented by ServerManager (historical) and SinkQuerySegmentWalker (indexer) when a join datasource is encountered.

  1. Analyze the join datasource to find the "primary" datasource: the leftmost leaf datasource. This must be a table datasource, and will be the base for the join. (Note that we are assuming here that the Broker would only send down the query if there were exactly one table.)
    Enumerate all other leaf datasources in depth-first pre-order. We will apply join clauses in this order.
  2. Create a hash table for each non-primary leaf datasource, unless it already exists (e.g. no need to create a hash table for a lookup, but we might create one for a query on top of a lookup). If this is impossible for any datasource, like if one of them is a regular "table" datasource, return an error.
  3. Create a virtual Segment for each Segment of the primary datasource that wraps up all the joining work, and returns a StorageAdapter + ColumnSelectorFactory representing the results of the join. Pass this to query engines as normal.
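A rough Python model of these three steps is shown below. It is a sketch under several assumptions, not the ServerManager implementation: the join tree is assumed to be left-leaning (every right side is a leaf), only "left" and "inner" join types are handled, segments are modeled as lists of row dicts, and `lookups` is a hypothetical dict of lookup name to key/value map. A real implementation could reuse a lookup's existing map rather than rebuilding a hash table as this sketch does.

```python
def flatten_join_tree(datasource):
    """Step 1: return (primary, clauses), with clauses in the order they are applied."""
    clauses = []
    while isinstance(datasource, dict) and datasource.get("type") == "join":
        right = datasource["right"]
        if isinstance(right, dict) and right.get("type") == "join":
            raise ValueError("this sketch assumes a left-leaning join tree")
        clauses.append(datasource)
        datasource = datasource["left"]
    clauses.reverse()  # innermost join first
    if not isinstance(datasource, str):
        raise ValueError("the primary datasource must be a table")
    return datasource, clauses


def build_hash_table(clause, lookups):
    """Step 2: index the right-hand rows by the join column."""
    right = clause["right"]
    if isinstance(right, str) or right.get("type") == "table":
        raise ValueError("cannot build a hash table for a regular table datasource")
    if clause["joinType"] not in ("left", "inner"):
        raise NotImplementedError("only left and inner joins are sketched here")
    if right["type"] == "lookup":
        rows = [{"key": k, "value": v} for k, v in lookups[right["lookupName"]].items()]
    elif right["type"] == "inline":
        rows = right["rows"]
    else:
        raise ValueError("unsupported right-hand datasource: " + right["type"])
    table = {}
    for row in rows:
        table.setdefault(row[clause["condition"]["rightColumn"]], []).append(row)
    return table


def apply_clause(rows, clause, table):
    """Lazily join a stream of left rows against one hash table."""
    prefix = clause["rightPrefix"]
    left_column = clause["condition"]["leftColumn"]
    for row in rows:
        matches = table.get(row.get(left_column), [])
        for match in matches:
            yield {**row, **{prefix + name: value for name, value in match.items()}}
        if not matches and clause["joinType"] == "left":
            yield dict(row)  # unmatched left row; right columns stay absent (null)


def join_segment(datasource, segment_rows, lookups):
    """Step 3: wrap one segment's rows in a lazy view of the join result."""
    _, clauses = flatten_join_tree(datasource)
    rows = iter(segment_rows)
    for clause in clauses:
        rows = apply_clause(rows, clause, build_hash_table(clause, lookups))
    return rows
```

The returned iterator plays the role of the virtual Segment: nothing is joined until a query engine pulls rows from it.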

Rationale

Some other alternatives considered were:

  • A "join" query type. @jihoonson did work towards this in Add JoinQuery #4118, but the patch was not merged. Reconsidering this design now, I think a "join" datasource would work better than a query type, because it composes more nicely. We can expose the results of a join datasource to query engines the same way as we expose a table (via Segment + StorageAdapter). It is naturally lazy (no need to compute columns the query won’t use) and will work with all existing query engines without a need to modify them.
  • Joining onto broadcast datasources instead of lookups. I think we should do both over time, but chose to start with lookups because their usage is more prevalent in Druid today. I am hoping to allow people to use this popular feature through a more familiar (and powerful!) SQL syntax.

Operational impact

None expected.

Test plan

Add lots of unit tests in the druid-sql and druid-processing modules.

Future work

Out of scope for this proposal, but would be nice in the future:

  • Adding support for non-equijoins.
  • Adding support for joining onto broadcast datasources, not just lookups.
  • Adding support for joining two distributed table datasources. (This one gets complex quickly and there are a lot of sub-cases we want to consider.)
  • Adding a mechanism for caching the results of a subquery as a prebuilt hash-joinable table. Maybe manually or maybe automatically.
@gianm
Contributor Author

gianm commented Oct 24, 2019

I am planning to do work towards implementing this proposal over the next few months. Feedback is very welcome.

@vogievetsky
Contributor

I know everyone will be very excited about JOINs but the proposal to have lookups queryable via SQL will enable amazing stuff in the web console!

@kstrempel

Yes that is an exciting direction, but wouldn't it be more efficient to develop a druid connector for presto?

@asdf2014
Member

Hi, @kstrempel. IMO, these are not problems on the same level. Presto does provide richer SQL queries, but it does not store data itself, so there would be a step to ingest data from Druid, which will inevitably cost some performance. And if Presto pushes most of the calculations down to Druid, then Druid still needs more SQL query capabilities, which is what this proposal provides. I think this may be one of the reasons why Spark does not directly borrow the SQL features of Impala or Presto, but implements Spark SQL on its own.

@gianm
Contributor Author

gianm commented Oct 25, 2019

Yes that is an exciting direction, but wouldn't it be more efficient to develop a druid connector for presto?

I think a Druid connector for Presto would help get Druid data into Presto and help support 100% of SQL use cases, so it's interesting. But as @asdf2014 mentioned, doing what we can in Druid directly should mean better performance.

@mbathula

@gianm that's an exciting announcement and looking forward to it.

@sanjayk9r

Great to know this news! 👍 can't wait to see this in releases!

@palanieppan-m
Contributor

@kstrempel we looked into join support through Presto a couple of years ago.

It's possible, but the Presto connector interface did not have an API to push down predicates. So Druid would have to forward raw, non-aggregated data to Presto, and Presto would do all of the query processing, which gives up the performance benefits of Druid.

Great to see this initiative.

gianm added a commit to gianm/druid that referenced this issue Dec 30, 2019
An initial step towards apache#8728. This patch adds enough functionality to implement a joining
cursor on top of a normal datasource. It does not include enough to actually do a query. For
that, future patches will need to wire this low-level functionality into the query language.
@gianm
Contributor Author

gianm commented Dec 30, 2019

FYI — I started a design document for implementation at https://gist.github.com/gianm/39548daef74f0373b3c87056e3db4627. I didn't put it into this issue directly, since I think a proposal and design doc have different goals. (The proposal is meant to describe a program for implementing the feature, motivations, considerations, etc; but the design doc is meant to describe what's already implemented.)

@xlw712

xlw712 commented Jan 8, 2020

This is a good idea! I look forward to it.

@jnaous
Contributor

jnaous commented Jan 10, 2020

  • Awesome sauce!
  • I think what's missing here as well is a bit of discussion on the ingestion side of dimension tables.
  • You mention "master datasource" under Broker Behavior. First time it's mentioned in this doc. With no prior explanation. Do you mean the base datasource?
  • Under Test Plan it would be good to see some more comprehensive sets of queries that you expect to be supported and others that won't be, ideally in order of complexity. Perhaps we can use that to stage implementation phases by scoping them to supported query types?

gianm added a commit that referenced this issue Jan 16, 2020
* Add HashJoinSegment, a virtual segment for joins.

An initial step towards #8728. This patch adds enough functionality to implement a joining
cursor on top of a normal datasource. It does not include enough to actually do a query. For
that, future patches will need to wire this low-level functionality into the query language.

* Fixups.

* Fix missing format argument.

* Various tests and minor improvements.

* Changes.

* Remove or add tests for unused stuff.

* Fix up package locations.
@gianm
Contributor Author

gianm commented Jan 20, 2020

@jnaous,

  • I think what's missing here as well is a bit of discussion on the ingestion side of dimension tables.

I agree. I haven't thought a ton about how it should work. But I think, ideally, it'd use the new InputFormat and InputSource stuff so users don't have to learn new concepts. It would be great if it used tasks too for even more conceptual-weight-reduction.

  • You mention "master datasource" under Broker Behavior. First time it's mentioned in this doc. With no prior explanation. Do you mean the base datasource?

I meant the datasource that the user provided in the query. I'll edit to make it clearer.

  • Under Test Plan it would be good to see some more comprehensive sets of queries that you expect to be supported and others that won't be, ideally in order of complexity. Perhaps we can use that to stage implementation phases by scoping them to supported query types?

That is a great idea. I will plan to add that when the SQL part starts getting fleshed out more.

gianm added a commit to gianm/druid that referenced this issue Jan 21, 2020
Builds on apache#9111 and implements the datasource analysis mentioned in apache#8728. Still can't
handle join datasources, but we're a step closer.

Join-related DataSource types:

1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used
   in the future for query rewriting (e.g. inlining of subqueries).

DataSource analysis functionality:

1) Add DataSourceAnalysis class, which breaks down datasources into three components:
   outer queries, a base datasource (left-most of the highest level left-leaning join
   tree), and other joined-in leaf datasources (the right-hand branches of the
   left-leaning join tree).
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to
   support analysis.
3) Use the DataSourceAnalysis methods throughout the query handling stack, replacing
   various ad-hoc approaches. Most of the interesting changes are in
   ClientQuerySegmentWalker (brokers), ServerManager (historicals), and
   SinkQuerySegmentWalker (indexing tasks).

Other notes:

1) Changed TimelineServerView to return an Optional timeline, which I thought made
   the analysis changes cleaner to implement.
2) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer.
   Also, made it a Set, so implementations don't need to worry about duplicates.
3) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to
   determine whether it is safe to pass a subquery dataSource to the query toolchest.
   Fixes an issue introduced in apache#5471 where subqueries under non-groupBy-typed queries
   were silently ignored, since neither the query entry point nor the toolchest did
   anything special with them.
4) The addition of "isCacheable" should work around apache#8713, since UnionDataSource now
   returns false for cacheability.
gianm added a commit to gianm/druid that referenced this issue Jan 21, 2020
Builds on apache#9111 and implements the datasource analysis mentioned in apache#8728. Still can't
handle join datasources, but we're a step closer.

Join-related DataSource types:

1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used
   in the future for query rewriting (e.g. inlining of subqueries).

DataSource analysis functionality:

1) Add DataSourceAnalysis class, which breaks down datasources into three components:
   outer queries, a base datasource (left-most of the highest level left-leaning join
   tree), and other joined-in leaf datasources (the right-hand branches of the
   left-leaning join tree).
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to
   support analysis.

Other notes:

1) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer.
   Also, made it a Set, so implementations don't need to worry about duplicates.
2) The addition of "isCacheable" should work around apache#8713, since UnionDataSource now
   returns false for cacheability.
gianm added a commit that referenced this issue Jan 22, 2020
* Add join-related DataSource types, and analysis functionality.

Builds on #9111 and implements the datasource analysis mentioned in #8728. Still can't
handle join datasources, but we're a step closer.

Join-related DataSource types:

1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used
   in the future for query rewriting (e.g. inlining of subqueries).

DataSource analysis functionality:

1) Add DataSourceAnalysis class, which breaks down datasources into three components:
   outer queries, a base datasource (left-most of the highest level left-leaning join
   tree), and other joined-in leaf datasources (the right-hand branches of the
   left-leaning join tree).
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to
   support analysis.

Other notes:

1) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer.
   Also, made it a Set, so implementations don't need to worry about duplicates.
2) The addition of "isCacheable" should work around #8713, since UnionDataSource now
   returns false for cacheability.

* Remove javadoc comment.

* Updates reflecting code review.

* Add comments.

* Add more comments.
@himanshug
Contributor

@gianm
This sounds great, and thanks for writing https://gist.github.com/gianm/39548daef74f0373b3c87056e3db4627. While reading through everything to understand the details, I realized it would be useful to have a doc describing the current limitations of this work (e.g. Druid table joins requiring a shuffle are not supported yet), along with examples of SQL and Druid native queries that showcase use cases that were not possible before but are supported now, or where the older way was not efficient or user-friendly and the new way of writing the query is.

@chetansurwade

@gianm When can we expect a release with Join Support and ANSI SQL compliance?

@bobtiernay-okta

Very cool stuff. Curious if this is still actively being worked on. I also wonder if the Imply implementation has been looked at and considered. They may have some findings that would inform the implementation:

https://imply.io/post/apache-druid-joins
