feat(compartment-mapper): Read more carefully
kriskowal committed Jul 26, 2023
1 parent ac9bb3b commit edf058a
Showing 4 changed files with 200 additions and 9 deletions.
7 changes: 7 additions & 0 deletions packages/compartment-mapper/NEWS.md
@@ -5,6 +5,13 @@ User-visible changes to the compartment mapper:
- Introduces support for source map generation.
Look for `computeSourceMapLocation` and `sourceMapHook` in
[`README.md`](README.md).
- This version reads more carefully, adding a control loop around concurrent
reads and classifying read errors caused by interrupts, resource exhaustion,
non-existent files, and the existence of non-files.
To preserve backward compatibility and object-capability principles, the
governor uses a low-resolution timer by default and can be endowed with
`performance.now` for a better signal.
These changes will put file descriptor exhaustion firmly behind us.
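For illustration only, a minimal sketch of endowing the read powers with `performance.now`, assuming Node's global `performance` and the package's `node-powers.js` entry point (the test scaffold below does the equivalent):

```js
// Give the governor a high-resolution clock so it can measure per-read
// throughput; without this endowment it falls back to Date.now.
import fs from 'fs';
import url from 'url';
import crypto from 'crypto';
import { makeReadPowers } from '@endo/compartment-mapper/node-powers.js';

const readPowers = makeReadPowers({ fs, url, crypto, now: performance.now });
// readPowers can then be passed to importLocation, makeArchive, and friends.
```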

# 0.8.5 (2023-07-17)

157 changes: 157 additions & 0 deletions packages/compartment-mapper/src/governor.js
@@ -0,0 +1,157 @@
/// <reference types="ses"/>

// This module provides a control loop for governing the concurrency of
// reads: it mitigates file descriptor exhaustion, retries over interrupts,
// and seeks to maximize throughput regardless of the underlying transport.
//
// It does not escape the notice of the author that in practice concurrent
// reads from a file system do not tend to increase throughput.

// At time of writing, the compartment mapper can be used with or without
// lockdown, so these superficial duplicates of makeQueue and makePromiseKit
// are necessary only because harden is not available without lockdown.
// We may revisit this design if it becomes possible to use the generalized
// versions of these utilities before application of lockdown as discussed:
// https://github.com/endojs/endo/issues/1686

const { Fail, quote: q } = assert;

const makePromiseKit = () => {
let resolve;
let reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
};

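// An unbounded async queue: each put(value) resolves exactly one present or
// future get(). The governor below uses it as a pool of permission tokens,
// where each token admits one concurrent read.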
export const makeQueue = () => {
let { promise: tailPromise, resolve: tailResolve } = makePromiseKit();
return {
put(value) {
const { resolve, promise } = makePromiseKit();
tailResolve({ value, promise });
tailResolve = resolve;
},
get() {
const promise = tailPromise.then(next => next.value);
tailPromise = tailPromise.then(next => next.promise);
return promise;
},
};
};

/**
* @param {object} args
* @param {number} [args.initialConcurrency]
* @param {number} [args.minimumConcurrency]
* @param {number} [args.maximumConcurrency]
* @param {() => number} args.now - a clock of arbitrary precision and
* magnitude.
* @param {(error: {message: string}) => boolean} args.isExhaustedError
* @param {(error: {message: string}) => boolean} args.isInterruptedError
*/
export const makeGovernor = ({
minimumConcurrency = 1,
initialConcurrency = minimumConcurrency,
maximumConcurrency = Infinity,
now,
isExhaustedError,
isInterruptedError,
}) => {
minimumConcurrency >= 1 || Fail`Minimum concurrency limit must be at least 1`;
initialConcurrency >= minimumConcurrency ||
Fail`Initial concurrency limit must be at least ${q(minimumConcurrency)}`;
initialConcurrency <= maximumConcurrency ||
Fail`Initial concurrency limit must be at most ${q(maximumConcurrency)}`;
const queue = makeQueue();
let limit = initialConcurrency;
let concurrency = initialConcurrency;
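// Seed the queue with one token per initially permitted concurrent read.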
for (let i = 0; i < initialConcurrency; i += 1) {
queue.put();
}

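// Throughput observed for the most recent measured read, in bytes per tick
// of `now()`; zero until the first sample is taken.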
let prev = 0;

const wrapRead = read => {
// We cannot govern throughput without a timer of some resolution.
// Inside a SES compartment the fallback of Date.now produces NaN.
if (Number.isNaN(now())) {
return read;
}

/**
* @param {string} location
*/
const wrappedRead = async location => {
await queue.get();
const start = now();

try {
const result = await read(location);

// Adjust concurrency limit in proportion to the change in
// throughput.
// A reduction in throughput indicates saturation over the bus from the
// underlying storage and suggests that we should reduce concurrent
// reads.
// An increase in throughput suggests an opportunity to exploit further
// concurrency.
const end = now();
// Make no adjustment if the resolution of the timer is not sufficient
// to measure any duration between the beginning and end of the read.
if (end !== start) {
const next = result.byteLength / (end - start);
// Adjust only once a prior throughput sample exists to compare against.
if (prev > 0) {
const change = next / prev;
if (change > 1) {
// Until we have saturated the bus, we cannot expect throughput to
// increase except due to noise.
// So, to exaggerate that noise, we increment the concurrency
// limit, which causes this algorithm to degenerate to AIMD
// behavior similar to TCP slow start.
limit = Math.min(maximumConcurrency, limit + 1);
} else if (change < 1) {
// With decreasing throughput, allow at least the minimum number of
// concurrent reads, or we will never recover.
limit = Math.max(minimumConcurrency, limit * change);
}
// console.log('concurrency', concurrency, 'limit', limit);
}
prev = next;
}

return result;
} catch (error) {
if (isInterruptedError(error)) {
// Interruptions do not indicate resource exhaustion, nor does the
// duration of a read that spans an interrupt indicate a reduction
// of throughput.
// We do not await the promise returned so our finally block runs
// before the promise settles.
return wrappedRead(location);
}
if (isExhaustedError(error)) {
// Multiplicative back-off if concurrency has caused the depletion of
// a resource, specifically file descriptors.
limit = Math.max(minimumConcurrency, limit / 2);
// We do not await the promise returned so our finally block runs
// before the promise settles.
return wrappedRead(location);
} else {
throw error;
}
} finally {
// Unblock further concurrent reads.
concurrency -= 1;
for (let i = 0; i <= limit - concurrency; i += 1) {
concurrency += 1;
queue.put();
// console.log('concurrency', concurrency);
}
}
};
return wrappedRead;
};

return { wrapRead };
};
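For illustration, a minimal sketch (not part of this diff) of wiring the governor around a plain file read, mirroring what `node-powers.js` does below; the relative import path, the `Date.now` clock, and the message-prefix error classifiers are assumptions borrowed from that file:

```js
import fs from 'fs';
import { makeGovernor } from './governor.js';

const { wrapRead } = makeGovernor({
  minimumConcurrency: 1,
  now: Date.now,
  // Classify retryable errors by message prefix, as node-powers.js does.
  isExhaustedError: error => error.message.startsWith('EMFILE: '),
  isInterruptedError: error => error.message.startsWith('EAGAIN: '),
});

// The wrapped read waits for a concurrency token before each underlying read,
// retries on EAGAIN and EMFILE, and halves the limit when descriptors run out.
const read = wrapRead(location => fs.promises.readFile(new URL(location)));
```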
36 changes: 28 additions & 8 deletions packages/compartment-mapper/src/node-powers.js
@@ -5,6 +5,7 @@
/** @typedef {import('./types.js').WritePowers} WritePowers */

import { createRequire } from 'module';
import { makeGovernor } from './governor.js';

/**
* @param {string} location
@@ -33,25 +34,44 @@ const fakePathToFileURL = path => {
* @param {typeof import('fs')} args.fs
* @param {typeof import('url')} [args.url]
* @param {typeof import('crypto')} [args.crypto]
* @param {() => number} [args.now]
*/
const makeReadPowersSloppy = ({ fs, url = undefined, crypto = undefined }) => {
const makeReadPowersSloppy = ({
fs,
url = undefined,
crypto = undefined,
now = undefined,
}) => {
const fileURLToPath =
url === undefined ? fakeFileURLToPath : url.fileURLToPath;
const pathToFileURL =
url === undefined ? fakePathToFileURL : url.pathToFileURL;

now = now || Date.now;

/** @param {{message: string}} error */
const isExhaustedError = error => error.message.startsWith('EMFILE: ');

/** @param {{message: string}} error */
const isInterruptedError = error => error.message.startsWith('EAGAIN: ');

const { wrapRead } = makeGovernor({
minimumConcurrency: 1,
now,
isExhaustedError,
isInterruptedError,
});

/**
* @param {string} location
*/
const read = async location => {
try {
const path = fileURLToPath(location);
return await fs.promises.readFile(path);
} catch (error) {
throw Error(error.message);
}
const lossyRead = location => {
const path = fileURLToPath(location);
return fs.promises.readFile(path);
};

const read = wrapRead(lossyRead);

const requireResolve = (from, specifier, options) =>
createRequire(from).resolve(specifier, options);

9 changes: 8 additions & 1 deletion packages/compartment-mapper/test/scaffold.js
@@ -1,3 +1,5 @@
/* global performance */

import 'ses';
import fs from 'fs';
import crypto from 'crypto';
@@ -15,7 +17,12 @@ import {
} from '../index.js';
import { makeReadPowers } from '../src/node-powers.js';

export const readPowers = makeReadPowers({ fs, crypto, url });
export const readPowers = makeReadPowers({
fs,
crypto,
url,
now: performance.now,
});

export const sanitizePaths = (text = '', tolerateLineChange = false) => {
if (tolerateLineChange) {
