Skip to content

Commit

Permalink
feat: add starting/max rates to BulkWriterOptions (#1305)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Sep 25, 2020
1 parent 67b0aba commit 57dcf1c
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 20 deletions.
120 changes: 110 additions & 10 deletions dev/src/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import {
import {RateLimiter} from './rate-limiter';
import {DocumentReference} from './reference';
import {Timestamp} from './timestamp';
import {Deferred, getRetryCodes, wrapError} from './util';
import {Deferred, getRetryCodes, isObject, wrapError} from './util';
import {BatchWriteResult, WriteBatch, WriteResult} from './write-batch';
import {logger} from './logger';
import {
invalidArgumentMessage,
validateInteger,
validateOptional,
} from './validate';

/*!
* The maximum number of writes that can be in a single batch.
Expand All @@ -42,7 +47,7 @@ const MAX_BATCH_SIZE = 20;
*
* https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic.
*/
const STARTING_MAXIMUM_OPS_PER_SECOND = 500;
export const DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500;

/*!
* The rate by which to increase the capacity as specified by the 500/50/5 rule.
Expand Down Expand Up @@ -351,22 +356,46 @@ export class BulkWriter {

constructor(
private readonly firestore: Firestore,
enableThrottling: boolean
options?: firestore.BulkWriterOptions
) {
this.firestore._incrementBulkWritersCount();
validateBulkWriterOptions(options);

if (enableThrottling) {
this.rateLimiter = new RateLimiter(
STARTING_MAXIMUM_OPS_PER_SECOND,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS
);
} else {
if (options?.throttling === false) {
this.rateLimiter = new RateLimiter(
Number.POSITIVE_INFINITY,
Number.POSITIVE_INFINITY,
Number.POSITIVE_INFINITY,
Number.POSITIVE_INFINITY
);
} else {
let startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND;
let maxRate = Number.POSITIVE_INFINITY;

if (typeof options?.throttling !== 'boolean') {
if (options?.throttling?.maxOpsPerSecond !== undefined) {
maxRate = options.throttling.maxOpsPerSecond;
}

if (options?.throttling?.initialOpsPerSecond !== undefined) {
startingRate = options.throttling.initialOpsPerSecond;
}

// The initial validation step ensures that the maxOpsPerSecond is
// greater than initialOpsPerSecond. If this inequality is true, that
// means initialOpsPerSecond was not set and maxOpsPerSecond is less
// than the default starting rate.
if (maxRate < startingRate) {
startingRate = maxRate;
}
}

this.rateLimiter = new RateLimiter(
startingRate,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS,
maxRate
);
}
}

Expand Down Expand Up @@ -727,4 +756,75 @@ export class BulkWriter {
_setMaxBatchSize(size: number): void {
this.maxBatchSize = size;
}

/**
* Returns the rate limiter for testing.
*
* @private
*/
// Visible for testing.
_getRateLimiter(): RateLimiter {
return this.rateLimiter;
}
}

/**
* Validates the use of 'value' as BulkWriterOptions.
*
* @private
* @param value The BulkWriterOptions object to validate.
* @throws if the input is not a valid BulkWriterOptions object.
*/
function validateBulkWriterOptions(value: unknown): void {
if (validateOptional(value, {optional: true})) {
return;
}
const argName = 'options';

if (!isObject(value)) {
throw new Error(
`${invalidArgumentMessage(
argName,
'bulkWriter() options argument'
)} Input is not an object.`
);
}

const options = value as firestore.BulkWriterOptions;

if (
options.throttling === undefined ||
typeof options.throttling === 'boolean'
) {
return;
}

if (options.throttling.initialOpsPerSecond !== undefined) {
validateInteger(
'initialOpsPerSecond',
options.throttling.initialOpsPerSecond,
{
minValue: 1,
}
);
}

if (options.throttling.maxOpsPerSecond !== undefined) {
validateInteger('maxOpsPerSecond', options.throttling.maxOpsPerSecond, {
minValue: 1,
});

if (
options.throttling.initialOpsPerSecond !== undefined &&
options.throttling.initialOpsPerSecond >
options.throttling.maxOpsPerSecond
) {
throw new Error(
`${invalidArgumentMessage(
argName,
'bulkWriter() options argument'
)} "maxOpsPerSecond" cannot be less than "initialOpsPerSecond".`
);
}
}
}
2 changes: 1 addition & 1 deletion dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ export class Firestore implements firestore.Firestore {
* });
*/
bulkWriter(options?: firestore.BulkWriterOptions): BulkWriter {
return new BulkWriter(this, !options?.disableThrottling);
return new BulkWriter(this, options);
}

