go/libraries/doltcore/remotestorage,go/store/datas/pull: Implement a pipelined chunk fetcher. #7824
Conversation
….readChunksAndCache.
…RPCDownloadLocsThread.
…unkFetch implementation.
…mentation of end-to-end pipelining remotestorage ChunkFetcher. TODO: - NewRpcErrors - Return empty CompressedChunks for things that are not found. - Fix how we find the next download. Currently might be slow.
…rent ways of structuring the reliable grpc call state machine.
…ize 8 buffer can actually store 8 elements.
…ular. Start reliable package with a reliable GRPC call copying some of the state machine logic.
… cleanup. More testing.
…vestigate other ways of structuring the GRPC reliable call state machine in order to compare them.
…t on a ranges tree for more efficiently picking next download.
…RepoToken storage back for StreamDownloadLocations pipeline.
…etecting missing chunks and adhering to the contract where they come back from Recv as a CompressedChunk with only a hash field set.
…fetch. Implement largest, smallest and random policy.
…ompressed to chunk_fetcher implementation.
…oke tests for tree.Len().
…m large (unhedged) responses back as chunks, instead of downloading everything before delivering it as chunks.
…aming, retries and throughput monitoring are in place.
… based on successes or failures of inflight requests.
…oncurrentParams into NetworkRequestParams.
…resh table file url rpc, grpc send, and grpc open for get download locations.
…e unused downThroughputCheck and iohelp import.
…ome comments regarding the implementation and usage.
… timeouts for getting HTTP response headers when download byte ranges.
…me comments for StreamingRangeDownload implementation.
…mentation comments.
LGTM, bit more concerned about the hand rolled primitives than the orchestration. Like realistically I'd need to do a bunch of testing to find whether the message queue has any edge cases where we'd drop a req or something. Same thing with the state machine, it all checks out afaict but it seems like there's a lot of potential failure cases where we don't restart a streaming connection or something. But it seems pretty unlikely that we would return successfully from a fetch without detecting missed chunks? So then worst-case is maybe a repo that has a fetch bug that we need to patch before it can be cloned, which is fine?
```go
}
for h := range hs {
	h := h
	addrs = append(addrs, h[:])
```
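As an aside on the `h := h` line in the diff above: under pre-Go 1.22 loop semantics, `h` is a single variable reused across iterations, so slicing it with `h[:]` without the copy would make every appended slice alias the same backing array. A minimal sketch of the same pattern (the map contents here are made up for illustration):

```go
package main

import "fmt"

// collectAddrs mirrors the loop above: it copies each [4]byte key before
// slicing it. Without the `h := h` copy (pre-Go 1.22), every appended slice
// would alias one reused loop variable and end up holding the last key.
func collectAddrs(hs map[[4]byte]struct{}) [][]byte {
	var addrs [][]byte
	for h := range hs {
		h := h // give this iteration's key its own backing array
		addrs = append(addrs, h[:])
	}
	return addrs
}

func main() {
	hs := map[[4]byte]struct{}{
		{1, 1, 1, 1}: {},
		{2, 2, 2, 2}: {},
	}
	addrs := collectAddrs(hs)
	// Each slice points at a distinct array, so both keys survive.
	fmt.Println(len(addrs), addrs[0][0] != addrs[1][0])
}
```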
what's the reason to decouple (1) receive addrs, (2) form request, (3) send request?
seems like the `thisResCh` might be unnecessary also if you combined
Hmm, I'm not sure I understand. You mean why do we batch them up separate from the incoming `HashSet`?
The reason is a few fold, AFAIK:
- The remote server is going to impose limits on incoming request size, and it wants incoming request messages to be below a certain threshold.
- The remote server is going to respond to incoming requests serially, and requests are responded to in aggregate / fully – a response message comes back with all the resolved locations, and that response body has to be fully read and processed before any locations can be pulled out of it, etc.
- The remote server is essentially running `HasMany()` or `getReadReqs` or whatever on the table file indexes of the loaded Dolt repository. The method is more efficient with batches of addresses than with single-shot lookups.
- The response payload itself is more efficient for multi-lookups where the locations share the same table file. In that case, we only have to transmit the signed URL for a given table file once. Because the response payloads are stateless across the connection, the smaller the batches, the more times the signed URLs are transmitted.
The end result is that you want to batch up chunk addresses, but you can't / don't want your batches to be too big.
Does that answer the question?
In reality, some of these constraints are self-imposed and could be approached in a different way, but I think you would still want batching here...
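The batching described above can be sketched roughly like this. Note the type here is a hypothetical stand-in, not the actual `remotesapi.GetDownloadLocsRequest` protobuf definition, and `batchAddrs` is an illustrative helper, not a function from the PR:

```go
package main

import "fmt"

// Hypothetical stand-in for the download-locations request message.
type getDownloadLocsRequest struct {
	ChunkHashes [][]byte
}

// batchAddrs groups incoming chunk addresses into request messages holding
// at most batchSize hashes each, mirroring the idea that the server wants
// request messages below a certain size threshold.
func batchAddrs(addrs [][]byte, batchSize int) []*getDownloadLocsRequest {
	var reqs []*getDownloadLocsRequest
	for len(addrs) > 0 {
		n := batchSize
		if len(addrs) < n {
			n = len(addrs)
		}
		reqs = append(reqs, &getDownloadLocsRequest{ChunkHashes: addrs[:n]})
		addrs = addrs[n:]
	}
	return reqs
}

func main() {
	addrs := make([][]byte, 10)
	for i := range addrs {
		addrs[i] = []byte{byte(i)}
	}
	// 10 addresses with batchSize 4 -> batches of 4, 4, and 2.
	reqs := batchAddrs(addrs, 4)
	fmt.Println(len(reqs), len(reqs[2].ChunkHashes))
}
```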
Oh I see, I thought batching was working in the other direction for some reason -- accumulating hash sets, which the setup doesn't seem organized for. If we're sending out smaller batches, decoupling the receive/send makes sense
…te_machine_struct.go Co-authored-by: Maximilian Hoffman <max@dolthub.com>
Co-authored-by: Maximilian Hoffman <max@dolthub.com>
Co-authored-by: Maximilian Hoffman <max@dolthub.com>
…te_machine_struct.go Co-authored-by: Maximilian Hoffman <max@dolthub.com>
```go
// `Get()` and `Recv()` concurrently. Unless there is an error, for every
// single Hash passed to Get, a corresponding Recv() call will deliver the
// contents of the chunk. When a caller is done with a ChunkFetcher, they
// should call |CloseSend()|. After CloseSend, all requested hashes have been
```
```diff
- // should call |CloseSend()|. After CloseSend, all requested hashes have been
+ // should call |CloseSend()|. After CloseSend, once all requested hashes have been
```
```go
	tc.Close()
	assert.NoError(t, eg.Wait())
})
t.Run("SetTimeoutRespectsContext", func(t *testing.T) {
```
What's being tested in "SetTimeoutRespectsContext"? There aren't any asserts in "BeforeRun". Is calling SetTimeout after cancel supposed to have specific behavior? Should this be running and waiting on the controller to test that it actually gets cancelled?
```go
// Reads HashSets from reqCh and batches all the received addresses
// into |GetDownloadLocsRequest| messages with up to |batchSize| chunk hashes
// in them. It delivers the batched messages to |resCh|.
func fetcherHashSetToGetDlLocsReqsThread(ctx context.Context, reqCh chan hash.HashSet, abortCh chan struct{}, resCh chan *remotesapi.GetDownloadLocsRequest, batchSize int, repoPath string, idFunc func() (*remotesapi.RepoId, string)) error {
```
The state machine in this function is pretty hard to follow. I'm not sure I understand it.
So `thisResCh` is assigned to `resCh` the first time there are any elements in `addr`. Prior to that, it's nil.
Writing to a nil channel blocks forever. Is the idea that the select statement will never choose the `thisResCh <- thisRes` case until we assign `thisResCh` a non-nil value? Are we depending on the properties of writing to a nil channel for correctness here? Is this a good practice?
Maybe this is the idiomatic way to do this. But as someone less versed in go channels, the intended execution flow is very unclear.
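For reference, the property the question hinges on is that a send on a nil channel blocks forever, so a select case guarded by a nil channel variable is effectively disabled until the variable is assigned a real channel. A toy demonstration (the `thisResCh`/`thisRes` names just echo the discussion; this is not the PR's code):

```go
package main

import "fmt"

func main() {
	out := make(chan int, 1)
	var thisResCh chan int // nil: the send case below cannot be chosen yet
	thisRes := 0

	for i := 0; i < 2; i++ {
		select {
		case thisResCh <- thisRes:
			fmt.Println("sent", thisRes)
		default:
			// First pass: thisResCh is nil, so the send case is never
			// selected. "Arm" the case by assigning a real channel.
			fmt.Println("send case disabled; enabling it")
			thisResCh = out
			thisRes = 42
		}
	}
	fmt.Println("received", <-out)
}
```

This nil-channel gating is a documented property of select in the Go spec, so the pipeline isn't relying on undefined behavior, though opinions differ on how readable it is.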