Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions generator/.DevConfigs/433a9a6d-b8ea-4676-b763-70711e8288e3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"services": [
{
"serviceName": "S3",
"type": "minor",
"changeLogMessages": [
"Added UploadInitiatedEvent, UploadCompletedEvent, and UploadFailedEvent for multipart uploads."
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,46 @@ private void UploadPartProgressEventCallback(object sender, UploadProgressArgs e
long transferredBytes = Interlocked.Add(ref _totalTransferredBytes, e.IncrementTransferred - e.CompensationForRetry);

var progressArgs = new UploadProgressArgs(e.IncrementTransferred, transferredBytes, this._contentLength,
e.CompensationForRetry, this._fileTransporterRequest.FilePath);
e.CompensationForRetry, this._fileTransporterRequest.FilePath, this._fileTransporterRequest);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are also adding the request object to the progress args as part of this change

this._fileTransporterRequest.OnRaiseProgressEvent(progressArgs);
}

private void FireTransferInitiatedEvent()
{
var initiatedArgs = new UploadInitiatedEventArgs(
request: _fileTransporterRequest,
totalBytes: _contentLength,
filePath: _fileTransporterRequest.FilePath
);

_fileTransporterRequest.OnRaiseTransferInitiatedEvent(initiatedArgs);
}

private void FireTransferCompletedEvent(TransferUtilityUploadResponse response)
{
var completedArgs = new UploadCompletedEventArgs(
request: _fileTransporterRequest,
filePath: _fileTransporterRequest.FilePath,
response: response,
transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
totalBytes: _contentLength
);

_fileTransporterRequest.OnRaiseTransferCompletedEvent(completedArgs);
}

private void FireTransferFailedEvent()
{
var failedArgs = new UploadFailedEventArgs(
request: _fileTransporterRequest,
filePath: _fileTransporterRequest.FilePath,
transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
totalBytes: _contentLength
);

_fileTransporterRequest.OnRaiseTransferFailedEvent(failedArgs);
}

/// <summary>
/// <para>
/// If a checksum algorithm was not specified, we MUST add the default value used by the SDK (as the individual part
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,33 @@ internal partial class MultipartUploadCommand : BaseCommand

public override async Task ExecuteAsync(CancellationToken cancellationToken)
{
// Fire transfer initiated event FIRST, before choosing path
FireTransferInitiatedEvent();

if ( (this._fileTransporterRequest.InputStream != null && !this._fileTransporterRequest.InputStream.CanSeek) || this._fileTransporterRequest.ContentLength == -1)
{
await UploadUnseekableStreamAsync(this._fileTransporterRequest, cancellationToken).ConfigureAwait(false);
}
else
{
var initRequest = ConstructInitiateMultipartUploadRequest();
var initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
InitiateMultipartUploadResponse initResponse = null;
try
{
var initRequest = ConstructInitiateMultipartUploadRequest();
initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);
Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
}
catch (Exception)
{
FireTransferFailedEvent();
throw;
}

var pendingUploadPartTasks = new List<Task<UploadPartResponse>>();

SemaphoreSlim localThrottler = null;
CancellationTokenSource internalCts = null;

try
{
Logger.DebugFormat("Queue up the UploadPartRequests to be executed");
Expand Down Expand Up @@ -101,14 +113,19 @@ await localThrottler.WaitAsync(cancellationToken)

Logger.DebugFormat("Beginning completing multipart. ({0})", initResponse.UploadId);
var compRequest = ConstructCompleteMultipartUploadRequest(initResponse);
await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
var completeResponse = await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);
Logger.DebugFormat("Done completing multipart. ({0})", initResponse.UploadId);

var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
FireTransferCompletedEvent(mappedResponse);
}
catch (Exception e)
{
Logger.Error(e, "Exception while uploading. ({0})", initResponse.UploadId);
Logger.Error(e, "Exception while uploading. ({0})", initResponse?.UploadId ?? "unknown");

FireTransferFailedEvent();

// Can't do async invocation in the catch block, doing cleanup synchronously.
Cleanup(initResponse.UploadId, pendingUploadPartTasks);
Copy link

Copilot AI Oct 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential null reference exception when initResponse is null. The code should check if initResponse is not null before accessing UploadId, or pass initResponse?.UploadId to handle the null case properly.

Suggested change
Cleanup(initResponse.UploadId, pendingUploadPartTasks);
Cleanup(initResponse?.UploadId, pendingUploadPartTasks);

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if this is needed because initresponse could only be null if the initial request failed and if it did we throw exception above?

throw;
Expand Down Expand Up @@ -207,8 +224,19 @@ private void AbortMultipartUpload(string uploadId)
}
};

var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
var initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
InitiateMultipartUploadResponse initiateResponse = null;

try
{
var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
FireTransferFailedEvent();
Logger.Error(ex, "Failed to initiate multipart upload for unseekable stream");
throw;
}

try
{
Expand Down Expand Up @@ -276,12 +304,17 @@ private void AbortMultipartUpload(string uploadId)

this._uploadResponses = uploadPartResponses;
CompleteMultipartUploadRequest compRequest = ConstructCompleteMultipartUploadRequest(initiateResponse, true, requestEventHandler);
await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
var completeResponse = await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
Logger.DebugFormat("Completed multi part upload. (Part count: {0}, Upload Id: {1})", uploadPartResponses.Count, initiateResponse.UploadId);

var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
FireTransferCompletedEvent(mappedResponse);
}
}
catch (Exception ex)
{
FireTransferFailedEvent();

await _s3Client.AbortMultipartUploadAsync(new AbortMultipartUploadRequest()
{
BucketName = request.BucketName,
Expand Down
Loading