Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Determine Content-Length Before Attempting Multi-chunk Upload #2074

Merged
merged 12 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions src/resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ export class Upload extends Writable {

let expectedUploadSize: number | undefined = undefined;

// Set `expectedUploadSize` to `contentLength` if available
// Set `expectedUploadSize` to `contentLength - this.numBytesWritten`, if available
if (typeof this.contentLength === 'number') {
expectedUploadSize = this.contentLength - this.numBytesWritten;
}
Expand Down Expand Up @@ -718,13 +718,37 @@ export class Upload extends Writable {
};

// If using multiple chunk upload, set appropriate header
if (multiChunkMode && expectedUploadSize) {
// The '-1' is because the ending byte is inclusive in the request.
const endingByte = expectedUploadSize + this.numBytesWritten - 1;
headers['Content-Length'] = expectedUploadSize;
if (multiChunkMode) {
// We need to know how much data is available upstream to set the `Content-Range` header.
const oneChunkIterator = this.upstreamIterator(expectedUploadSize, true);
const {value} = await oneChunkIterator.next();

const bytesToUpload = value!.chunk.byteLength;

// Important: we want to know if the upstream has ended and the queue is empty before
// unshifting data back into the queue. This way we will know if this is the last request or not.
const isLastChunkOfUpload = !(await this.waitForNextChunk());

// Important: put the data back in the queue for the actual upload iterator
this.unshiftChunkBuffer(value!.chunk);

let totalObjectSize = this.contentLength;

if (typeof this.contentLength !== 'number' && isLastChunkOfUpload) {
// Let's let the server know this is the last chunk since
// we didn't know the content-length beforehand.
totalObjectSize = bytesToUpload + this.numBytesWritten;
}

// `- 1` as the ending byte is inclusive in the request.
const endingByte = bytesToUpload + this.numBytesWritten - 1;

// `Content-Length` for multiple chunk uploads is the size of the chunk,
// not the overall object
headers['Content-Length'] = bytesToUpload;
headers[
'Content-Range'
] = `bytes ${this.offset}-${endingByte}/${this.contentLength}`;
] = `bytes ${this.offset}-${endingByte}/${totalObjectSize}`;
danielbankhead marked this conversation as resolved.
Show resolved Hide resolved
} else {
headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`;
}
Expand Down Expand Up @@ -798,11 +822,13 @@ export class Upload extends Writable {
// continue uploading next chunk
this.continueUploading();
} else if (!this.isSuccessfulResponse(resp.status)) {
const err: ApiError = {
code: resp.status,
name: 'Upload failed',
message: 'Upload failed',
};
const err: ApiError = new Error('Upload failed');
err.code = resp.status;
err.name = 'Upload failed';
if (resp?.data) {
err.errors = [resp?.data];
}

this.destroy(err);
} else {
// remove the last chunk sent to free memory
Expand Down
82 changes: 82 additions & 0 deletions system-test/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
Notification,
DeleteBucketCallback,
CRC32C,
UploadOptions,
} from '../src';
import * as nock from 'nock';
import {Transform} from 'stream';
Expand Down Expand Up @@ -2688,6 +2689,87 @@ describe('storage', () => {
});
});

describe('resumable upload', () => {
describe('multi-chunk upload', () => {
describe('upload configurations', () => {
const filePath: string = FILES.big.path;
const fileSize = fs.statSync(filePath).size;
let crc32c: string;

before(async () => {
// get a CRC32C value from the file
crc32c = await new Promise((resolve, reject) => {
const crc32c = new CRC32C();

fs.createReadStream(filePath)
.on('data', (d: Buffer) => crc32c.update(d))
.on('end', () => resolve(crc32c.toString()))
.on('error', reject);
});
});

async function uploadAndVerify(
file: File,
options: Omit<UploadOptions, 'destination'>
) {
await bucket.upload(filePath, {
destination: file,
...options,
});

const [metadata] = await file.getMetadata();

// assert we uploaded the expected data
assert.equal(metadata.crc32c, crc32c);
}

it('should support uploads where `contentLength < chunkSize`', async () => {
const file = bucket.file(generateName());

const metadata = {contentLength: fileSize};
// off by +1 to ensure `contentLength < chunkSize`
const chunkSize = fileSize + 1;

await uploadAndVerify(file, {chunkSize, metadata});
});

it('should support uploads where `contentLength % chunkSize != 0`', async () => {
const file = bucket.file(generateName());

const metadata = {contentLength: fileSize};
// off by -1 to ensure `contentLength % chunkSize != 0`
const chunkSize = fileSize - 1;

await uploadAndVerify(file, {chunkSize, metadata});
});

it('should support uploads where `fileSize % chunkSize != 0` && `!contentLength`', async () => {
const file = bucket.file(generateName());
// off by +1 to ensure `fileSize % chunkSize != 0`
const chunkSize = fileSize + 1;

await uploadAndVerify(file, {chunkSize});
});

it('should support uploads where `fileSize < chunkSize && `!contentLength`', async () => {
const file = bucket.file(generateName());
// off by `* 2 +1` to ensure `fileSize < chunkSize`
const chunkSize = fileSize * 2 + 1;

await uploadAndVerify(file, {chunkSize});
});

it('should support uploads where `fileSize > chunkSize` && `!contentLength`', async () => {
const file = bucket.file(generateName());
// off by -1 to ensure `fileSize > chunkSize`
const chunkSize = fileSize - 1;

await uploadAndVerify(file, {chunkSize});
});
});
});
});

describe('bucket upload with progress', () => {
it('show bytes sent with resumable upload', async () => {
const fileSize = fs.statSync(FILES.big.path).size;
Expand Down
20 changes: 15 additions & 5 deletions test/resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,9 @@ describe('resumable-upload', () => {
});

describe('request preparation', () => {
// Simulating the amount of data written from upstream (exhaustive)
const UPSTREAM_BUFFER_SIZE = 512;
const UPSTREAM_ENDED = true;
// a convenient handle for getting the request options
let reqOpts: GaxiosOptions;

Expand Down Expand Up @@ -1074,8 +1077,8 @@ describe('resumable-upload', () => {

reqOpts = requestOptions;
};
up.upstreamChunkBuffer = Buffer.alloc(512);
up.upstreamEnded = true;
up.upstreamChunkBuffer = Buffer.alloc(UPSTREAM_BUFFER_SIZE);
up.upstreamEnded = UPSTREAM_ENDED;
});

describe('single chunk', () => {
Expand Down Expand Up @@ -1153,18 +1156,25 @@ describe('resumable-upload', () => {

it('should prepare a valid request if `contentLength` is unknown', async () => {
const OFFSET = 100;
const EXPECTED_STREAM_AMOUNT = Math.min(
UPSTREAM_BUFFER_SIZE - OFFSET,
CHUNK_SIZE
);
const ENDING_BYTE = EXPECTED_STREAM_AMOUNT + OFFSET - 1;

up.offset = OFFSET;
up.contentLength = '*';

await up.startUploading();

const endByte = OFFSET + CHUNK_SIZE - 1;
assert(reqOpts.headers);
assert.equal(reqOpts.headers['Content-Length'], CHUNK_SIZE);
assert.equal(
reqOpts.headers['Content-Length'],
EXPECTED_STREAM_AMOUNT
);
assert.equal(
reqOpts.headers['Content-Range'],
`bytes ${OFFSET}-${endByte}/*`
`bytes ${OFFSET}-${ENDING_BYTE}/*`
);
assert.ok(
X_GOOG_API_HEADER_REGEX.test(reqOpts.headers['x-goog-api-client'])
Expand Down