Skip to content

Commit

Permalink
feat: Allow a new upload session to be initiated as a single method call
Browse files Browse the repository at this point in the history
This allows the content length to be (optionally) specified, which is then
propagated via X-Upload-Content-Length.
  • Loading branch information
jskeet committed Mar 7, 2024
1 parent 41065fe commit f0b643e
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 9 deletions.
Expand Up @@ -13,8 +13,10 @@
// limitations under the License.

using Google.Apis.Http;
using Google.Apis.Storage.v1.Data;
using Google.Apis.Upload;
using Google.Cloud.ClientTesting;
using Grpc.Core.Interceptors;
using System;
using System.Collections.Generic;
using System.IO;
Expand Down Expand Up @@ -368,6 +370,57 @@ public async Task UploadObjectAsync_InvalidHash_DeleteAndThrow_DeleteFails()
ValidateData(bucket, name, new MemoryStream(interceptor.UploadedBytes));
}

[Fact]
public async Task InitiateUploadSessionAsync_NegativeLength()
{
var client = _fixture.Client;
var name = IdGenerator.FromGuid();
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
() => client.InitiateUploadSessionAsync(_fixture.SingleVersionBucket, name, contentType: null, contentLength: -5));
}

[Fact]
public async Task InitiateUploadSessionAsync_ZeroLength()
{
var client = _fixture.Client;
var name = IdGenerator.FromGuid();
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
() => client.InitiateUploadSessionAsync(_fixture.SingleVersionBucket, name, contentType: null, contentLength: 0));
}

[Theory]
[InlineData(false)] // contentLength = null
[InlineData(true)] // contentLength = correct length
public async Task InitiateUploadSessionAsyncThenUpload_NullOrCorrectLength(bool specifyLength)
{
var client = _fixture.Client;
var bucket = _fixture.SingleVersionBucket;
var name = IdGenerator.FromGuid();
var stream = GenerateData(50);
long? contentLength = specifyLength ? stream.Length : null;
var uploadUri = await client.InitiateUploadSessionAsync(bucket, name, null, contentLength);
var upload = ResumableUpload.CreateFromUploadUri(uploadUri, stream);
var progress = await upload.UploadAsync();
Assert.Equal(UploadStatus.Completed, progress.Status);
ValidateData(bucket, name, stream);
}

[Theory]
[InlineData(1)] // We specify a larger length than we upload
[InlineData(-1)] // We specify a shorter length than we upload
public async Task InitiateUploadSessionAsyncThenUpload_IncorrectLength(int lengthDelta)
{
var client = _fixture.Client;
var bucket = _fixture.SingleVersionBucket;
var name = IdGenerator.FromGuid();
var stream = GenerateData(50);
long? contentLength = stream.Length + lengthDelta;
var uploadUri = await client.InitiateUploadSessionAsync(bucket, name, null, contentLength);
var upload = ResumableUpload.CreateFromUploadUri(uploadUri, stream);
var progress = await upload.UploadAsync();
Assert.Equal(UploadStatus.Failed, progress.Status);
}

