Skip to content

Commit

Permalink
fix(lib-storage): call AbortMultipartUpload when failing to CompleteM…
Browse files Browse the repository at this point in the history
…ultipartUpload (#6112)

* fix(lib-storage): call AbortMultipartUpload when failing to CompleteMultipartUpload

* fix(lib-storage): update parts count up front to detect breach correctly

* fix(lib-storage): throw on multiple calls to the .done() method
  • Loading branch information
kuhe committed May 21, 2024
1 parent b97d070 commit b5288e6
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 106 deletions.
19 changes: 15 additions & 4 deletions lib/lib-storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@ try {
client: new S3({}) || new S3Client({}),
params: { Bucket, Key, Body },

// optional tags
tags: [
/*...*/
], // optional tags
queueSize: 4, // optional concurrency configuration
partSize: 1024 * 1024 * 5, // optional size of each part, in bytes, at least 5MB
leavePartsOnError: false, // optional manually handle dropped parts
],

// additional optional fields show default values below:

// (optional) concurrency configuration
queueSize: 4,

// (optional) size of each part, in bytes, at least 5MB
partSize: 1024 * 1024 * 5,

// (optional) when true, do not automatically call AbortMultipartUpload when
// a multipart upload fails to complete. You should then manually handle
// the leftover parts.
leavePartsOnError: false,
});

parallelUploads3.on("httpUploadProgress", (progress) => {
Expand Down
12 changes: 12 additions & 0 deletions lib/lib-storage/src/Upload.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,16 @@ describe(Upload.name, () => {
expect(error).toBeDefined();
}
});

// An Upload instance is single-use: a second .done() on the same instance
// must reject instead of re-running the upload.
it("should reject calling .done() more than once on an instance", async () => {
  const upload = new Upload({
    params,
    client: new S3({}),
  });

  // The first call is awaited without a catch, so it must succeed for the test to proceed.
  await upload.done();

  // Await the assertion: an un-awaited `.rejects` chain lets the test finish
  // (and pass) before the rejection has actually been checked.
  await expect(upload.done()).rejects.toEqual(
    new Error("@aws-sdk/lib-storage: this instance of Upload has already executed .done(). Create a new instance.")
  );
});
});
247 changes: 145 additions & 102 deletions lib/lib-storage/src/Upload.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
AbortMultipartUploadCommand,
CompletedPart,
CompleteMultipartUploadCommand,
CompleteMultipartUploadCommandOutput,
Expand Down Expand Up @@ -36,18 +37,18 @@ const MIN_PART_SIZE = 1024 * 1024 * 5;

