
Pull Query: Use logical and physical query planners for pull queries #3541

Closed
big-andy-coates opened this issue Oct 11, 2019 · 5 comments

big-andy-coates commented Oct 11, 2019

At the moment the pull query code is 'bolted on' to KSQL. As we know, this was done to meet deadlines and to avoid colliding with others who were restructuring the core engine at the time. However, before moving much further forward with pull queries, we should look to integrate them with KSQL's query analysis and execution model.

Assumptions

There is nothing inherently different about how a pull query vs a push query should be analyzed, planned or executed. The current limitation of pull queries only working with aggregate tables is just that: a current limitation.

The main difference between pull and push queries is that the former wants to calculate a single result and return it to the user, akin to a query in a traditional RDBMS, whereas the latter is about subscribing to the changes to the result over time. Both of these query types might be implemented by reading all the source data in Kafka, or by making use of data already materialized into state stores in KSQL.

At the moment, push queries can't make use of existing state stores and pull queries can't make use of data in Kafka, but that won't continue to be the case as KSQL evolves.

Current design

Push queries

Push queries, including those used to create materialized views in C*AS statements, are handled by the KsqlEngine and go through these rough stages:

  1. Query analysis: validates the query and captures pertinent information from the query and its data sources.
  2. Logical Planning: A logical plan is built, made up of nodes such as DataSourceNode, RepartitionNode, ProjectNode, etc.
  3. Physical Planning: The logical plan is broken down into the physical steps that need to happen to the data, e.g. a RepartitionNode is converted to a step to select certain columns as the key columns; an AggregateNode may have steps to select a new key, group by that key and aggregate other values, and a final projection to pull out the results the user wants; etc.
  4. Implementation: The physical plan is converted into an implementation, i.e. it's converted into a Kafka Streams topology. (These stages are sketched in code after this list.)
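For illustration only, here is a minimal sketch of how these stages might chain together. All interface and class names (Analysis, LogicalPlan, PhysicalPlan, PushQueryPipeline) are hypothetical stand-ins, not KSQL's actual classes:

```java
// Sketch only: these interfaces and classes are illustrative stand-ins, not
// KSQL's actual types. Assumes kafka-streams is on the classpath for Topology.
import org.apache.kafka.streams.Topology;

interface Analysis { }        // stage 1 output: the validated query + its data sources
interface LogicalPlan { }     // stage 2 output: tree of DataSourceNode, ProjectNode, ...
interface PhysicalPlan {      // stage 3 output: the concrete steps over the data
  Topology buildTopology();   // stage 4: realised as a Kafka Streams topology
}

final class PushQueryPipeline {

  Topology execute(final String sql) {
    final Analysis analysis = analyze(sql);                 // 1. query analysis
    final LogicalPlan logical = planLogical(analysis);      // 2. logical planning
    final PhysicalPlan physical = planPhysical(logical);    // 3. physical planning
    return physical.buildTopology();                        // 4. implementation
  }

  private Analysis analyze(final String sql) {
    throw new UnsupportedOperationException("sketch only");
  }

  private LogicalPlan planLogical(final Analysis analysis) {
    throw new UnsupportedOperationException("sketch only");
  }

  private PhysicalPlan planPhysical(final LogicalPlan logical) {
    throw new UnsupportedOperationException("sketch only");
  }
}
```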

While there may be ways to improve these stages, such changes are outside the scope of this issue.

Pull queries

Pull queries are NOT handled by the KsqlEngine; they are handled by the hacked-together PullQueryExecutor class, which only supports key-based lookups at the moment. It goes through these rough stages:

  1. Query analysis: uses the same code as push queries, with a different validator to restrict what is possible for pull queries. (Yay!)
  2. Locate materialization: the executor inspects the engine state to work out if the table in question is materialized in a KS state-store and fails if it isn't.
  3. Locates which node the data is on and proxies the call to that node.
  4. Key lookup on state-store. Note: the data in the state store is in an intermediate state. It is not equivalent to the associated sink topic. Specifically, it:
    1. contains UDAF intermediate types, e.g. an AVG UDAF might store a STRUCT<SUM BIGINT, COUNT BIGINT> as its intermediate type, which it then uses to map to a final BIGINT result.
    2. contains records that may not pass the HAVING clause of the associated query, e.g. say the query has ... HAVING COUNT() > 10;. The state store will include all rows, regardless of the value of COUNT(). This is because we need to store the count somewhere to know when it's above/below the threshold, and it's the state store that does this.
    3. uses an internal schema, i.e. columns have names such as KSQL_COL_0
  5. Applies transforms to any result to account for the discrepancies between the state store and the table it materializes, as given above. Specifically, it:
    1. applies the UDAF map method to get the result type of any UDAFs.
    2. applies a filter to handle any HAVING clause
    3. applies a select to convert from the internal schema to the schema of the table, i.e. convert column names like KSQL_COL_0 to COUNT or whatever.
  6. Applies a transformation to handle the projection in the query, i.e. to select only the columns the user is interested in. (The transform chain of stages 5 and 6 is sketched in code after this list.)
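A rough sketch of that post-lookup transform chain (stages 5 and 6); the class and field names are hypothetical, not the actual KSQL code:

```java
// Sketch of the post-lookup transform chain; all names are hypothetical
// stand-ins for the real KSQL transforms.
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

final class PullQueryTransforms<Row> {

  private final Function<Row, Row> aggregateMapper;  // 5.1: UDAF intermediate -> final result type
  private final Predicate<Row> havingFilter;         // 5.2: re-apply the HAVING clause
  private final Function<Row, Row> schemaSelect;     // 5.3: KSQL_COL_0 etc. -> table column names
  private final Function<Row, Row> projection;       // 6:   the query's own SELECT projection

  PullQueryTransforms(
      final Function<Row, Row> aggregateMapper,
      final Predicate<Row> havingFilter,
      final Function<Row, Row> schemaSelect,
      final Function<Row, Row> projection
  ) {
    this.aggregateMapper = aggregateMapper;
    this.havingFilter = havingFilter;
    this.schemaSelect = schemaSelect;
    this.projection = projection;
  }

  // Converts a raw state-store row into a row of the user's result, if it passes HAVING.
  Optional<Row> apply(final Row stateStoreRow) {
    final Row finalised = aggregateMapper.apply(stateStoreRow);
    if (!havingFilter.test(finalised)) {
      return Optional.empty();
    }
    return Optional.of(projection.apply(schemaSelect.apply(finalised)));
  }
}
```

This simply mirrors the order listed above: map UDAF intermediates, then filter, then rename, then project.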

Shared code

The two query types share code only for the query analysis stage and for the specific transformations applied, i.e. the HAVING filter, the SELECT mapper and the conversion to UDAF result types.

Limitations

The limitations of the current pull query implementation are fairly well known, but for reference:

  1. Only tables built by a CTAS statement that includes a GROUP BY clause are supported.
  2. Only simple queries are supported: JOIN, PARTITION BY, GROUP BY and HAVING are not.
  3. WHERE clause limited to ROWKEY and WINDOWSTART.
  4. LIMIT is not supported.

Proposal

The code paths for the two query types should be unified so that they follow a common set of stages and common code. Pull queries should be executed by the KsqlEngine and not some custom class.

Data source stores

We propose the engine needs to track where a data source is stored, i.e. where a copy of a table or stream, (in current terminology), is stored.

At the moment, the metastore tracks which topic a data source is stored in. This is one store type. However, another store type is the state-store(s) KSQL may have created to materialize the data in the topic. The engine should track these stores too.

The store information the engine tracks will, obviously, need to differentiate between the different types of store. We propose the following initial types:

  1. A kafka topic
  2. A sink kv state-store: this is a state-store used today for building aggregate tables and for serving pull queries. It is the state store just before the sink topic.
  3. A source kv state-store: this is where a data source's topic is materialized into a state-store for use as a source in another query.

With the current design there will be one 'kafka topic' per data source, zero or one 'sink state-store', (only CTAS with GROUP BY clauses), and any number of 'source state-stores', as each time a data source created by a CTAS is used in a new query it is materialized into a new state-store, (yes, this isn't great!).

Initially, we propose only tracking state-stores created by persistent queries.

Design point: The engine will track the set of stores where each known data source is stored. Each store has a type: Kafka topic, source state-store, or sink state-store.
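For illustration, a minimal sketch of what such tracking might look like; DataSourceStoreRegistry, DataSourceStore and StoreType are hypothetical names, not existing KSQL classes:

```java
// Sketch of per-data-source store tracking; all names here are hypothetical,
// not existing KSQL classes.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum StoreType {
  KAFKA_TOPIC,            // 1. the data source's backing topic
  SINK_KV_STATE_STORE,    // 2. the store just before the sink topic (CTAS + GROUP BY)
  SOURCE_KV_STATE_STORE   // 3. a store created when the source is materialized for another query
}

final class DataSourceStore {
  final StoreType type;
  final String name;      // e.g. topic name or state store name

  DataSourceStore(final StoreType type, final String name) {
    this.type = type;
    this.name = name;
  }
}

final class DataSourceStoreRegistry {

  // data source name -> all known stores holding a copy of that source's data
  private final Map<String, Set<DataSourceStore>> storesBySource = new HashMap<>();

  void register(final String sourceName, final DataSourceStore store) {
    storesBySource.computeIfAbsent(sourceName, k -> new HashSet<>()).add(store);
  }

  Set<DataSourceStore> storesFor(final String sourceName) {
    return storesBySource.getOrDefault(sourceName, new HashSet<>());
  }
}
```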

Store feature sets

We propose that each store type will define the set of features/functionality it supports. Initially, this set of supported features only needs to be able to differentiate between stores that:

  • support subscribing to changes, i.e. as is needed for push queries
  • support key-based lookup, i.e. as is needed for pull queries

Plus maybe some other things that are needed to reject as-yet-unsupported pull query types.

In time, this set of features can be extended, with the potential for this to grow to drive cost-based optimization in the future.

Design point: stores, or more likely store types, will define the set of features/functionality they support.
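A sketch of how a store type might declare its feature set; again, all names are hypothetical, and the enum simply extends the StoreType idea from the previous sketch:

```java
// Sketch: each store type declares the features it supports; names are hypothetical.
import java.util.EnumSet;
import java.util.Set;

enum StoreFeature {
  SUBSCRIBE_TO_CHANGES,   // needed by push queries
  KEY_LOOKUP              // needed by today's pull queries
}

enum StoreType {
  KAFKA_TOPIC(EnumSet.of(StoreFeature.SUBSCRIBE_TO_CHANGES)),
  SINK_KV_STATE_STORE(EnumSet.of(StoreFeature.KEY_LOOKUP)),
  SOURCE_KV_STATE_STORE(EnumSet.of(StoreFeature.KEY_LOOKUP));

  private final Set<StoreFeature> supported;

  StoreType(final Set<StoreFeature> supported) {
    this.supported = supported;
  }

  boolean supportsAll(final Set<StoreFeature> required) {
    return supported.containsAll(required);
  }
}
```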

Query planning

We propose that pull and push queries should go through the same logical and physical planning stages, though obviously with different validation rules to allow each to support different features.

Two queries that differ only in that one is push and one is pull, e.g. SELECT * FROM X; vs SELECT * FROM X EMIT CHANGES;, should result in the same logical plan.

Design point: Push and pull queries will build logical query plans.

We propose introducing a query optimizer that will choose which physical plan to build from the logical plan. Initially, this will be very dumb, as there is no overlap between the push and pull implementations.

The optimizer will look at what stores are available for the source(s) used by the query and the features these stores support. It will exclude any stores that don't support the features the query needs. If this excludes all stores, then the query is rejected as not being supported yet.

Design point: the optimizer will choose which physical plan to build, based on the features the query requires and the features the source's stores support.
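Continuing with the same hypothetical types (repeated here in minimal form so the snippet stands alone), the store-filtering step might look something like this:

```java
// Sketch of the (initially dumb) store selection; all names are hypothetical.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

enum StoreFeature { SUBSCRIBE_TO_CHANGES, KEY_LOOKUP }

final class CandidateStore {
  final String name;
  final Set<StoreFeature> features;

  CandidateStore(final String name, final Set<StoreFeature> features) {
    this.name = name;
    this.features = features;
  }
}

final class PhysicalPlanChooser {

  // Keeps only the stores that support every feature the query needs;
  // if none remain, the query is rejected as not yet supported.
  List<CandidateStore> candidateStores(
      final Set<CandidateStore> availableStores,
      final Set<StoreFeature> requiredFeatures
  ) {
    final List<CandidateStore> candidates = availableStores.stream()
        .filter(store -> store.features.containsAll(requiredFeatures))
        .collect(Collectors.toList());

    if (candidates.isEmpty()) {
      throw new UnsupportedOperationException(
          "query requires features no known store supports: " + requiredFeatures);
    }
    return candidates;
  }
}
```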

(NB: the 'optimizer' is not yet doing any optimization, as there is no choice to be made between different stores. However, in time it will/may. We can, for now, just call it the physical planner... naming, hey).

Initially, any pull query where the source is not materialized in a sink state-store will be rejected, as it is today, and push queries will only make use of Kafka topics, as they do today. In the future, we may be able to be smarter about which is the most correct/efficient materialization to use.

The initial choice of plans, which maintains current functionality, is as follows (a sketch of these two plan shapes follows the list):

  • One that uses a Kafka topic as the source(s), as is built today and which will result in a KS topology.
  • One that uses a sink state-store as the source(s), and which will include the steps to handle both the transformations needed to convert the state store data into the table data, (i.e. the UDAF map call, the HAVING filter, etc) and any steps required to handle the parts of the query itself. Initially this will just be the WHERE and SELECT clauses that are currently supported.
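A sketch of how those two initial plan shapes might sit behind a single interface, with the trivial 'choice' between them; all names are hypothetical:

```java
// Sketch of the two initial plan shapes and the trivial choice between them.
// All names are hypothetical; the "topology" plan mirrors today's push path and
// the "state store lookup" plan mirrors today's pull path.
interface PlannedQuery { }

final class KafkaStreamsTopologyPlan implements PlannedQuery { }  // source: Kafka topic(s)
final class StateStoreLookupPlan implements PlannedQuery { }      // source: sink KV state store

enum ChosenStoreType { KAFKA_TOPIC, SINK_KV_STATE_STORE }

final class InitialPlanFactory {

  PlannedQuery planFor(final ChosenStoreType chosenStore) {
    switch (chosenStore) {
      case KAFKA_TOPIC:
        return new KafkaStreamsTopologyPlan();
      case SINK_KV_STATE_STORE:
        return new StateStoreLookupPlan();
      default:
        throw new UnsupportedOperationException("no plan for store type: " + chosenStore);
    }
  }
}
```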

Scope for future improvements

In no particular order, this design should enable future functionality like the following:

1. Pull queries against non-GROUP BY CTAS tables

The current design only allows pull-query access to sink state-stores, i.e. those created by a GROUP BY clause in a CTAS statement. If we also track source state-stores, i.e. those created each time a table is used as a source in a downstream query, then we could support pull queries against tables imported with CT statements or created by CTAS statements that don't include a GROUP BY.

The difficulty here is that there may be multiple source state stores, running with different amounts of lag, so choosing which state store to use is not trivial. Or, if the table has not been used as a source in another query, there may be no source state stores.

One possible solution would be to allow users to choose to materialize such tables into state stores through some language construct, (meh). Another would be for KSQL to capture, (and persist), metrics around tables being accessed and use this to decide to materialize the table into a state store outside of any user supplied statement. If we fix the duplicate source state store issue (See point 5 below), then we could simply always materialize a table.

2. Quicker pull query execution

Pull queries will be executed by first building a logical and physical query plan. It would not be much work to cache a pull query's plan and/or implementation, so that repeated pull query calls can avoid these steps, (i.e. it will help to implement #3547).
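A sketch of such a cache, keyed on the normalized query text; the names are hypothetical, and a real implementation would also need invalidation when the underlying table or its materialization changes:

```java
// Sketch of a pull-query plan cache; names are hypothetical.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class PullQueryPlanCache<P> {

  private final Map<String, P> plansByQuery = new ConcurrentHashMap<>();

  // Returns the cached plan for this query text, building and caching it on first use.
  P planFor(final String normalizedSql, final Function<String, P> planner) {
    return plansByQuery.computeIfAbsent(normalizedSql, planner);
  }

  // Drops all cached plans, e.g. when the backing table is dropped or re-created.
  void invalidateAll() {
    plansByQuery.clear();
  }
}
```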

3. Extending KSQL to support other store types

We can extend the set of store types in the future to include other types of state stores, or stores outside of KSQL, and use a (cost-based) optimizer to choose the best match for a specific query.

4. Arbitrary pull query support

#3546

We can extend this model to support pull queries that use KS topologies where we don't yet have the functionality to support a specific pull query against state stores.

5. Remove duplication of source state stores

In the longer term, we may also be able to use state stores as a source of data for push queries using KS topologies, though this is more challenging as it bypasses KS's best efforts at synchronising event time between source topics.

This would remove the duplicate state stores created each time a new push query starts that uses the same source table.

6. Feedback on unimplemented features

The new optimizer knows when a user executes a (pull) query that cannot yet be handled. We could choose to somehow report this information back, providing more data to prioritize work.

Conclusion

If we implement this proposal, followed by supporting arbitrary pull queries via KS topologies, we remove most of the limitations mentioned above and repeated below. Though such queries will functionally work, their latency will be high... very high! However, this will open up pull queries for use in data discovery / analytics type jobs. People are happy to wait 30 minutes while their Spark SQL job runs and may be happy to wait for their KSQL query to run for a similar amount of time. Not every use-case is latency sensitive.

We can then get to the job of increasing the set of features we support for pull queries, either by adding more features to, or around, the existing kv stores, or using other store types. This will allow us to bring down the cost of running certain types of queries.

Let's recap the limitations of current pull queries and see how this proposal can help:

  1. Only tables built by a CTAS statement that includes a GROUP BY clause are supported.
    • Removed initially by supporting arbitrary pull queries using KS topologies, (future work no. 4), and later by supporting querying source state stores, (future work no. 1).
  2. Only simple queries are supported: JOIN, PARTITION BY, GROUP BY and HAVING are not.
    • Removed initially by supporting arbitrary pull queries using KS topologies, (future work no. 4), and later by extending the feature set our state stores support, or by dropping in new store types, (future work no. 3).
  3. WHERE clause limited to ROWKEY and WINDOWSTART.
    • Removed initially by supporting arbitrary pull queries using KS topologies, (future work no. 4), and later by extending the feature set our state stores support, or by dropping in new store types, (future work no. 3).
  4. LIMIT is not supported.
    • It should be easy to extend the proposed design to implement LIMIT in state-store based plans. This would not need to be a store type feature.

agavra commented Dec 11, 2019

This is great! Thanks @big-andy-coates - I might be missing something fundamental, but does Kafka Streams support things other than Kafka Topics as sources in topologies? How would we be able to model a RocksDB state store (for example), or more accurately a set of rows from a RocksDB state store, as the source for a KS topology?

@big-andy-coates

> I might be missing something fundamental, but does Kafka Streams support things other than Kafka Topics as sources in topologies? How would we be able to model a RocksDB state store (for example), or more accurately a set of rows from a RocksDB state store, as the source for a KS topology?

At the moment we create new state-stores each time a table is used as a source. Going forward, this is probably not going to work. It just doesn't scale! How we fix this is up for (much) debate. KS certainly doesn't support using a state store as a source at the moment, and it may never do. I'm not even sure it could, (as per the comment in that section). But we'll need something at some point. It was more of an 'in the longer term...' style comment.

@vinothchandar

Thanks for the write up @big-andy-coates . Will spend more time understanding the code first :)

@guozhangwang

Great write-up @big-andy-coates !

> At the moment we create new state-stores each time a table is used as a source. Going forward, this is probably not going to work. It just doesn't scale! How we fix this is up for (much) debate. KS certainly doesn't support using a state store as a source at the moment, and it may never do. I'm not even sure it could, (as per the comment in that section). But we'll need something at some point. It was more of an 'in the longer term...' style comment.

I have also been thinking about this. Following this design, pull / push queries would share the same query planners while perhaps using different runtimes: a pull query maps to an issued IQ, a heavy-weight push query to a KS app, and a light-weight push query to a KS-Light implementation. But since a push query generates a streaming result (e.g. a change stream out of a table), how would we achieve that from the state stores while guaranteeing no missing data? Even if we just periodically issue IQ queries and then emit a new result when we find it has changed, we may still miss a few updates that get applied very fast, right?


agavra commented Feb 3, 2021

This has been mostly completed as part of #6375 - closing this ticket, though I'm sure there are lots of gold nuggets in here :)

@agavra agavra closed this as completed Feb 3, 2021