private class BreakUploadInterceptor : IHttpExecuteInterceptor
{
internal byte[] UploadedBytes { get; set; }
Expand Down
Expand Up @@ -355,9 +355,7 @@ public async Task UploadObjectWithSessionUri()
// var acl = PredefinedAcl.PublicRead // public
var acl = PredefinedObjectAcl.AuthenticatedRead; // private
var options = new UploadObjectOptions { PredefinedAcl = acl };
// Create a temporary uploader so the upload session can be manually initiated without actually uploading.
var tempUploader = client.CreateObjectUploader(bucketName, destination, contentType, new MemoryStream(), options);
var uploadUri = await tempUploader.InitiateSessionAsync();
var uploadUri = await client.InitiateUploadSessionAsync(bucketName, destination, contentType, contentLength: null, options);

// Send uploadUri to (unauthenticated) client application, so it can perform the upload:
using (var stream = File.OpenRead(source))
Expand Down
Expand Up @@ -49,8 +49,8 @@ public abstract partial class StorageClient
/// <summary>
/// Creates an instance which is capable of starting a resumable upload for an object.
/// </summary>
/// <param name="destination">Object to create or update. Must not be null, and must have the name,
/// bucket and content type populated.</param>
/// <param name="destination">Object to create or update. Must not be null, and must have the name
/// and bucket populated.</param>
/// <param name="source">The stream to read the data from. Must not be null.</param>
/// <param name="options">Additional options for the upload. May be null, in which case appropriate
/// defaults will be used.</param>
Expand Down Expand Up @@ -116,8 +116,8 @@ public abstract partial class StorageClient
/// <summary>
/// Uploads the data for an object in storage synchronously, from a specified stream.
/// </summary>
/// <param name="destination">Object to create or update. Must not be null, and must have the name,
/// bucket and content type populated.</param>
/// <param name="destination">Object to create or update. Must not be null, and must have the name
/// and bucket populated.</param>
/// <param name="source">The stream to read the data from. Must not be null.</param>
/// <param name="options">Additional options for the upload. May be null, in which case appropriate
/// defaults will be used.</param>
Expand All @@ -135,8 +135,8 @@ public abstract partial class StorageClient
/// <summary>
/// Uploads the data for an object in storage asynchronously, from a specified stream.
/// </summary>
/// <param name="destination">Object to create or update. Must not be null, and must have the name,
/// bucket and content type populated.</param>
/// <param name="destination">Object to create or update. Must not be null, and must have the name
/// and bucket populated.</param>
/// <param name="source">The stream to read the data from. Must not be null.</param>
/// <param name="options">Additional options for the upload. May be null, in which case appropriate
/// defaults will be used.</param>
Expand All @@ -153,5 +153,51 @@ public abstract partial class StorageClient
{
throw new NotImplementedException();
}

/// <summary>
/// Initiates an upload session, optionally specifying the length of the content to be uploaded.
/// The resulting URI can be used with <see cref="ResumableUpload.CreateFromUploadUri"/>.
/// </summary>
/// <param name="destination">Object to create or update. Must not be null, and must have the name
/// and bucket populated.</param>
/// <param name="contentLength">The length of the content to upload later. This may be null, in which
/// case any length of content may be uploaded. If the value is non-null, it must be strictly positive
/// (not zero), and the content uploaded later must be exactly this length.</param>
/// <param name="options">Additional options for the upload. May be null, in which case appropriate
/// defaults will be used.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous operation, with a result returning the
/// <see cref="Uri"/> to use in order to upload the content.</returns>
public virtual Task<Uri> InitiateUploadSessionAsync(
Object destination,
long? contentLength,
UploadObjectOptions options = null,
CancellationToken cancellationToken = default) =>
throw new NotImplementedException();

/// <summary>
/// Initiates an upload session, optionally specifying the length of the content to be uploaded.
/// The resulting URI can be used with <see cref="ResumableUpload.CreateFromUploadUri"/>.
/// </summary>
/// <param name="bucket">The name of the bucket containing the object. Must not be null.</param>
/// <param name="objectName">The name of the object within the bucket. Must not be null.</param>
/// <param name="contentType">The content type of the object. This should be a MIME type
/// such as "text/html" or "application/octet-stream". May be null.</param>
/// <param name="contentLength">The length of the content to upload later. This may be null, in which
/// case any length of content may be uploaded. If the value is non-null, it must be strictly positive
/// (not zero), and the content uploaded later must be exactly this length.</param>
/// <param name="options">Additional options for the upload. May be null, in which case appropriate
/// defaults will be used.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous operation, with a result returning the
/// <see cref="Uri"/> to use in order to upload the content.</returns>
public virtual Task<Uri> InitiateUploadSessionAsync(
string bucket,
string objectName,
string contentType,
long? contentLength,
UploadObjectOptions options = null,
CancellationToken cancellationToken = default) =>
throw new NotImplementedException();
}
}
Expand Up @@ -103,6 +103,39 @@ public sealed partial class StorageClientImpl : StorageClient
IProgress<IUploadProgress> progress = null) =>
new UploadHelper(this, destination, source, options, progress).ExecuteAsync(cancellationToken);

/// <inheritdoc />
public override Task<Uri> InitiateUploadSessionAsync(
Object destination,
long? contentLength,
UploadObjectOptions options = null,
CancellationToken cancellationToken = default)
{
// We could potentially do a single validation, but the reasons for preventing negative and zero
// values are somewhat different.
GaxPreconditions.CheckNonNegative(contentLength, nameof(contentLength));
if (contentLength == 0)
{
throw new ArgumentOutOfRangeException("A content length of 0 cannot be enforced. Use a null content length for 'any length'.");
}
ValidateObject(destination, nameof(destination));
var upload = CreateObjectUploader(destination, new LengthOnlyStream(contentLength), options);
return upload.InitiateSessionAsync(cancellationToken);
}

/// <inheritdoc />
public override Task<Uri> InitiateUploadSessionAsync(
string bucket,
string objectName,
string contentType,
long? contentLength,
UploadObjectOptions options = null,
CancellationToken cancellationToken = default)
{
ValidateBucketName(bucket);
var obj = new Object { Bucket = bucket, Name = objectName, ContentType = contentType };
return InitiateUploadSessionAsync(obj, contentLength, options, cancellationToken);
}

/// <summary>
/// Helper class to provide common context between sync and async operations. Helps avoid quite so much duplicate code...
/// </summary>
Expand Down Expand Up @@ -192,5 +225,33 @@ internal async Task<Object> ExecuteAsync(CancellationToken cancellationToken)
return result;
}
}

private sealed class LengthOnlyStream : Stream
{
private readonly long? _length;
internal LengthOnlyStream(long? length) => _length = length;

public override long Length => _length ?? throw new NotSupportedException();
public override bool CanSeek => _length.HasValue;

public override bool CanRead => throw new NotImplementedException();
public override bool CanWrite => throw new NotImplementedException();

public override long Position
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
}

public override void Flush() => throw new NotImplementedException();
public override int Read(byte[] buffer, int offset, int count) =>
throw new NotImplementedException();
public override long Seek(long offset, SeekOrigin origin) =>
throw new NotImplementedException();
public override void SetLength(long value) =>
throw new NotImplementedException();
public override void Write(byte[] buffer, int offset, int count) =>
throw new NotImplementedException();
}
}
}

0 comments on commit f0b643e

Please sign in to comment.