
Initial join support #8728

gianm opened this issue Oct 24, 2019 · 8 comments

@gianm commented Oct 24, 2019


Druid aims to be a more powerful analytical database, and implementing joins is a very common ask from the user community. Druid does support some related features today:

  • Lookups enable certain star-schema use cases, and are similar to joins in that regard. But they are limited in that they can only support LEFT joins of fact-to-dimension (not right/inner/outer) and that they cannot support more than one value per key.
  • Druid SQL supports semijoins through both the WHERE x IN (SELECT ...) syntax and traditional JOIN syntax. But it only supports one subquery per SQL query, and does not support negation ("NOT IN").

Real JOIN support would be more powerful, enabling even more star-schema and subquery-based use cases:

  • More flexibility for join types (LEFT/RIGHT/INNER/FULL OUTER).
  • More flexibility for join conditions.
  • Joining against more than one subquery at once.
  • Easier integration with third-party applications that generate JOIN queries for star schemas.
  • Open the door for joining two datasources together.

Proposed changes

The idea is to add a "join" datasource, expose it through SQL, and add machinery to brokers and data servers to allow hash-based equijoins of zero or one "table" datasource and any number of "lookup", "inline", or "query" datasources. As a side effect, these proposed changes will add the ability to query lookups directly, using a datasource of type "lookup".

I think this subset of functionality is a good start, because it adds meaningful new capabilities and helps unify existing ones. Things that are not in scope for this initial proposal include joining two "table" datasources to each other, non-equijoins, and non-hash-based joins. I think these should all be implemented at some point, as future work.

There are four main areas to this proposal:

  1. SQL: add rules to plan joins, and add a "lookup" schema.
  2. Native queries: add "join", "lookup", and "inline" datasources.
  3. Broker: must be able to translate anything that data servers can’t do into something they can do (e.g. evaluate a subquery and replace it with an "inline" datasource). Must also be able to fully evaluate queries that only use local datasources like "lookup" and "inline".
  4. Data servers (historical, etc): must be able to evaluate joins of one table onto any number of lookup or inline datasources, or queries on top of those.

The next few sections expand on these areas.


SQL

  1. Add lookups to SQL in a "lookup" schema.
  2. Allow any equi-joins that involve zero or one normal datasource and any number of lookups or subqueries, through a new join-planning system.
  3. Remove semi-join specific planning code, since that should now be handled by the generic join planning code in (2) above.

An example SQL query might be:

SELECT products.value AS product_name, SUM(sales.revenue)
FROM sales
LEFT JOIN lookup.products ON sales.product_id = products.key
GROUP BY products.value

This query takes advantage of the fact that unqualified tables like sales are assumed to be normal datasources. Lookups are referenced as part of the lookup schema, like lookup.products.

Multiple joins can be specified per SQL query. We will need to guide Calcite’s cost-based optimizer towards reordering them optimally. This may require more statistics than we currently possess, so gathering these statistics and improving join plans may end up being an area of future work.

Native queries

  1. Add a "join" datasource type that represents a join of two other datasources. It’d include a left datasource, right datasource, condition (which I am thinking will be restricted to equality at first), and type (left, right, full outer, inner).
  2. Add "lookup" and "inline" datasources to provide things to join onto. These can be specified as inputs to a join, or they can be directly queried (a new capability for lookups!).
  3. Allow joining onto "query" datasources as well. To make this work, we’ll need to add a sense of a ‘standard translation’ of results from certain query types into flat schemas that we can offer column selectors on top of. There may be more than one way to do this, since certain query types (notably, topN and scan) return nested results in some cases. We could do this by adding a new QueryToolChest method.

The rows coming out of a join datasource would be the result of the join. Any query type could use a join datasource without being aware of the fact that joins exist.

Join datasources can be nested within each other. Unlike SQL, native query evaluation will not reorder joins. It will execute them in the order that the join tree is provided.

We will probably not allow joining onto "table" datasources, except as the extreme left-hand side of a join.

In order to protect against column name ambiguity (what if the left and right side have a column of the same name?), I propose adding a "rightPrefix" parameter to the join datasource. This would be prefixed to every column name coming from the right side, and should be chosen by the caller to be something that won’t conflict with any left-side columns. Druid SQL will choose one automatically when planning a SQL join.
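To make the column-name disambiguation concrete, here is a small sketch of a hash-based LEFT equijoin that applies a right-side prefix. This is illustrative Python, not Druid's actual (Java) implementation, and all names in it are hypothetical:

```python
# Hypothetical sketch of a hash-based LEFT equijoin with a right-side prefix.
# Assumes a non-empty right side; real Druid code would live in Java.

def hash_left_join(left_rows, right_rows, left_col, right_col, right_prefix):
    # Build phase: hash the right side on the join key. A key may map to
    # multiple rows (unlike lookups, which hold one value per key).
    table = {}
    for row in right_rows:
        table.setdefault(row[right_col], []).append(row)

    # Probe phase: stream the left side, emitting one output row per match,
    # or a single row with nulls when there is no match (LEFT join).
    for lrow in left_rows:
        matches = table.get(lrow[left_col], [None])
        for rrow in matches:
            out = dict(lrow)
            if rrow is None:
                out.update({right_prefix + c: None for c in right_rows[0]})
            else:
                out.update({right_prefix + c: v for c, v in rrow.items()})
            yield out

sales = [
    {"product_id": 1, "revenue": 10},
    {"product_id": 2, "revenue": 5},
    {"product_id": 3, "revenue": 7},   # no matching product -> nulls
]
products = [
    {"key": 1, "value": "shirt"},
    {"key": 2, "value": "hat"},
]

joined = list(hash_left_join(sales, products, "product_id", "key", "p."))
```

With the prefix "p.", every right-side column appears as "p.key" / "p.value", so even a right-side column named "revenue" could never collide with a left-side column.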

The join datasource used by the earlier SQL query, above, would be:

"dataSource": {
  "type": "join",
  "left": "sales",
  "right": {
    "type": "lookup",
    "lookupName": "products"
  },
  "rightPrefix": "foobar",
  "joinType": "left",
  "condition": {
    "leftColumn": "product_id",
    "rightColumn": "key"
  }
}

Broker behavior

The following technique should be implemented by CachingClusteredClient. The idea is to either evaluate the query locally, or else get it into a format that data servers can handle (see next section).

  1. Analyze the datasource to find all tables and subqueries on tables.
  2. Validate that there is at most one table datasource that is not wrapped in a subquery, and if there is one, it is in the leftmost leaf position. If this validation fails, return an error.
  3. Evaluate all subqueries on tables ("query" datasource with "table" child), and transform the master datasource by replacing those subquery datasources with "inline" datasources. Do not do this on "table" datasources that are not wrapped in a query. Keep a running counter of how many rows have been materialized in this way, and return an error if it grows too large.
  4. If there is a table in the leftmost leaf position, send the query down to data servers without further modifications beyond those done in step (3). Use the single table datasource to determine which data servers and which segments to query.
  5. If there are no table datasources remaining after the transformations in (3), evaluate the query locally on the Broker: create a virtual Segment on top of the local data, and run the appropriate query engine on that Segment.
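The analysis and validation in steps (1) and (2) can be sketched as follows. This is illustrative Python using plain dicts in place of Druid's DataSource classes; the function names are hypothetical:

```python
# Hypothetical sketch of the Broker-side analysis and validation described
# above; dicts stand in for Druid's DataSource classes.

def leaf_datasources(ds, path=()):
    """Depth-first pre-order enumeration of leaf datasources in a join tree."""
    if ds["type"] == "join":
        yield from leaf_datasources(ds["left"], path + ("left",))
        yield from leaf_datasources(ds["right"], path + ("right",))
    else:
        # "query" datasources count as leaves here: the Broker evaluates them
        # separately and swaps in "inline" datasources (step 3).
        yield path, ds

def validate_for_data_servers(ds):
    """Return the single base "table" datasource, or raise if the query cannot
    be handled (more than one bare table, or a table not in the leftmost leaf
    position). Returns None if there are no tables (Broker evaluates locally)."""
    leaves = list(leaf_datasources(ds))
    tables = [(path, leaf) for path, leaf in leaves if leaf["type"] == "table"]
    if len(tables) > 1:
        raise ValueError("at most one bare table datasource is allowed")
    if tables:
        path, table = tables[0]
        if any(step != "left" for step in path):
            raise ValueError("the table must be the leftmost leaf of the join")
        return table
    return None

ok = {
    "type": "join",
    "left": {"type": "table", "name": "sales"},
    "right": {"type": "lookup", "lookupName": "products"},
}
base = validate_for_data_servers(ok)  # the "sales" table
```

A datasource made only of lookups and inline data returns None here, which corresponds to the local-evaluation path in step (5).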

Data server behavior

The following technique should be implemented by ServerManager (historical) and SinkQuerySegmentWalker (indexer) when a join datasource is encountered.

  1. Analyze the join datasource to find the "primary" datasource: the leftmost leaf datasource. This must be a table datasource, and will be the base for the join. (Note that we are assuming here that the Broker would only send down the query if there were exactly one table.)
    Enumerate all other leaf datasources in depth-first pre-order. We will apply join clauses in this order.
  2. Create a hash table for each non-primary leaf datasource, unless it already exists (e.g. no need to create a hash table for a lookup, but we might create one for a query on top of a lookup). If this is impossible for any datasource, like if one of them is a regular "table" datasource, return an error.
  3. Create a virtual Segment for each Segment of the primary datasource that wraps up all the joining work, and returns a StorageAdapter + ColumnSelectorFactory representing the results of the join. Pass this to query engines as normal.
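Steps (1) and (2) above can be sketched like this; again illustrative Python with hypothetical names, using dicts in place of Druid's datasource classes:

```python
# Hypothetical sketch of steps (1) and (2): find the primary (leftmost leaf)
# datasource, then prepare one hash table per remaining leaf in depth-first
# pre-order, reusing existing hash structures (lookups) where possible.

def plan_join_clauses(ds):
    leaves = []

    def walk(node):
        if node["type"] == "join":
            walk(node["left"])
            walk(node["right"])
        else:
            leaves.append(node)

    walk(ds)
    primary, others = leaves[0], leaves[1:]
    if primary["type"] != "table":
        raise ValueError("primary (leftmost leaf) datasource must be a table")

    clauses = []
    for leaf in others:
        if leaf["type"] == "lookup":
            # A lookup is already a hash table keyed on "key": reuse it.
            clauses.append(("reuse", leaf["lookupName"]))
        elif leaf["type"] == "inline":
            # Materialize the inline rows into a fresh hash table.
            clauses.append(("build", leaf["rows"]))
        else:
            # e.g. another "table" datasource: data servers must error out.
            raise ValueError("cannot build a hash table for " + leaf["type"])
    return primary, clauses

nested = {
    "type": "join",
    "left": {
        "type": "join",
        "left": {"type": "table", "name": "sales"},
        "right": {"type": "lookup", "lookupName": "products"},
    },
    "right": {"type": "inline", "rows": [("US", "United States")]},
}
primary, clauses = plan_join_clauses(nested)
```

For the nested example, the pre-order walk yields the "sales" table as the primary, then applies the "products" lookup clause before the inline-rows clause, matching the fixed evaluation order described earlier.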


Rationale

Some other alternatives considered were:

  • A "join" query type. @jihoonson did work towards this in #4118, but the patch was not merged. Reconsidering this design now, I think a "join" datasource would work better than a query type, because it composes more nicely. We can expose the results of a join datasource to query engines the same way as we expose a table (via Segment + StorageAdapter). It is naturally lazy (no need to compute columns the query won’t use) and will work with all existing query engines without a need to modify them.
  • Joining onto broadcast datasources instead of lookups. I think we should do both over time, but chose to start with lookups because their usage is more prevalent in Druid today. I am hoping to allow people to use this popular feature through a more familiar (and powerful!) SQL syntax.

Operational impact

None expected.

Test plan

Add lots of unit tests in the druid-sql and druid-processing modules.

Future work

Out of scope for this proposal, but would be nice in the future:

  • Adding support for non-equijoins.
  • Adding support for joining onto broadcast datasources, not just lookups.
  • Adding support for joining two distributed table datasources. (This one gets complex quickly and there are a lot of sub-cases we want to consider.)
  • Adding a mechanism for caching the results of a subquery as a prebuilt hash-joinable table. Maybe manually or maybe automatically.

@gianm commented Oct 24, 2019

I am planning to do work towards implementing this proposal over the next few months. Feedback is very welcome.


@vogievetsky commented Oct 24, 2019

I know everyone will be very excited about JOINs but the proposal to have lookups queryable via SQL will enable amazing stuff in the web console!


@kstrempel commented Oct 25, 2019

Yes, that is an exciting direction, but wouldn't it be more efficient to develop a Druid connector for Presto?


@asdf2014 commented Oct 25, 2019

Hi @kstrempel. IMO, I don't think this is a problem on the same level. Presto does provide richer SQL queries, but it does not store data itself, so there would be a process to ingest data from Druid, which will inevitably have some performance loss. And if Presto pushes most of the calculations down to Druid, then Druid still needs to have more SQL query capabilities, which is what this proposal is doing. I think this may be one of the reasons why Spark does not directly borrow the SQL features of Impala or Presto, but implements Spark SQL on its own.


@gianm commented Oct 25, 2019

Yes, that is an exciting direction, but wouldn't it be more efficient to develop a Druid connector for Presto?

I think a Druid connector for Presto would help get Druid data into Presto and help support 100% of SQL use cases, so it's interesting. But as @asdf2014 mentioned, doing what we can in Druid directly should mean better performance.


@mbathula commented Oct 25, 2019

@gianm that's an exciting announcement and looking forward to it.


@sanjaythkr7 commented Oct 28, 2019

Great to know this news! 👍 can't wait to see this in releases!

@gianm mentioned this issue Oct 31, 2019

@palanieppan-m commented Nov 8, 2019

@kstrempel we looked into join support through Presto a couple of years ago.

It's possible, but the Presto connector interface did not have an API to push down predicates. So Druid would have to forward raw, non-aggregated data to Presto, and Presto would do all the query processing, which compromises the performance benefits of Druid.

Great to see this initiative.