export class Upload extends EventEmitter {
/**
* S3 multipart upload does not allow more than 10000 parts.
* S3 multipart upload does not allow more than 10,000 parts.
*/
private MAX_PARTS = 10000;
private MAX_PARTS = 10_000;

// Defaults.
private queueSize = 4;
private partSize = MIN_PART_SIZE;
private leavePartsOnError = false;
private tags: Tag[] = [];
private readonly queueSize: number = 4;
private readonly partSize = MIN_PART_SIZE;
private readonly leavePartsOnError: boolean = false;
private readonly tags: Tag[] = [];

private client: S3Client;
private params: PutObjectCommandInput;
private readonly client: S3Client;
private readonly params: PutObjectCommandInput;

// used for reporting progress.
private totalBytes?: number;
Expand All @@ -57,13 +58,19 @@ export class Upload extends EventEmitter {
private abortController: IAbortController;
private concurrentUploaders: Promise<void>[] = [];
private createMultiPartPromise?: Promise<CreateMultipartUploadCommandOutput>;
private abortMultipartUploadCommand: AbortMultipartUploadCommand | null = null;

private uploadedParts: CompletedPart[] = [];
private uploadId?: string;
uploadEvent?: string;
private uploadEnqueuedPartsCount = 0;
/**
* Last UploadId if the upload was done with MultipartUpload and not PutObject.
*/
public uploadId?: string;
public uploadEvent?: string;

private isMultiPart = true;
private singleUploadResult?: CompleteMultipartUploadCommandOutput;
private sent = false;

constructor(options: Options) {
super();
Expand Down Expand Up @@ -94,6 +101,12 @@ export class Upload extends EventEmitter {
}

/**
 * Runs the upload and resolves with its result.
 *
 * An instance may only be executed once; a repeated call rejects with an
 * Error asking the caller to create a new instance.
 *
 * @returns the output of whichever settles first: the upload itself or the
 * abort watcher bound to this instance's abort signal.
 */
public async done(): Promise<CompleteMultipartUploadCommandOutput> {
  if (!this.sent) {
    // Mark as executed before starting any work so later calls are rejected.
    this.sent = true;

    const uploadResult = this.__doMultipartUpload();
    const abortWatcher = this.__abortTimeout(this.abortController.signal);
    return await Promise.race([uploadResult, abortWatcher]);
  }

  throw new Error(
    "@aws-sdk/lib-storage: this instance of Upload has already executed .done(). Create a new instance."
  );
}

Expand Down Expand Up @@ -184,104 +197,64 @@ export class Upload extends EventEmitter {
private async __createMultipartUpload(): Promise<CreateMultipartUploadCommandOutput> {
if (!this.createMultiPartPromise) {
const createCommandParams = { ...this.params, Body: undefined };
this.createMultiPartPromise = this.client.send(new CreateMultipartUploadCommand(createCommandParams));
this.createMultiPartPromise = this.client
.send(new CreateMultipartUploadCommand(createCommandParams))
.then((createMpuResponse) => {
// We use the parameter Bucket/Key rather than the information from
// createMultipartUpload response in case the Bucket is an access point arn.
this.abortMultipartUploadCommand = new AbortMultipartUploadCommand({
Bucket: this.params.Bucket,
Key: this.params.Key,
UploadId: createMpuResponse.UploadId,
});
return createMpuResponse;
});
}
return this.createMultiPartPromise;
}

private async __doConcurrentUpload(dataFeeder: AsyncGenerator<RawDataPart, void, undefined>): Promise<void> {
for await (const dataPart of dataFeeder) {
if (this.uploadedParts.length > this.MAX_PARTS) {
if (this.uploadEnqueuedPartsCount > this.MAX_PARTS) {
throw new Error(
`Exceeded ${this.MAX_PARTS} as part of the upload to ${this.params.Key} and ${this.params.Bucket}.`
`Exceeded ${this.MAX_PARTS} parts in multipart upload to Bucket: ${this.params.Bucket} Key: ${this.params.Key}.`
);
}

try {
if (this.abortController.signal.aborted) {
return;
}
if (this.abortController.signal.aborted) {
return;
}

// Use put instead of multi-part for one chunk uploads.
if (dataPart.partNumber === 1 && dataPart.lastPart) {
return await this.__uploadUsingPut(dataPart);
}
// Use put instead of multipart for one chunk uploads.
if (dataPart.partNumber === 1 && dataPart.lastPart) {
return await this.__uploadUsingPut(dataPart);
}

if (!this.uploadId) {
const { UploadId } = await this.__createMultipartUpload();
this.uploadId = UploadId;
if (this.abortController.signal.aborted) {
return;
}
if (!this.uploadId) {
const { UploadId } = await this.__createMultipartUpload();
this.uploadId = UploadId;
if (this.abortController.signal.aborted) {
return;
}
}

const partSize: number = byteLength(dataPart.data) || 0;

const requestHandler = this.client.config.requestHandler;
const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;

let lastSeenBytes = 0;
const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => {
const requestPartSize = Number(request.query["partNumber"]) || -1;

if (requestPartSize !== dataPart.partNumber) {
// ignored, because the emitted event is not for this part.
return;
}

if (event.total && partSize) {
this.bytesUploadedSoFar += event.loaded - lastSeenBytes;
lastSeenBytes = event.loaded;
}

this.__notifyProgress({
loaded: this.bytesUploadedSoFar,
total: this.totalBytes,
part: dataPart.partNumber,
Key: this.params.Key,
Bucket: this.params.Bucket,
});
};
const partSize: number = byteLength(dataPart.data) || 0;

if (eventEmitter !== null) {
// The requestHandler is the xhr-http-handler.
eventEmitter.on("xhr.upload.progress", uploadEventListener);
}
const requestHandler = this.client.config.requestHandler;
const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;

const partResult = await this.client.send(
new UploadPartCommand({
...this.params,
UploadId: this.uploadId,
Body: dataPart.data,
PartNumber: dataPart.partNumber,
})
);
let lastSeenBytes = 0;
const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => {
const requestPartSize = Number(request.query["partNumber"]) || -1;

if (eventEmitter !== null) {
eventEmitter.off("xhr.upload.progress", uploadEventListener);
}

if (this.abortController.signal.aborted) {
if (requestPartSize !== dataPart.partNumber) {
// ignored, because the emitted event is not for this part.
return;
}

if (!partResult.ETag) {
throw new Error(
`Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`
);
}

this.uploadedParts.push({
PartNumber: dataPart.partNumber,
ETag: partResult.ETag,
...(partResult.ChecksumCRC32 && { ChecksumCRC32: partResult.ChecksumCRC32 }),
...(partResult.ChecksumCRC32C && { ChecksumCRC32C: partResult.ChecksumCRC32C }),
...(partResult.ChecksumSHA1 && { ChecksumSHA1: partResult.ChecksumSHA1 }),
...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }),
});

if (eventEmitter === null) {
this.bytesUploadedSoFar += partSize;
if (event.total && partSize) {
this.bytesUploadedSoFar += event.loaded - lastSeenBytes;
lastSeenBytes = event.loaded;
}

this.__notifyProgress({
Expand All @@ -291,33 +264,89 @@ export class Upload extends EventEmitter {
Key: this.params.Key,
Bucket: this.params.Bucket,
});
} catch (e) {
// Failed to create multi-part or put
if (!this.uploadId) {
throw e;
}
// on leavePartsOnError throw an error so users can deal with it themselves,
// otherwise swallow the error.
if (this.leavePartsOnError) {
throw e;
}
};

if (eventEmitter !== null) {
// The requestHandler is the xhr-http-handler.
eventEmitter.on("xhr.upload.progress", uploadEventListener);
}

this.uploadEnqueuedPartsCount += 1;

const partResult = await this.client.send(
new UploadPartCommand({
...this.params,
UploadId: this.uploadId,
Body: dataPart.data,
PartNumber: dataPart.partNumber,
})
);

if (eventEmitter !== null) {
eventEmitter.off("xhr.upload.progress", uploadEventListener);
}

if (this.abortController.signal.aborted) {
return;
}

if (!partResult.ETag) {
throw new Error(
`Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`
);
}

this.uploadedParts.push({
PartNumber: dataPart.partNumber,
ETag: partResult.ETag,
...(partResult.ChecksumCRC32 && { ChecksumCRC32: partResult.ChecksumCRC32 }),
...(partResult.ChecksumCRC32C && { ChecksumCRC32C: partResult.ChecksumCRC32C }),
...(partResult.ChecksumSHA1 && { ChecksumSHA1: partResult.ChecksumSHA1 }),
...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }),
});

if (eventEmitter === null) {
this.bytesUploadedSoFar += partSize;
}

this.__notifyProgress({
loaded: this.bytesUploadedSoFar,
total: this.totalBytes,
part: dataPart.partNumber,
Key: this.params.Key,
Bucket: this.params.Bucket,
});
}
}

private async __doMultipartUpload(): Promise<CompleteMultipartUploadCommandOutput> {
// Set up data input chunks.
const dataFeeder = getChunk(this.params.Body, this.partSize);
const concurrentUploaderFailures: Error[] = [];

// Create and start concurrent uploads.
for (let index = 0; index < this.queueSize; index++) {
const currentUpload = this.__doConcurrentUpload(dataFeeder);
const currentUpload = this.__doConcurrentUpload(dataFeeder).catch((err) => {
concurrentUploaderFailures.push(err);
});
this.concurrentUploaders.push(currentUpload);
}

// Wait for all concurrent uploaders to settle.
await Promise.all(this.concurrentUploaders);
if (concurrentUploaderFailures.length >= 1) {
await this.markUploadAsAborted();
/**
* Previously, each promise in concurrentUploaders could potentially throw
* and immediately return control to user code. However, we want to wait for
* all uploaders to finish before calling AbortMultipartUpload to avoid
* stranding uploaded parts.
*
* We throw only the first error to be consistent with prior behavior,
* but may consider combining the errors into a report in the future.
*/
throw concurrentUploaderFailures[0];
}

if (this.abortController.signal.aborted) {
await this.markUploadAsAborted();
throw Object.assign(new Error("Upload aborted."), { name: "AbortError" });
}

Expand All @@ -341,6 +370,8 @@ export class Upload extends EventEmitter {
result = this.singleUploadResult!;
}

this.abortMultipartUploadCommand = null;

// Add tags to the object after it's completed the upload.
if (this.tags.length) {
await this.client.send(
Expand All @@ -356,6 +387,18 @@ export class Upload extends EventEmitter {
return result;
}

/**
 * Sends the prepared AbortMultipartUpload command for the current upload.
 *
 * Does nothing unless all of the following hold:
 * - an upload id is known,
 * - `leavePartsOnError` is false,
 * - an AbortMultipartUpload command has been prepared.
 *
 * The prepared command is cleared once it has been sent successfully.
 */
private async markUploadAsAborted(): Promise<void> {
  const preparedAbort = this.abortMultipartUploadCommand;

  if (!this.uploadId || this.leavePartsOnError || preparedAbort === null) {
    return;
  }

  await this.client.send(preparedAbort);
  this.abortMultipartUploadCommand = null;
}

private __notifyProgress(progress: Progress): void {
if (this.uploadEvent) {
this.emit(this.uploadEvent, progress);
Expand Down

0 comments on commit b5288e6

Please sign in to comment.