RFC: distributed SQL #6067
Conversation
RaduBerinde changed the title from "RFC: distributed SQL RFC" to "RFC: distributed SQL" on Apr 14, 2016
petermattis reviewed on Apr 15, 2016
docs/RFCS/distributed_sql.md Outdated
petermattis (Contributor) commented on Apr 15, 2016
LGTM
This is a well-thought-out RFC. I like the consideration of the alternatives and I'm in agreement with the proposed approach. There is a lot of work to do, yet I see an incremental path forward from our current system.
Two items to keep in mind during implementation:
- Building good test infrastructure. The logic test framework has been incredibly useful during the development of the current SQL execution engine. Perhaps it could be enhanced to allow the specification of additional attributes of the system, or we could create a new test framework (dist_test?) that allows both easy specification of system attributes (number of nodes, where ranges are stored, etc) and examination of the resulting query plans.
- Visualization/tracing infrastructure: EXPLAIN (TRACE), output of logical and physical query plans, and execution timings and stats.
RaduBerinde (Member) commented on Apr 15, 2016
Thank you Peter! Very good points.
docs/RFCS/distributed_sql.md, line 1179 [r1] (raw file):
Agreed.
bdarnell (Member) commented on Apr 16, 2016
LGTM
docs/RFCS/distributed_sql.md, line 83 [r1] (raw file):
Ingress == gateway?
docs/RFCS/distributed_sql.md, line 148 [r1] (raw file):
See also Hive, which is a closer match than Sawzall in that its input language is close to SQL and it is not as closely tied to the MapReduce execution model.
docs/RFCS/distributed_sql.md, line 305 [r1] (raw file):
What does the notation [Cid,]ValueSum[,Cid] mean? The code passes ValueSum straight through, so it looks to me like this function actually preserves ordering on ValueSum. But if the input is ordered by ValueSum, then the output is ordered by -SortVal (which is also a better match for the next node's input requirement).
That's a fairly sophisticated inference, to look inside the 1-x expression and decide how it transforms the input ordering. It also doesn't seem to matter since this plan will insert a sort node anyway. This looks like the only non-identity ordering characterization in the document. I like the concept, but I'd like to see it fleshed out with more examples. In practice does it reduce to a boolean (order-preserving or not)? Would we ever need to have multiple ordering characterizations for a node? Would it be better for a node to list the orderings that it invalidates instead of the ones that it preserves?
docs/RFCS/distributed_sql.md, line 362 [r1] (raw file):
This map could require a lot of memory; it might be worthwhile sometimes to introduce a sort phase to allow the more efficient aggregator.
The MapReduce concept of a "combiner" could also be helpful here: a combiner is similar to a reducer but is colocated with the mapper nodes and performs a "partial reduction". The most efficient way to compute sums in MapReduce is to use a SumReducer as your combiner to shrink the map output; the shuffle phase then puts everything in order, and the reduce phase is a second SumReducer that adds up all the partial sums from the combiners.
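To make the two-stage idea concrete, here is a minimal, self-contained sketch (the names and channel-based plumbing are purely illustrative, not the RFC's processor API): the same summing stage runs once per node as a combiner and once more to add up the per-node partial sums.

```go
package main

import (
	"fmt"
	"sync"
)

// sumStage consumes a stream of values and emits a single (partial) sum.
// The same logic serves both as the local combiner and as the final reducer.
func sumStage(in <-chan int64, out chan<- int64) {
	var sum int64
	for v := range in {
		sum += v
	}
	out <- sum
	close(out)
}

func main() {
	// Two "nodes" each combine their local rows into one partial sum...
	partials := make(chan int64, 2)
	var wg sync.WaitGroup
	for _, local := range [][]int64{{1, 2, 3}, {10, 20}} {
		wg.Add(1)
		go func(vals []int64) {
			defer wg.Done()
			in := make(chan int64, len(vals))
			for _, v := range vals {
				in <- v
			}
			close(in)
			out := make(chan int64, 1)
			sumStage(in, out)
			partials <- <-out
		}(local)
	}
	wg.Wait()
	close(partials)

	// ...and the final stage adds up the per-node partial sums.
	final := make(chan int64, 1)
	sumStage(partials, final)
	fmt.Println(<-final) // 36
}
```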
docs/RFCS/distributed_sql.md, line 400 [r1] (raw file):
Can you give an example of an aggregator that would differ based on whether it needed to preserve ordering or not? Is this for things like the sum aggregator where it's a choice between an ordered and unordered map, or are there aggregators where even bigger differences are expected?
docs/RFCS/distributed_sql.md, line 450 [r1] (raw file):
s/agregator/aggregator/
docs/RFCS/distributed_sql.md, line 475 [r1] (raw file):
GitHub isn't rendering a line break between DISTINCT and AGGREGATOR, which doesn't look like what was intended.
docs/RFCS/distributed_sql.md, line 891 [r1] (raw file):
What if the gateway doesn't know the leader for some ranges? The only way we currently have to determine leadership is to send a request to a specific range. I think it would be better to start by making one TableReader per range, with the coalesced reader left as a future optimization.
docs/RFCS/distributed_sql.md, line 900 [r1] (raw file):
s/SchedulePlan/ScheduleFlows/ (or change the header in the other direction)
docs/RFCS/distributed_sql.md, line 922 [r1] (raw file):
Requiring both a ScheduleFlows and StreamMailbox RPC seems inefficient in the common case where the processing graph is simple and shallow. Could we have one RPC that combines the two, essentially including the same request data as ScheduleFlows but streaming data back directly like StreamMailbox?
This would also be a good use case for the HTTP/2 PUSH_PROMISE feature if that is supported by GRPC, but I don't think it is.
Why does ScheduleFlows always come from the gateway? What if the gateway only talked to the inputs for the final node and everything propagated down the graph with each node scheduling flows on its own inputs at the same time that it opens the data stream?
docs/RFCS/distributed_sql.md, line 979 [r1] (raw file):
In general, I think it's fine to skip error recovery for now. The one exception is NotLeaderError: leadership is established lazily and so when we set up the streams, we must be prepared to find out that the node we talked to is not the leader. This is mainly a problem for using ScheduleFlows to operate on all the ranges that a given node is the leader of.
Does a change of leadership on any range invalidate the entire query? That would have been a problem prior to #5845 because leadership was pretty unstable, although with that change leadership changes should be rare enough that we may be able to be intolerant of mid-query lease transfers.
Splits are also a concern: splits do not currently impact any running query, but if we rely on range bounds not changing during a query, we may run into queries that get aborted by ranges splitting underneath them.
docs/RFCS/distributed_sql.md, line 1092 [r1] (raw file):
How feasible is it to make the decisions in this plan (e.g. to use two different strategies for the two JOIN nodes)? What kind of stats need to be collected and made available to the planner?
How does the planner decide to give orders-by-date its own scheduling units while it doesn't break between the two sub-units of count-and-sum?
docs/RFCS/distributed_sql.md, line 1097 [r1] (raw file):
What are the five? (there are six headings between here and the "alternatives" section)
docs/RFCS/distributed_sql.md, line 1100 [r1] (raw file):
What about joins? Is that M3, or are there more things between updates and joins?
docs/RFCS/distributed_sql.md, line 1127 [r1] (raw file):
In MapReduce the number of mappers and reducers was often chosen manually. Should we consider allowing the query author to give us hints about the size of data at various stages?
docs/RFCS/distributed_sql.md, line 1135 [r1] (raw file):
Currently, we do everything on the gateway node. It might be interesting to compare the proposed maximally-distributed approach with the minimally-distributed one in which we try to get everything back on the single gateway node as quickly as possible after an initial distributed reader phase (and filtering).
docs/RFCS/distributed_sql.md, line 1171 [r1] (raw file):
Yeah. One more thing to think about for future work: a read-only transaction that has been in the queue long enough can simply leave the queue and run on a follower; it doesn't need the leader once enough time has elapsed (but maybe this only matters on tiny clusters; in the long run every node will be the leader of some ranges and keep busy that way).
docs/RFCS/distributed_sql.md, line 1213 [r1] (raw file):
Another big downside to this approach is that the KV layer is generally harder to change in a backwards-compatible way, at least with the current model where KV writes are processed downstream of raft.
On the other hand, by introducing new RPCs that are addressed by range ID and that care about range leadership, this proposal does add some new logic at the KV layer.
petermattis (Contributor) commented on Apr 17, 2016
docs/RFCS/distributed_sql.md, line 891 [r1] (raw file):
I believe this is intended to be best effort, based on info in the RangeCache.
RaduBerinde (Member) commented on Apr 18, 2016
TFTR Ben!
Updated with comments from both reviews.
docs/RFCS/distributed_sql.md, line 305 [r1] (raw file):
Ah, I tried to use it as a shorthand but it's not clear or accurate. We would have the following "entries" in the ordering characterization function:
ValueSum -> SortVal or ValueSum
Cid,ValueSum -> Cid,ValueSum or Cid,-SortVal
ValueSum,Cid -> ValueSum,Cid or -SortVal,Cid
I am not sure how to express the idea that it's sorted by both ValueSum and -SortVal at the same time (without one having precedence over the other). @andreimatei we will need to be able to express this in an orderingInfo - perhaps each "precedence" of ordering has multiple columns instead of one.
Yes, we are assuming we are inferring a sort order from the 1-x expression. This isn't strictly necessary and we definitely won't try to implement it for complex expressions, but it might be helpful, so we will try to support simple/common expressions.
I don't think it reduces to a boolean. The way I envision it being implemented is as an actual function (not a list of orderings it invalidates/preserves). Even in the current code we use a function to convert from a scanNode ordering to a selectNode ordering. The ordering information has some non-trivial aspects (e.g. columns that we know are restricted to a single value, and thus can appear anywhere in terms of precedence).
I don't think we would have multiple ordering characterization functions per aggregator (assuming any "configurable" parameters for that aggregator are set).
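As a rough sketch of such a function (the column names come from the example above; the types are hypothetical and not the actual orderingInfo code), an evaluator computing SortVal = 1 - ValueSum could characterize its output orderings like this:

```go
package main

import "fmt"

// colOrder is one column of an ordering, with its direction.
type colOrder struct {
	col       string
	ascending bool
}

// ordering lists columns in decreasing precedence.
type ordering []colOrder

// evaluatorOrdering characterizes the SortVal = 1 - ValueSum evaluator:
// given the ordering of the input stream, it returns the orderings that are
// guaranteed to hold on the output stream.
func evaluatorOrdering(input ordering) []ordering {
	// The evaluator passes its input columns through, so the input ordering
	// is always preserved.
	out := []ordering{input}

	// 1 - x is monotonically decreasing, so an ordering on ValueSum also
	// implies the reversed ordering on the computed SortVal column.
	translated := make(ordering, len(input))
	for i, c := range input {
		if c.col == "ValueSum" {
			translated[i] = colOrder{col: "SortVal", ascending: !c.ascending}
		} else {
			translated[i] = c
		}
	}
	out = append(out, translated)
	return out
}

func main() {
	fmt.Println(evaluatorOrdering(ordering{{"Cid", true}, {"ValueSum", true}}))
}
```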
docs/RFCS/distributed_sql.md, line 362 [r1] (raw file):
Yes, when adding sort aggregators things like this can be taken into account. Unfortunately, a sorter also requires a lot of memory (potentially more, if there are many rows that we would "coalesce" into a single map entry). On the other hand, sorting could be implemented more efficiently using temporary storage.
Right - we later show the summer implemented as two stages of processors - one on the local node that performs the partial reduction, and a final stage on the remote node. In most cases those two stages work the same way internally (and might actually do the exact same thing, as in this case for SUM).
docs/RFCS/distributed_sql.md, line 400 [r1] (raw file):
The example we had in mind was what you said (choosing between an ordered vs unordered map). I'm not convinced choosing an ordered map ever makes sense though (vs unordered + a subsequent sort).
One case though is inner joins - a stream join aggregator could preserve the ordering in one of its input streams. But it could also run in a mode where it accumulates data from both streams until one stream ends, and from that point on it generates output as it goes for each element in the longer stream. This way it automatically detects the smaller stream without having to guess (but it can no longer guarantee any ordering preservation). One could argue whether this is a different "configuration" of the same aggregator or an entirely different aggregator (I don't think the distinction matters much though).
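A minimal sketch of the second mode (channel-based, with hypothetical names; a real processor would look quite different): buffer both inputs until one ends, use the exhausted input as the build side of a hash join, and stream the rest of the other input through. Note that it cannot promise any output ordering.

```go
package main

import "fmt"

type row []string

// autoHashJoin buffers rows from both inputs until one is exhausted, then
// uses the exhausted (presumably smaller) input as the build side of a hash
// join on the first column. It detects the smaller input automatically but
// cannot guarantee any output ordering.
func autoHashJoin(left, right <-chan row, out chan<- row) {
	var leftBuf, rightBuf []row
	for left != nil && right != nil {
		select {
		case r, ok := <-left:
			if !ok {
				left = nil
				break
			}
			leftBuf = append(leftBuf, r)
		case r, ok := <-right:
			if !ok {
				right = nil
				break
			}
			rightBuf = append(rightBuf, r)
		}
	}
	// The exhausted input becomes the build side; the other side is probed,
	// first from its buffer and then from whatever is still streaming in.
	buildIsLeft := left == nil
	build, probeBuf, probe := leftBuf, rightBuf, right
	if !buildIsLeft {
		build, probeBuf, probe = rightBuf, leftBuf, left
	}
	byKey := map[string][]row{}
	for _, r := range build {
		byKey[r[0]] = append(byKey[r[0]], r)
	}
	emit := func(r row) {
		for _, b := range byKey[r[0]] {
			l, rr := b, r
			if !buildIsLeft {
				l, rr = r, b
			}
			out <- append(append(row{}, l...), rr[1:]...)
		}
	}
	for _, r := range probeBuf {
		emit(r)
	}
	for r := range probe {
		emit(r)
	}
	close(out)
}

func main() {
	left, right := make(chan row, 2), make(chan row, 3)
	left <- row{"a", "1"}
	left <- row{"b", "2"}
	close(left)
	right <- row{"a", "x"}
	right <- row{"b", "y"}
	right <- row{"a", "z"}
	close(right)
	out := make(chan row, 8)
	autoHashJoin(left, right, out)
	for r := range out {
		fmt.Println(r)
	}
}
```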
docs/RFCS/distributed_sql.md, line 891 [r1] (raw file):
We need to know the leaders when doing the physical planning anyway (at the very least we need to know on what nodes to instantiate the readers). It can be best effort but it has to be right most of the time to get the desired performance. We will have to make improvements to RangeCache to have this available most of the time.
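A minimal sketch of that best-effort placement step (the types and names are hypothetical): split the scanned span using the cached range/leader info and group the pieces by node, one TableReader per node. If the cache is stale, some reads simply won't be local.

```go
package main

import "fmt"

// rangeInfo is the (possibly stale) cached information about one range.
type rangeInfo struct {
	startKey, endKey string
	leaderNode       int
}

// tableReaderSpans splits the scanned span by the cached ranges and groups
// the pieces by their cached leader node.
func tableReaderSpans(scanStart, scanEnd string, ranges []rangeInfo) map[int][][2]string {
	spans := map[int][][2]string{}
	for _, r := range ranges {
		start, end := r.startKey, r.endKey
		if start < scanStart {
			start = scanStart
		}
		if end > scanEnd {
			end = scanEnd
		}
		if start >= end {
			continue // this range does not overlap the scan
		}
		spans[r.leaderNode] = append(spans[r.leaderNode], [2]string{start, end})
	}
	return spans
}

func main() {
	ranges := []rangeInfo{{"a", "g", 1}, {"g", "p", 2}, {"p", "z", 1}}
	fmt.Println(tableReaderSpans("c", "s", ranges))
}
```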
docs/RFCS/distributed_sql.md, line 922 [r1] (raw file):
@andreimatei we need to correct this section
I think the last iteration was similar to what you propose but in the reverse direction - the flows start at table readers, and then the consumers on remote nodes are set up on the fly. This is necessary for things like join-by-lookup where we might not know the correct set of remote nodes beforehand.
docs/RFCS/distributed_sql.md, line 979 [r1] (raw file):
We don't rely on correct leadership or range boundaries for correctness. It's just a performance concern (KV ops which we thought would be local turn out not to be local). The aim is that most of the time, most of the KV ops should be local. At least in the beginning, we won't care if something changes between physical planning and actually running the query.
docs/RFCS/distributed_sql.md, line 1092 [r1] (raw file):
One possibility we were thinking about is that JOIN-READER chooses between the strategies at runtime. It could start by doing the reads remotely, but if it detects that it will actually need to do a lot of them, it switches to the other strategy. We would need to support setting up instances of the resulting flows "on the fly". This is touched upon in the join-by-lookup section.
docs/RFCS/distributed_sql.md, line 1097 [r1] (raw file):
I wish I could make it number the sections. The "joins" paragraph is a subsection of the processor infrastructure.
docs/RFCS/distributed_sql.md, line 1100 [r1] (raw file):
I would see GROUP BY aggregation as an intermediate milestone before joins. I guess I started with M1 and M2 because we would probably reassess the rest on the way.
docs/RFCS/distributed_sql.md, line 1127 [r1] (raw file):
I think we should, added.
docs/RFCS/distributed_sql.md, line 1135 [r1] (raw file):
Agreed, added.
docs/RFCS/distributed_sql.md, line 1171 [r1] (raw file):
If we simply run KV Get ops on a follower node, won't the KV layer try to figure out the leader and talk to it anyway? In our proposal, we still use the standard KV APIs.
docs/RFCS/distributed_sql.md, line 1213 [r1] (raw file):
Actually we are not proposing adding new KV RPCs. We are proposing using the existing KV operations, and simply relying on the fact that - when run on the leader node - they should automatically be "fast". Added a clarification to the "KV Integration" section.
andreimatei (Member) commented on Apr 18, 2016
docs/RFCS/distributed_sql.md, line 922 [r1] (raw file):
The idea is that for the most part, parts of the plan are disseminated by the gateway to all the nodes involved, using ScheduleFlows. This RPC sets up everything, including the mailboxes. Different nodes will then start talking to each other, and will use StreamMailbox to actually establish the GRPC streams (but the mailbox is expected to exist already, and maybe even contain buffered data, when this RPC arrives). PUSH_PROMISE is not generally applicable, because the gateway is not generally an endpoint in most streams.
When the gateway is an endpoint, we can consider, as an optimization, having ScheduleFlows also initiate a stream. I guess this can only apply to the first of the potentially many streams that need to be established between two nodes, since gRPC does not seem to support any multiplexing over one connection (?).
In cases where parts of the query can potentially touch many/all nodes in the cluster, the plan can also be set up "as we go" - a JoinReader aggregator can push a flow onto a remote node if it wants to. In this case, the optimization can also apply. I'll add some sentences about this.
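To summarize the shape of that interaction, a hedged sketch (these names and signatures are invented for illustration; the real interface is the gRPC service described in the RFC): ScheduleFlows sets up flows and their mailboxes on a node, and StreamMailbox later attaches to a mailbox and streams its rows.

```go
// Package distsql: an illustrative sketch only, not the actual API.
package distsql

// FlowSpec describes one flow: the processors and mailboxes to instantiate
// on a particular node.
type FlowSpec struct {
	FlowID     string
	Processors []ProcessorSpec
}

// ProcessorSpec and Row are placeholders for the real specs.
type ProcessorSpec struct{}
type Row struct{}

// Server is the per-node endpoint.
type Server interface {
	// ScheduleFlows instantiates the given flows, including their mailboxes.
	// It is called by the gateway when the plan is disseminated, or on the
	// fly by a processor such as JoinReader.
	ScheduleFlows(flows []FlowSpec) error

	// StreamMailbox attaches to a mailbox that ScheduleFlows already set up
	// (it may already hold buffered rows) and streams its rows to the caller.
	StreamMailbox(flowID string, mailboxID int) (<-chan Row, error)
}
```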
andreimatei (Member) commented on Apr 18, 2016
docs/RFCS/distributed_sql.md, line 1092 [r1] (raw file):
In this example, what we need to detect is that the orders-by-date index and the PK of the orders table are highly correlated, and so it makes sense to push computation to the (small number of) nodes containing PK data corresponding to an index span. I was imagining we'd detect this without statistics, just by monitoring the read batches that are accumulated by JoinReader. Once a batch goes above some size, we push a plan remotely.
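A minimal sketch of that runtime decision (the struct, threshold, and helpers are hypothetical): lookups destined for a node accumulate in a batch, and once the batch crosses a size threshold the JoinReader pushes a flow to that node instead of issuing the reads remotely.

```go
package main

import "fmt"

// lookup is one point read the JoinReader wants to perform.
type lookup struct{ key string }

// joinReader's two strategies are injected here so the sketch stays
// self-contained; in reality these would schedule a remote flow or issue
// KV reads.
type joinReader struct {
	scheduleRemoteFlow func(node int, batch []lookup) error
	readRemotely       func(node int, batch []lookup) error
	// remoteFlowThreshold is illustrative; a real value would be tuned.
	remoteFlowThreshold int
}

// flushBatch decides, per accumulated batch, whether the lookups are
// numerous enough to justify pushing computation to the node holding the data.
func (jr *joinReader) flushBatch(node int, batch []lookup) error {
	if len(batch) >= jr.remoteFlowThreshold {
		return jr.scheduleRemoteFlow(node, batch)
	}
	return jr.readRemotely(node, batch)
}

func main() {
	jr := &joinReader{
		scheduleRemoteFlow: func(node int, b []lookup) error {
			fmt.Printf("node %d: scheduling remote flow for %d lookups\n", node, len(b))
			return nil
		},
		readRemotely: func(node int, b []lookup) error {
			fmt.Printf("node %d: issuing %d remote reads\n", node, len(b))
			return nil
		},
		remoteFlowThreshold: 3,
	}
	jr.flushBatch(1, []lookup{{"a"}, {"b"}})
	jr.flushBatch(2, []lookup{{"c"}, {"d"}, {"e"}, {"f"}})
}
```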
bdarnell (Member) commented on Apr 20, 2016
Reviewed 1 of 1 files at r2.
docs/RFCS/distributed_sql.md, line 891 [r1] (raw file):
Note that RangeCache and leaderCache are two different things. The former can be populated efficiently by scanning the meta ranges, but updating the leader cache requires at least one RPC per range (typically one, but a lagging replica might not know the current leader).
docs/RFCS/distributed_sql.md, line 922 [r1] (raw file):
GRPC multiplexes everything over one connection, but each RPC only has one result stream.
docs/RFCS/distributed_sql.md, line 1171 [r1] (raw file):
Currently, KV ops will always go to the leader, but we could transparently introduce an optimization in which reads at sufficiently old timestamps are served by the follower instead of returning NotLeaderError.
RaduBerinde commented on Apr 14, 2016
https://github.com/RaduBerinde/cockroach/blob/dist-sql-rfc/docs/RFCS/distributed_sql.md