diff --git a/generator/.DevConfigs/433a9a6d-b8ea-4676-b763-70711e8288e3.json b/generator/.DevConfigs/433a9a6d-b8ea-4676-b763-70711e8288e3.json
new file mode 100644
index 000000000000..1790a068cfae
--- /dev/null
+++ b/generator/.DevConfigs/433a9a6d-b8ea-4676-b763-70711e8288e3.json
@@ -0,0 +1,11 @@
+{
+  "services": [
+    {
+      "serviceName": "S3",
+      "type": "minor",
+      "changeLogMessages": [
+        "Added UploadInitiatedEvent, UploadCompletedEvent, and UploadFailedEvent for multipart uploads."
+      ]
+    }
+  ]
+}
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartUploadCommand.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartUploadCommand.cs
index 644500df5bff..61ba2db64940 100644
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartUploadCommand.cs
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartUploadCommand.cs
@@ -375,10 +375,46 @@ private void UploadPartProgressEventCallback(object sender, UploadProgressArgs e
             long transferredBytes = Interlocked.Add(ref _totalTransferredBytes, e.IncrementTransferred - e.CompensationForRetry);

             var progressArgs = new UploadProgressArgs(e.IncrementTransferred, transferredBytes, this._contentLength,
-                e.CompensationForRetry, this._fileTransporterRequest.FilePath);
+                e.CompensationForRetry, this._fileTransporterRequest.FilePath, this._fileTransporterRequest);
             this._fileTransporterRequest.OnRaiseProgressEvent(progressArgs);
         }

+        private void FireTransferInitiatedEvent()
+        {
+            var initiatedArgs = new UploadInitiatedEventArgs(
+                request: _fileTransporterRequest,
+                totalBytes: _contentLength,
+                filePath: _fileTransporterRequest.FilePath
+            );
+
+            _fileTransporterRequest.OnRaiseTransferInitiatedEvent(initiatedArgs);
+        }
+
+        private void FireTransferCompletedEvent(TransferUtilityUploadResponse response)
+        {
+            var completedArgs = new UploadCompletedEventArgs(
+                request: _fileTransporterRequest,
+                filePath: _fileTransporterRequest.FilePath,
+                response: response,
+                transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
+                totalBytes: _contentLength
+            );
+
+            _fileTransporterRequest.OnRaiseTransferCompletedEvent(completedArgs);
+        }
+
+        private void FireTransferFailedEvent()
+        {
+            var failedArgs = new UploadFailedEventArgs(
+                request: _fileTransporterRequest,
+                filePath: _fileTransporterRequest.FilePath,
+                transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
+                totalBytes: _contentLength
+            );
+
+            _fileTransporterRequest.OnRaiseTransferFailedEvent(failedArgs);
+        }
+
         /// <summary>
         ///
         /// If a checksum algorithm was not specified, we MUST add the default value used by the SDK (as the individual part
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartUploadCommand.async.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartUploadCommand.async.cs
index 74e5f6c874a8..4966c9e012e7 100644
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartUploadCommand.async.cs
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartUploadCommand.async.cs
@@ -33,21 +33,33 @@ internal partial class MultipartUploadCommand : BaseCommand
         public override async Task ExecuteAsync(CancellationToken cancellationToken)
         {
+            // Fire transfer initiated event FIRST, before choosing path
+            FireTransferInitiatedEvent();
+
            if ( (this._fileTransporterRequest.InputStream != null && !this._fileTransporterRequest.InputStream.CanSeek) ||
                this._fileTransporterRequest.ContentLength == -1)
            {
                await UploadUnseekableStreamAsync(this._fileTransporterRequest, cancellationToken).ConfigureAwait(false);
            }
            else
            {
-                var initRequest = ConstructInitiateMultipartUploadRequest();
-                var initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
+                InitiateMultipartUploadResponse initResponse = null;
+                try
+                {
+                    var initRequest = ConstructInitiateMultipartUploadRequest();
+                    initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
                    .ConfigureAwait(continueOnCapturedContext: false);
-                Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
+                    Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
+                }
+                catch (Exception)
+                {
+                    FireTransferFailedEvent();
+                    throw;
+                }

                var pendingUploadPartTasks = new List<Task<UploadPartResponse>>();
-
                SemaphoreSlim localThrottler = null;
                CancellationTokenSource internalCts = null;
+
                try
                {
                    Logger.DebugFormat("Queue up the UploadPartRequests to be executed");
@@ -101,14 +113,19 @@ await localThrottler.WaitAsync(cancellationToken)

                    Logger.DebugFormat("Beginning completing multipart. ({0})", initResponse.UploadId);
                    var compRequest = ConstructCompleteMultipartUploadRequest(initResponse);
-                    await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
+                    var completeResponse = await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
                        .ConfigureAwait(continueOnCapturedContext: false);
                    Logger.DebugFormat("Done completing multipart. ({0})", initResponse.UploadId);
+                    var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
+                    FireTransferCompletedEvent(mappedResponse);
                }
                catch (Exception e)
                {
-                    Logger.Error(e, "Exception while uploading. ({0})", initResponse.UploadId);
+                    Logger.Error(e, "Exception while uploading. ({0})", initResponse?.UploadId ?? "unknown");
+
+                    FireTransferFailedEvent();
+
                    // Can't do async invocation in the catch block, doing cleanup synchronously.
                    Cleanup(initResponse.UploadId, pendingUploadPartTasks);
                    throw;
@@ -207,8 +224,19 @@ private void AbortMultipartUpload(string uploadId)
                }
            };

-            var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
-            var initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
+            InitiateMultipartUploadResponse initiateResponse = null;
+
+            try
+            {
+                var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
+                initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                FireTransferFailedEvent();
+                Logger.Error(ex, "Failed to initiate multipart upload for unseekable stream");
+                throw;
+            }

            try
            {
@@ -276,12 +304,17 @@ private void AbortMultipartUpload(string uploadId)

                    this._uploadResponses = uploadPartResponses;
                    CompleteMultipartUploadRequest compRequest = ConstructCompleteMultipartUploadRequest(initiateResponse, true, requestEventHandler);
-                    await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
+                    var completeResponse = await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
                    Logger.DebugFormat("Completed multi part upload. (Part count: {0}, Upload Id: {1})", uploadPartResponses.Count, initiateResponse.UploadId);
+
+                    var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
+                    FireTransferCompletedEvent(mappedResponse);
                }
            }
            catch (Exception ex)
            {
+                FireTransferFailedEvent();
+
                await _s3Client.AbortMultipartUploadAsync(new AbortMultipartUploadRequest()
                {
                    BucketName = request.BucketName,
diff --git a/sdk/test/Services/S3/IntegrationTests/TransferUtilityTests.cs b/sdk/test/Services/S3/IntegrationTests/TransferUtilityTests.cs
index 427b863e95ed..417d16098644 100644
--- a/sdk/test/Services/S3/IntegrationTests/TransferUtilityTests.cs
+++ b/sdk/test/Services/S3/IntegrationTests/TransferUtilityTests.cs
@@ -770,6 +770,210 @@ public void MultipartUploadProgressTest()
            }
        }

+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadInitiatedEventTest()
+        {
+            var fileName = UtilityMethods.GenerateName(@"MultipartUploadTest\InitiatedEvent");
+            var eventValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.IsTrue(args.TotalBytes > 0);
+                    Assert.AreEqual(20 * MEG_SIZE, args.TotalBytes);
+                    Assert.AreEqual(args.FilePath, Path.Combine(BasePath, fileName));
+                }
+            };
+            // Use 20MB+ to trigger multipart upload
+            UploadWithLifecycleEvents(fileName, 20 * MEG_SIZE, eventValidator, null, null);
+            eventValidator.AssertEventFired();
+        }
+
+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadCompletedEventTest()
+        {
+            var fileName = UtilityMethods.GenerateName(@"MultipartUploadTest\CompletedEvent");
+            var eventValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.IsNotNull(args.Response);
+                    Assert.AreEqual(args.TransferredBytes, args.TotalBytes);
+                    Assert.AreEqual(25 * MEG_SIZE, args.TotalBytes);
+                    Assert.IsTrue(!string.IsNullOrEmpty(args.Response.ETag));
+                    Assert.AreEqual(args.FilePath, Path.Combine(BasePath, fileName));
+                }
+            };
+            // Use 25MB to trigger multipart upload
+            UploadWithLifecycleEvents(fileName, 25 * MEG_SIZE, null, eventValidator, null);
+            eventValidator.AssertEventFired();
+        }
+
+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadFailedEventTest()
+        {
+            var fileName = UtilityMethods.GenerateName(@"MultipartUploadTest\FailedEvent");
+            var eventValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.IsTrue(args.TotalBytes > 0);
+                    Assert.AreEqual(22 * MEG_SIZE, args.TotalBytes);
+                    Assert.AreEqual(args.FilePath, Path.Combine(BasePath, fileName));
+                    // For failed uploads, transferred bytes should be less than or equal to total bytes
+                    Assert.IsTrue(args.TransferredBytes <= args.TotalBytes);
+                }
+            };
+
+            // Use invalid bucket name to force failure with multipart upload size
+            var invalidBucketName = "invalid-bucket-name-" + Guid.NewGuid().ToString();
+
+            try
+            {
+                // Use 22MB to trigger multipart upload
+                UploadWithLifecycleEventsAndBucket(fileName, 22 * MEG_SIZE, invalidBucketName, null, null, eventValidator);
+                Assert.Fail("Expected an exception to be thrown for invalid bucket");
+            }
+            catch (AmazonS3Exception)
+            {
+                // Expected exception - the failed event should have been fired
+                eventValidator.AssertEventFired();
+            }
+        }
+
+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadCompleteLifecycleTest()
+        {
+            var fileName = UtilityMethods.GenerateName(@"MultipartUploadTest\CompleteLifecycle");
+
+            var initiatedValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.AreEqual(30 * MEG_SIZE, args.TotalBytes);
+                    Assert.AreEqual(args.FilePath, Path.Combine(BasePath, fileName));
+                }
+            };
+
+            var completedValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.IsNotNull(args.Response);
+                    Assert.AreEqual(args.TransferredBytes, args.TotalBytes);
+                    Assert.AreEqual(30 * MEG_SIZE, args.TotalBytes);
+                    Assert.AreEqual(args.FilePath, Path.Combine(BasePath, fileName));
+                }
+            };
+
+            // Use 30MB to trigger multipart upload
+            UploadWithLifecycleEvents(fileName, 30 * MEG_SIZE, initiatedValidator, completedValidator, null);
+
+            initiatedValidator.AssertEventFired();
+            completedValidator.AssertEventFired();
+        }
+
+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadUnseekableStreamInitiatedEventTest()
+        {
+            var fileName = UtilityMethods.GenerateName(@"MultipartUploadTest\UnseekableStreamInitiatedEvent");
+            var eventValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.AreEqual(-1, args.TotalBytes); // Unseekable streams have unknown length
+                }
+            };
+            UploadUnseekableStreamWithLifecycleEvents(20 * MEG_SIZE, eventValidator, null, null);
+            eventValidator.AssertEventFired();
+        }
+
+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadUnseekableStreamCompletedEventTest()
+        {
+            var eventValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.IsNotNull(args.Response);
+                    Assert.AreEqual(-1, args.TotalBytes); // Unseekable streams have unknown length
+                    Assert.AreEqual(20 * MEG_SIZE, args.TransferredBytes); // since we know the actual length via testing it, we can check the transferredbytes size
+                }
+            };
+            UploadUnseekableStreamWithLifecycleEvents(20 * MEG_SIZE, null, eventValidator, null);
+            eventValidator.AssertEventFired();
+        }
+
+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadUnseekableStreamFailedEventTest()
+        {
+            var eventValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.AreEqual(-1, args.TotalBytes); // Unseekable streams have unknown length
+                }
+            };
+
+            // Use invalid bucket name to force failure with multipart upload size
+            var invalidBucketName = "invalid-bucket-name-" + Guid.NewGuid().ToString();
+
+            try
+            {
+                UploadUnseekableStreamWithLifecycleEventsAndBucket(20 * MEG_SIZE, invalidBucketName, null, null, eventValidator);
+                Assert.Fail("Expected an exception to be thrown for invalid bucket");
+            }
+            catch (AmazonS3Exception)
+            {
+                // Expected exception - the failed event should have been fired
+                eventValidator.AssertEventFired();
+            }
+        }
+
+        [TestMethod]
+        [TestCategory("S3")]
+        public void MultipartUploadUnseekableStreamCompleteLifecycleTest()
+        {
+            var initiatedValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.AreEqual(-1, args.TotalBytes); // Unseekable streams have unknown length
+                }
+            };
+
+            var completedValidator = new TransferLifecycleEventValidator
+            {
+                Validate = (args) =>
+                {
+                    Assert.IsNotNull(args.Request);
+                    Assert.IsNotNull(args.Response);
+                    Assert.AreEqual(-1, args.TotalBytes); // Unseekable streams have unknown length
+                    Assert.AreEqual(18 * MEG_SIZE, args.TransferredBytes); // Should have transferred all bytes
+                }
+            };
+
+            UploadUnseekableStreamWithLifecycleEvents(18 * MEG_SIZE, initiatedValidator, completedValidator, null);
+
+            initiatedValidator.AssertEventFired();
+            completedValidator.AssertEventFired();
+        }
+
        [TestMethod]
        [TestCategory("S3")]
        public void MultipartGetNumberTest()
@@ -1624,6 +1828,55 @@ void UploadWithLifecycleEventsAndBucket(string fileName, long size, string targe

            transferUtility.Upload(request);
        }
+
+        void UploadUnseekableStreamWithLifecycleEvents(long size,
+            TransferLifecycleEventValidator initiatedValidator,
+            TransferLifecycleEventValidator completedValidator,
+            TransferLifecycleEventValidator failedValidator)
+        {
+            UploadUnseekableStreamWithLifecycleEventsAndBucket(size, bucketName, initiatedValidator, completedValidator, failedValidator);
+        }
+
+        void UploadUnseekableStreamWithLifecycleEventsAndBucket(long size, string targetBucketName,
+            TransferLifecycleEventValidator initiatedValidator,
+            TransferLifecycleEventValidator completedValidator,
+            TransferLifecycleEventValidator failedValidator)
+        {
+            var fileName = UtilityMethods.GenerateName(@"UnseekableStreamUpload\File");
+            var key = fileName;
+            var path = Path.Combine(BasePath, fileName);
+            UtilityMethods.GenerateFile(path, size);
+
+            // Convert file to unseekable stream
+            var stream = GenerateUnseekableStreamFromFile(path);
+
+            var config = new TransferUtilityConfig();
+            var transferUtility = new TransferUtility(Client, config);
+            var request = new TransferUtilityUploadRequest
+            {
+                BucketName = targetBucketName,
+                InputStream = stream,
+                Key = key,
+                ContentType = octetStreamContentType
+            };
+
+            if (initiatedValidator != null)
+            {
+                request.UploadInitiatedEvent += initiatedValidator.OnEventFired;
+            }
+
+            if (completedValidator != null)
+            {
+                request.UploadCompletedEvent += completedValidator.OnEventFired;
+            }
+
+            if (failedValidator != null)
+            {
+                request.UploadFailedEvent += failedValidator.OnEventFired;
+            }
+
+            transferUtility.Upload(request);
+        }
        private class UnseekableStream : MemoryStream
        {
            private readonly bool _setZeroLengthStream;
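
A minimal consumer-side sketch of the new lifecycle events, for reference only (not part of this change set). It assumes the events follow the same EventHandler<TEventArgs> delegate shape as the existing UploadProgressEvent, so they can be subscribed with (sender, e) lambdas; the bucket name, key, and file path below are placeholders. The event argument properties used here (FilePath, TotalBytes, TransferredBytes, Response.ETag) are the ones exercised by the integration tests above.

// Hedged usage sketch, assuming EventHandler<TEventArgs>-style events; bucket/key/path are placeholders.
using System;
using Amazon.S3;
using Amazon.S3.Transfer;

class UploadLifecycleExample
{
    static void Main()
    {
        var transferUtility = new TransferUtility(new AmazonS3Client());

        var request = new TransferUtilityUploadRequest
        {
            BucketName = "example-bucket",          // placeholder
            Key = "large-object.bin",               // placeholder
            FilePath = @"C:\temp\large-object.bin"  // placeholder
        };

        // Raised once before the multipart upload starts.
        request.UploadInitiatedEvent += (sender, e) =>
            Console.WriteLine($"Initiated: {e.FilePath}, total bytes: {e.TotalBytes}");

        // Raised after CompleteMultipartUpload succeeds; Response carries the mapped result (e.g. ETag).
        request.UploadCompletedEvent += (sender, e) =>
            Console.WriteLine($"Completed: {e.TransferredBytes}/{e.TotalBytes} bytes, ETag: {e.Response.ETag}");

        // Raised when initiation, a part upload, or completion throws.
        request.UploadFailedEvent += (sender, e) =>
            Console.WriteLine($"Failed after {e.TransferredBytes} of {e.TotalBytes} bytes");

        transferUtility.Upload(request);
    }
}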