Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving to a streaming model for piping assets from the host to the server. #73013

Merged
merged 111 commits into from
Apr 15, 2024

Conversation

CyrusNajmabadi
Copy link
Member

@CyrusNajmabadi CyrusNajmabadi commented Apr 13, 2024

Before this PR the way things worked is that when the server requested assets from the host, the host would do a full search for all the assets locally. Once it found them all, it would then write them all out in bulk to a pipe-writer between the systems.

On the server side we had a problem though. Our deserialization routines are all synchronous, but that doesn't work well on pipe-readers (which asynchronously read). To deal with this, we would first deserialize the entire message in-memory on the server, then would synchronously deserialize all the messages in it.

The above approach has two seriously unfortunate parts. First is having to do the full search of all requested checksums on the host before writing out any data. Second is having the server have to read in the entire message into memory before processing it.

--

This PR changes addresses both of those. First, our asset-finding routine no longer looks for all checksums, populating a result-map as it goes along. Instead, as it finds an asset, it invokes a callback which writes that asset to the pipe-writer. This allows us to start sending out assets immediately as we find them, and continuously as we find more.

Second, we've changed how we write out assets. Previously, we would jsut have a stream wrappign the pipewriter, and would write the asset-alone to that stream. Now, we instead write out a header prior to writing out the data for the asset. The header effectively encodes the length of the asset that will follow. Thsi header is written (and flushed) prior to writing out the asset. This length serves a very important purpose. Now, on the reading side, we can read in the length, and we can now tell the pipe-reader itself to read at least that many bytes in before we then pass the reader-stream off to our synchronous-deserialization code. This is now entirely safe (and efficient) because there's no sync-over-async blocking that would otherwise have to happen.

--

Effectively, this PR makes it so that we have one locatoin that handles the pipe-writer and reader, and properly sends messages asynchronously between them. These components then smartly manage in-memory buffers for the data to be written into. Because these are in-memory buffers, we have no problem utilizing our existing asset serialization systems that are all based on synchronous streams.

--

PR1: https://devdiv.visualstudio.com/DefaultCollection/DevDiv/_build/results?buildId=9413045&view=results
PR2: https://devdiv.visualstudio.com/DefaultCollection/DevDiv/_build/results?buildId=9413046&view=results
PR3: https://devdiv.visualstudio.com/DefaultCollection/DevDiv/_build/results?buildId=9418156&view=results
PR4: https://devdiv.visualstudio.com/DefaultCollection/DevDiv/_build/results?buildId=9418161&view=results

@dotnet-issue-labeler dotnet-issue-labeler bot added Area-IDE untriaged Issues and PRs which have not yet been triaged by a lead labels Apr 13, 2024
@@ -58,7 +58,7 @@ public void AddAllTo(HashSet<Checksum> checksums)
AssetPath assetPath,
TextDocumentStates<TState> documentStates,
HashSet<Checksum> searchingChecksumsLeft,
Dictionary<Checksum, object> result,
Func<Checksum, object, CancellationToken, ValueTask> onAssetFoundAsync,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this allows finding to all be callback based, allowing us to write out the found assets to the oop<-->host stream as they are found.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also means we avoid a large dictoinary that has to be populated fully.

@@ -19,9 +19,8 @@ namespace Microsoft.CodeAnalysis.Remote.Testing;
internal sealed class SimpleAssetSource(ISerializerService serializerService, IReadOnlyDictionary<Checksum, object> map) : IAssetSource
{
public ValueTask GetAssetsAsync<T, TArg>(
Checksum solutionChecksum, AssetPath assetPath, ReadOnlyMemory<Checksum> checksums, ISerializerService deserializerService, Action<int, T, TArg> callback, TArg arg, CancellationToken cancellationToken)
Checksum solutionChecksum, AssetPath assetPath, ReadOnlyMemory<Checksum> checksums, ISerializerService deserializerService, Action<Checksum, T, TArg> callback, TArg arg, CancellationToken cancellationToken)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we can find results in any order, we pass back the checksum we found when we find it, not the index in passed-in checksums buffer. This actually cleans things up in many layers.

assetPath,
checksums,
WriteAssetToPipeAsync,
cancellationToken).ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's where we do the search for the assets corresponding to the passed in checksums. As we find them, we call to WriteAssetToPipeAsync to immediately stream them to the server.

WriteLengthToPipeWriter(tempStream.Length);

