Add RFC on sharded changes #651
Conversation
> database. If tasks need to acquire new snapshots along the way because of the
> large number of updates they need to process they can do so without
> coordination, but in the end the parent job MUST ensure that all tasks have
> updated to the same final snapshot.
How do we account for fairness here? An index of 1000 shards would appear to get to do more work than an index of 1 shard. I can't quite tell if we're free to commit to each shard independently. I think so, as long as we don't execute queries until they've all been "updated to the same final snapshot". If I'm reading that right, we would be free to vary the number of shard index jobs at runtime?
Yes, with the default hashing scheme each indexer task can commit to the same secondary index independently since they are operating on disjoint sets of documents. If, on the other hand, you had a hashing scheme where you just sprayed document updates randomly across shards, you might need to add some extra bookkeeping to avoid cases where two tasks are concurrently operating on different versions of the same document (and maybe committing updates in the wrong order, i.e. overwriting index entries from a newer version with an older one).
A resharding event would need to act as a barrier here; the indexer job would need to allow all its tasks to finish on the old shard map and then start a new job with the new map.
I suppose if you wanted to run fewer indexing tasks than the number of shards you could certainly do that and have each task work on one or more shards in series without a ton of extra complexity. Running more tasks than the number of shards is a different proposal. Maybe possible, but wasn't my focus here. Others might be able to comment on why we haven't done that already.
Thanks, that helps. I was thinking only of the scale-down case. I also don't see how we could have more tasks than shards; that's the motivating case for re-introducing sharding.
> - Linearly scalable _changes feed consumption
>
> Disadvantages
>
> - Introduction of a new per-database tunable parameter
This is an understatement. The move to FDB was hoped to remove all the notions of `q` (and scatter-gather for that matter) that have proved so problematic. The proposal does contain enough details on how this new `q` can vary over time, though.
> # Key Changes
>
> Users would be able to modify the shard count of the changes feed up and down to
> have some control over resources devoted to background index maintenance. While
In some environments it will not be appropriate to give users this level of control; fairness must be designed into the view indexer.
> `GET /db/_changes/<ShardID>`
>
> will provide the change feed for a given shard using JSONL for all responses,
`JSONL`! It finally has a name.
and we don't even get a mention
AFAICT it still doesn't have an official mime-type though.
If it is going to be this format, please don't (ab)use the `application/json` type in the way the existing continuous feed does; tooling that automatically tries to handle responses based on mime-type breaks when it receives JSONL with the JSON mime-type.
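For illustration, a minimal per-shard JSONL consumer might look like the sketch below. The shard ID, host, and the suggestion of a line-delimited media type such as `application/x-ndjson` are assumptions, not part of the RFC:

```python
import json
import requests

# Hypothetical shard ID and host; both are assumptions for illustration only.
url = "http://localhost:5984/db/_changes/0000-ffff"

with requests.get(url, params={"since": "0"}, stream=True) as resp:
    # Ideally the server would label the stream with a line-delimited media
    # type (e.g. application/x-ndjson) rather than reusing application/json.
    print("Content-Type:", resp.headers.get("Content-Type"))
    for line in resp.iter_lines():
        if not line:
            continue  # ignore keep-alive blank lines
        change = json.loads(line)  # each JSONL line is a standalone JSON object
        print(change.get("seq"), change.get("id"))
```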
I'm not sure what the main goal of this proposal is. I think it's about improving view engine build performance, but there's also chatter on dev@ about improving replication/changes feed reads.
If we're focusing on attempting to improve view build performance through parallelization, then I'm confused that we're starting with sharding as the first gambit. Unless I missed a section on how views will also be sharded, we're still left with the fact that writes to the view ebtree aren't currently parallelizable. If we want to make view updates faster, the first thing I'd investigate is how to upgrade ebtree to allow update-in-place semantics with the MVCC semantics that FDB offers.
If on the other hand the main goal is speeding up consuming the _changes feed, I'd like to have some baseline performance numbers on what 3.x does, what main does, and what we'd want it to be. Streaming contiguous ranges of keys out of FDB should be blazing fast. It's possible that I'm forgetting a data-dependent read for each row of the _changes feed, but I'd look at optimizations around either doing those asynchronously or tweaking APIs to avoid the requirement.
In the end, my gut reaction is that adding sharding on top of a system that already transparently handles sharding is maybe not the best idea without at least some cursory demonstration of the benefits as well as an understanding of why the performance improvement exists.
Thanks for the read, Paul. The Intro was meant to convey the goals of the proposal, in priority order:
That said, I totally spaced out on the
> all future updates to that database will be routed to those new subspaces.
> Consumers of the shard-level API will receive a notification that a resharding
> event has occurred once they reach the end of the updates committed to the
> previous subspace. They MUST re-connect to the new endpoints once they receive
Taken in conjunction with the previous statement:

> all future updates to that database will be routed to those new subspaces

Does this mean that a consumer, wishing to process the entire changes feed (i.e. from `0`) on a database that had been resharded, would need to iterate over each shard of each tombstoned space until reaching the notification, and process all the current shards as well?
Yes, that's correct.
> `GET /db/_changes/_meta?since=N`
>
> can be used to retrieve the shard topology as of a particular sequence.
So to go back to my earlier example, if I was processing the feed since `0` on a database that had been resharded I might do:

- `GET /db/_changes/_meta?since=0`
- For each shard ID:
  - `GET /db/_changes/<ShardID>`
  - process changes
  - reach a notification that there was a resharding event

Now how do I know what the new shard IDs are to continue processing the feed? It seems like I should call `GET /db/_changes/_meta?since=N`, but isn't the last seq I received still part of the original topology, so I don't have an `N` that will tell me about the new topology?

Is the intention that the notification will tell me what the new `N` is? If so, how does that work if there haven't been any new changes to the database yet? Does the resharding itself create a sequence?
Yes, I realize I didn't fully specify that bit. The resharding does create its own sequence, and a call to the `_meta` endpoint with that sequence would return the new topology. I suppose the JSONL feed would have a final line indicating the presence of the tombstone at that sequence.
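To make the flow concrete, here is a rough consumer sketch under those semantics. The `_meta` response shape (a `shards` list), the per-line `reshard`/`seq` fields, and the host URL are assumptions for illustration, since the RFC doesn't pin down the exact JSON:

```python
import json
import requests

BASE = "http://localhost:5984/db"  # hypothetical database URL

def read_all_changes(since="0"):
    """Follow the sharded changes feed across resharding events.

    The _meta response shape and per-row fields are illustrative assumptions.
    """
    while True:
        topology = requests.get(f"{BASE}/_changes/_meta",
                                params={"since": since}).json()
        reshard_seq = None
        for shard_id in topology["shards"]:               # assumed field name
            url = f"{BASE}/_changes/{shard_id}"
            with requests.get(url, params={"since": since}, stream=True) as resp:
                for line in resp.iter_lines():
                    if not line:
                        continue
                    row = json.loads(line)
                    if row.get("reshard"):                 # assumed tombstone marker
                        reshard_seq = row["seq"]           # resharding has its own seq
                        break                              # end of this shard's old space
                    yield row
        if reshard_seq is None:
            return                                         # no resharding event; done
        since = reshard_seq                                # fetch the new topology and continue

for change in read_all_changes():
    print(change.get("seq"), change.get("id"))
```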
The main issue I see with this proposal is that it feels to me like it makes multi-doc transactions difficult to implement, taking as an assumption that CouchDB on FoundationDB would want to support replicating and indexing these transactions as single transactions.
The reason for this is relatively clear, I hope: a naive partitioning scheme (e.g., docID) would leave different parts of a single transaction across different changes feed partitions. This means that both replication and indexing, without some form of coordination (which would most likely defeat the object of the exercise), would replicate and index the changes as separate transactions, leaving an index or target database in an inconsistent state.
> Documents will be routed to shards using a configurable hashing scheme. The
> default scheme will use consistent hashing on the partition key, so that a) all
> updates to a given document will land in the same shard, and b) documents from
> the same partition in a partitioned database will also be colocated.

(src)
I think that this offers a way out, because one could specify that multi-doc transactions were only supported within a given partition key, and we treat a partition key as an "atom" from the point of view of various things -- transaction boundary being one of the most important if we wish to extend the concept into other parallel features.
Update: I thought harder. Given we don't actually offer a guarantee of ordering within the current, single changes feed, IIRC, I don't think that partitioning the feed changes the fact that we currently can't offer multi-doc transactions that remain atomic across either replication or indexing. This would have to be built on top of a different seq-index-like data structure that I guess ends up looking a lot more like a transaction log. So I think my opposition is moot.
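As a rough illustration of the default routing rule quoted above (consistent hashing on the partition key), the sketch below uses a simple hash-modulo stand-in; the shard count, hash choice, and partition-key extraction are assumptions, not the RFC's actual algorithm:

```python
import hashlib

N_SHARDS = 8  # assumed per-database shard count

def partition_key(doc_id: str) -> str:
    """In a partitioned database the doc ID is '<partition>:<key>';
    otherwise the whole ID acts as the routing key."""
    return doc_id.split(":", 1)[0] if ":" in doc_id else doc_id

def shard_for(doc_id: str) -> int:
    """Route a document to a shard by hashing its partition key (modulo
    hashing stands in for the RFC's consistent hashing), so every update to
    a doc, and every doc in the same partition, lands in the same shard."""
    key = partition_key(doc_id)
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % N_SHARDS

# All docs in partition "sensor-42" map to the same shard:
assert shard_for("sensor-42:reading-1") == shard_for("sensor-42:reading-2")
```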
This seems to have been dropped; can someone confirm? I stand by my comment above fwiw, that adding sharding after all the effort to switch to FDB to get away from it is regrettable. I could only countenance this in CouchDB 4.0 if there is no alternative and it is demonstrated to be essential.
@rnewson I think that it's important to note that we are "only" partitioning the changes feed subspace -- not primary data nor indexes themselves. IIRC indexes were the primary problem within the 3.0 scheme because of the ordered scatter-gather we needed to perform. I'm not convinced that partitioning/sharding the changes index space is a bad thing, at least with regard to some of the performance problems that shards caused in 3.0. I'd be curious what @kocolosk thinks about whether partitioning the changes feed brings back the bad stuff from CouchDB 3.0?
Agree with @rnewson. Even if we switch the index storage format to allow parallelizable updates, adding a static Q would be a step back, it seems.

One issue is at the user/API level: we'd bring back Q, which we didn't want to have to deal with any more with FDB as a backend. And then in the code, we just removed the sharding code in fabric; I am not too excited about bringing parts of it back unless it's a last resort and nothing else works. We could invent some auto-sharding, of course, but that would be even more complexity.

It seems we'd also want to separate changes feed improvements from indexing improvements a bit better. Could we speed up indexing without a static-Q sharding of the changes feed, with all the API changes involved and hand-written resharding code (epochs) and hard values? I think we can, if we invent a new index structure that allows parallelizable updates. Like, say, an inverted JSON index for Mango queries based on https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20171020_inverted_indexes.md.

The idea I had was to use the locality API to split the _changes feed into sub-sequences, and either start a separate couch_jobs job (or just processes under a single couch_job indexer) to fetch docs, process, and write to the index in parallel. So, if the _changes sequence looks like
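That locality-based split might look roughly like the following sketch, using the FoundationDB Python bindings. The changes-subspace key range, the thread pool, and the row-counting stand-in for real index work are all assumptions for illustration:

```python
import fdb
from concurrent.futures import ThreadPoolExecutor

fdb.api_version(710)
db = fdb.open()

# Hypothetical key range holding the per-database changes (seq) index.
CHANGES_BEGIN = b"\x01changes\x00"
CHANGES_END = b"\x01changes\xff"

def split_ranges(db):
    """Use FDB's locality API to find shard boundary keys, yielding
    (begin, end) sub-ranges that can be scanned independently."""
    boundaries = list(fdb.locality.get_boundary_keys(db, CHANGES_BEGIN, CHANGES_END))
    edges = [CHANGES_BEGIN] + boundaries + [CHANGES_END]
    return list(zip(edges, edges[1:]))

@fdb.transactional
def count_rows(tr, begin, end):
    """Stand-in for real work (doc fetch + index update): just count rows.
    A real indexer would also need to respect the 5-second transaction limit."""
    return sum(1 for _ in tr.get_range(begin, end))

with ThreadPoolExecutor() as pool:
    counts = list(pool.map(lambda r: count_rows(db, *r), split_ranges(db)))
print("changes rows per sub-range:", counts)
```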
It sounds like we've not yet exhausted options other than re-adding sharding. Mike, your point on multi-doc updates is a good one, and perhaps that needs discussing before we commit to this path.

The essential problem with multi-doc transactions is replication. Specifically, it would be a poor (or, at least, surprising) implementation of multi-doc transactions if those updates are not applied atomically when replicated, though it would be by far the easiest way to do it. Any form of replication enhancement to support this seems to have a significant scalability penalty. Not everybody would use multi-doc transactions, and those that would wouldn't necessarily do it for all updates, but building something we know will work poorly if used 'too much' is not a good idea (and particularly this thing, multi-doc txn, which we've explicitly prohibited for scalability reasons this whole time). A future multi-doc txn option would complicate the work described in this RFC but doesn't seem to break it (though it diminishes its performance characteristics, all the way to zero in some cases).

Setting multi-doc txn aside, what are the limits of the current (relatively simple) changes implementation? How fast can we read the changes feed today? Can indexing read the existing changes feed from multiple places in parallel at runtime and still gain a performance boost?
Yeah, transactions in the changes feed are a veritable "can of worms" with or without sharding, so @mikerhodes I think you were correct to call that one out of scope for this discussion. The last time I thought hard about it I came to the same conclusion that you did, which is that we'd want to build a separate transaction log rather than trying to turn the changes feed into said log.

@nickva I can see where the locality API would be a useful way to parallelize the initial index build while minimizing the load on FDB. Of course it doesn't help much with DBs that see a high write QPS in steady-state, since the core data model still concentrates all writes to the seq index in a single FDB shard. I'll freely admit that the write hot spot may not be the most urgent problem to solve; I think my rationale for the proposal was in part the idea of taking out a few birds with a single stone and providing a well-defined scale out story for other external

I don't think I have the time at the moment to help implement this, and won't be offended if folks want to close it. We can always revisit later.
@kocolosk Good point, the locality trick would be useful internally to, say, process the changes feed for indexing, but wouldn't help with write hotspots.

The design of the _changes feed external API is pretty neat and I think it may be worth going that way eventually, but perhaps with an auto-sharding setup so that users don't have to think about Q at all. Found a description of how the FDB backup system avoids hot write shards: https://forums.foundationdb.org/t/keyspace-partitions-performance/168/2. Apparently it's based on writing to

Regarding the changes feed being a bottleneck for indexing, we did a quick and dirty test by reading 1M and 10M changes on a busy cluster (3 storage nodes) and we were able to get about 58-64k rows/sec with just an empty accumulator which counts rows.
For indexing at least, it seems that's not too bad. We'd probably want to find a way to parallelize doc fetches and, most of all, concurrent index updates.
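If I read that forum thread correctly, the hot-shard avoidance boils down to prefixing otherwise-sequential keys with a small hash so consecutive writes scatter across FDB shards, at the cost of readers having to merge the buckets back into order. A toy sketch with an assumed key layout (not CouchDB or FDB backup code):

```python
import hashlib

def spread_key(subspace: bytes, seq: int, buckets: int = 16) -> bytes:
    """Prefix a monotonically increasing sequence with a small hash bucket so
    consecutive writes land on different FDB shards instead of one hot shard.
    Readers must scan all buckets and merge by seq to recover global order."""
    bucket = int.from_bytes(
        hashlib.sha256(seq.to_bytes(8, "big")).digest()[:2], "big") % buckets
    return subspace + bytes([bucket]) + seq.to_bytes(8, "big")

# Consecutive sequences scatter across buckets rather than one shard:
for s in range(5):
    print(s, spread_key(b"\x01changes\x00", s).hex())
```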
I'm closing this ticket due to the migration of the docs from apache/couchdb-documentation to apache/couchdb. Please create a new PR in apache/couchdb.
Opening a PR for discussion on this RFC