/**
Expand Down
16 changes: 11 additions & 5 deletions dev/src/rate-limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ export class RateLimiter {
* @param multiplier Rate by which to increase the capacity.
* @param multiplierMillis How often the capacity should increase in
* milliseconds.
* @param maximumCapacity Maximum number of allowed operations per second.
* The number of tokens added per second will never exceed this number.
* @param startTimeMillis The starting time in epoch milliseconds that the
* rate limit is based on. Used for testing the limiter.
*/
constructor(
private readonly initialCapacity: number,
private readonly multiplier: number,
private readonly multiplierMillis: number,
readonly maximumCapacity: number,
private readonly startTimeMillis = Date.now()
) {
this.availableTokens = initialCapacity;
Expand Down Expand Up @@ -147,11 +150,14 @@ export class RateLimiter {
'startTime cannot be after currentTime'
);
const millisElapsed = requestTimeMillis - this.startTimeMillis;
const operationsPerSecond = Math.floor(
Math.pow(
this.multiplier,
Math.floor(millisElapsed / this.multiplierMillis)
) * this.initialCapacity
const operationsPerSecond = Math.min(
Math.floor(
Math.pow(
this.multiplier,
Math.floor(millisElapsed / this.multiplierMillis)
) * this.initialCapacity
),
this.maximumCapacity
);

if (operationsPerSecond !== this.previousCapacity) {
Expand Down
107 changes: 107 additions & 0 deletions dev/test/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
create,
createInstance,
document,
InvalidApiUsage,
remove,
response,
set,
Expand All @@ -41,6 +42,7 @@ import {
} from './util/helpers';

import api = proto.google.firestore.v1;
import {DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND} from '../src/bulk-writer';

// Change the argument to 'console.log' to enable debug output.
setLogFunction(null);
Expand Down Expand Up @@ -178,6 +180,111 @@ describe('BulkWriter', () => {
setTimeoutHandler(setTimeout);
});

describe('options', () => {
it('requires object', async () => {
const firestore = await createInstance();
expect(() => firestore.bulkWriter(42 as InvalidApiUsage)).to.throw(
'Value for argument "options" is not a valid bulkWriter() options argument. Input is not an object.'
);
});

it('initialOpsPerSecond requires positive integer', async () => {
const firestore = await createInstance();
expect(() =>
firestore.bulkWriter({throttling: {initialOpsPerSecond: -1}})
).to.throw(
'Value for argument "initialOpsPerSecond" must be within [1, Infinity] inclusive, but was: -1'
);

expect(() =>
firestore.bulkWriter({throttling: {initialOpsPerSecond: 500.5}})
).to.throw(
'Value for argument "initialOpsPerSecond" is not a valid integer.'
);
});

it('maxOpsPerSecond requires positive integer', async () => {
const firestore = await createInstance();
expect(() =>
firestore.bulkWriter({throttling: {maxOpsPerSecond: -1}})
).to.throw(
'Value for argument "maxOpsPerSecond" must be within [1, Infinity] inclusive, but was: -1'
);

expect(() =>
firestore.bulkWriter({throttling: {maxOpsPerSecond: 500.5}})
).to.throw(
'Value for argument "maxOpsPerSecond" is not a valid integer.'
);
});

it('maxOpsPerSecond must be greater than initial ops per second', async () => {
const firestore = await createInstance();

expect(() =>
firestore.bulkWriter({
throttling: {initialOpsPerSecond: 550, maxOpsPerSecond: 500},
})
).to.throw(
'Value for argument "options" is not a valid bulkWriter() options argument. "maxOpsPerSecond" cannot be less than "initialOpsPerSecond".'
);
});

it('initial and max rates are properly set', async () => {
const firestore = await createInstance();

let bulkWriter = firestore.bulkWriter({
throttling: {initialOpsPerSecond: 500, maxOpsPerSecond: 550},
});
expect(bulkWriter._getRateLimiter().availableTokens).to.equal(500);
expect(bulkWriter._getRateLimiter().maximumCapacity).to.equal(550);

bulkWriter = firestore.bulkWriter({
throttling: {maxOpsPerSecond: 1000},
});
expect(bulkWriter._getRateLimiter().availableTokens).to.equal(500);
expect(bulkWriter._getRateLimiter().maximumCapacity).to.equal(1000);

bulkWriter = firestore.bulkWriter({
throttling: {initialOpsPerSecond: 100},
});
expect(bulkWriter._getRateLimiter().availableTokens).to.equal(100);
expect(bulkWriter._getRateLimiter().maximumCapacity).to.equal(
Number.POSITIVE_INFINITY
);

bulkWriter = firestore.bulkWriter({
throttling: {maxOpsPerSecond: 100},
});
expect(bulkWriter._getRateLimiter().availableTokens).to.equal(100);
expect(bulkWriter._getRateLimiter().maximumCapacity).to.equal(100);

bulkWriter = firestore.bulkWriter();
expect(bulkWriter._getRateLimiter().availableTokens).to.equal(
DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND
);
expect(bulkWriter._getRateLimiter().maximumCapacity).to.equal(
Number.POSITIVE_INFINITY
);

bulkWriter = firestore.bulkWriter({throttling: true});
expect(bulkWriter._getRateLimiter().availableTokens).to.equal(
DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND
);
expect(bulkWriter._getRateLimiter().maximumCapacity).to.equal(
Number.POSITIVE_INFINITY
);

bulkWriter = firestore.bulkWriter({throttling: false});
expect(bulkWriter._getRateLimiter().availableTokens).to.equal(
Number.POSITIVE_INFINITY
);
expect(bulkWriter._getRateLimiter().maximumCapacity).to.equal(
Number.POSITIVE_INFINITY
);
});
});

it('has a set() method', async () => {
const bulkWriter = await instantiateInstance([
{
Expand Down
6 changes: 6 additions & 0 deletions dev/test/rate-limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ describe('RateLimiter', () => {
/* initialCapacity= */ 500,
/* multiplier= */ 1.5,
/* multiplierMillis= */ 5 * 60 * 1000,
/* maximumCapacity= */ 1000000,
/* startTime= */ new Date(0).getTime()
);
});
Expand Down Expand Up @@ -106,5 +107,10 @@ describe('RateLimiter', () => {
expect(
limiter.calculateCapacity(new Date(90 * 60 * 1000).getTime())
).to.equal(738945);

// Check that maximum rate limit is enforced.
expect(
limiter.calculateCapacity(new Date(1000 * 60 * 1000).getTime())
).to.equal(1000000);
});
});
26 changes: 22 additions & 4 deletions types/firestore.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ declare namespace FirebaseFirestore {
* Creates a [BulkWriter]{@link BulkWriter}, used for performing
* multiple writes in parallel. Gradually ramps up writes as specified
* by the 500/50/5 rule.
*
* @param options An options object used to configure the throttling
* behavior for the underlying BulkWriter.
*/
bulkWriter(options?: BulkWriterOptions): BulkWriter;
}
Expand Down Expand Up @@ -613,12 +616,27 @@ declare namespace FirebaseFirestore {
}

/**
* An options object that can be used to disable request throttling in
* BulkWriter.
* An options object to configure throttling on BulkWriter.
*/
export interface BulkWriterOptions {
/** Whether to disable throttling. */
readonly disableThrottling?: boolean;
/**
* Whether to disable or configure throttling. By default, throttling is
* enabled. This field can be set to either a boolean or a config
* object. Setting it to `true` will use default values. You can override
* the defaults by setting it to `false` to disable throttling, or by
* setting the config values to enable throttling with the provided values.
*
* @param initialOpsPerSecond The initial maximum number of operations per
* second allowed by the throttler. If this field is not set, the default
* is 500 operations per second.
* @param maxOpsPerSecond The maximum number of operations per second
* allowed by the throttler. If this field is set, the throttler's allowed
* operations per second does not ramp up past the specified operations per
* second.
*/
readonly throttling?:
| boolean
| {initialOpsPerSecond?: number; maxOpsPerSecond?: number};
}

/**
Expand Down

0 comments on commit 57dcf1c

Please sign in to comment.