// Ensure we flush out the length so the reading side knows how much data to read.
await pipeWriterStream.FlushAsync(cancellationToken).ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the nice thing about this approach is that we can be all async here where we need to be (when actually doing the IO). But the ObjecReader/ObjectWriter parts can all be synchronous on top of streams (without doing any blocking on any actual cross process IO/async).

{
using var stream = await pipeReader.AsPrebufferedStreamAsync(cancellationToken).ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is one of the big changes. We used to have to prebuffer the entire reader into a stream in memory, so that we could pass that to our ObjectReader without worrying about it blocking on async io. we no longer need that.

{
// First, read the length of the asset (and header) we'll be reading.
var readResult = await pipeReader.ReadAtLeastAsync(sizeof(int), cancellationToken).ConfigureAwait(false);
var length = ReadLength(readResult);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can do a trivial initial read to read in the length.

// Now buffer in the rest of the data we need to read. Because we're reading as much data in as
// we'll need to consume, all further reading (for this single item) can handle synchronously
// without worrying about this blocking the reading thread on cross-process pipe io.
await pipeReader.ReadAtLeastAsync(length, cancellationToken).ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then a nice async read to ensure that much data is pulled over to OOP from the client (but much less than the entire data stream liek before).

// without worrying about this blocking the reading thread on cross-process pipe io.
await pipeReader.ReadAtLeastAsync(length, cancellationToken).ConfigureAwait(false);

using var reader = ObjectReader.GetReader(pipeReaderStream, leaveOpen: true, cancellationToken);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now we can just synchrnously read from that buffer which is now in memory in our process.

// in service hub, cancellation means simply closed stream
var result = serializerService.Deserialize(kind, reader, cancellationToken);
Contract.ThrowIfNull(result);
callback(checksum, (T)result, arg);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now notify our own client of hte checksum/asset we were just streamed.

/// is holding onto (including within <see cref="Flush"/>, <see cref="FlushAsync"/>, or <see cref="Dispose"/>).
/// Responsibility for that is solely in the hands of <see cref="WriteAssetsAsync"/>.
/// </remarks>
private class PipeWriterStream : Stream, IDisposableObservable
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for this specizlied stream. we own all the async parts of working with the stream ourselves. and the synchronous parts go to a different in-memory stream where sync over async is not a concern.

Contract.ThrowIfTrue(checksum != foundChecksum);
asset = foundAsset;
return ValueTaskFactory.CompletedTask;
}, cancellationToken).ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note; this is a test-accessor. No concern abotu captures and perf and whatnot here.

T missingAsset,
(AssetProvider assetProvider, Checksum[] missingChecksums, Action<Checksum, T, TArg>? callback, TArg? arg) tuple) =>
{
var missingChecksum = tuple.missingChecksums[index];

tuple.callback?.Invoke(missingChecksum, missingAsset, tuple.arg!);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thsi callback was already "checksum+asset" based. That gets simpler now since that's what the component we call passes us.

return this.length;
}
}
public override long Length => this.length;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no change. just matching idiomatic code style.

{
return this.position;
}
get => this.position;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no change. just matching idiomatic code style.


var result = chunks[CurrentChunkIndex][CurrentChunkOffset];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no change. just matching idiomatic code style.

{
EnsureCapacity(value);

if (value < length)
{
// truncate the stream
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

view with whitespace off.

@ToddGrun
Copy link
Contributor

using System.Collections.Immutable;

nit: not needed anymore


Refers to: src/Workspaces/Remote/ServiceHub/Host/SolutionAssetSource.cs:6 in cb89ca9. [](commit_id = cb89ca9, deletion_comment = False)

@ToddGrun
Copy link
Contributor

using System.Collections.Immutable;

codeflow and github don't interact nicely sometimes; this is in reference to:

using System.Collections.Immutable;


In reply to: 2054810507


Refers to: src/Workspaces/Remote/ServiceHub/Host/SolutionAssetSource.cs:6 in cb89ca9. [](commit_id = cb89ca9, deletion_comment = False)

Copy link
Contributor

@ToddGrun ToddGrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

@CyrusNajmabadi CyrusNajmabadi merged commit fc38e7d into dotnet:main Apr 15, 2024
27 checks passed
@CyrusNajmabadi CyrusNajmabadi deleted the lengthPrefixedMessages branch April 15, 2024 21:47
@dotnet-policy-service dotnet-policy-service bot added this to the Next milestone Apr 15, 2024
@CyrusNajmabadi
Copy link
Member Author

@jasonmalinowski For review when you get back.

@dibarbet dibarbet modified the milestones: Next, 17.11 P1 Apr 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area-IDE untriaged Issues and PRs which have not yet been triaged by a lead
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants