Skip to content

Commit

Permalink
Setting the status to red on the first error then continually (#20343)
Browse files Browse the repository at this point in the history
initializing
  • Loading branch information
kobelb committed Jul 2, 2018
1 parent a2cc325 commit b8a110b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,31 @@
* you may not use this file except in compliance with the Elastic License.
*/
import * as Rx from 'rxjs';
import { catchError, mergeMap, map, retryWhen, switchMap, tap } from 'rxjs/operators';
import { catchError, mergeMap, map, switchMap, tap } from 'rxjs/operators';

export const retryStrategy = ({
maxAttempts,
scalingDuration,
}) => (errors) => {
return errors.pipe(
mergeMap((error, i) => {
const attempt = i + 1;
export const RETRY_SCALE_DURATION = 100;
export const RETRY_DURATION_MAX = 10000;

if (attempt >= maxAttempts) {
return Rx.throwError(error);
}
const calculateDuration = i => {
const duration = i * RETRY_SCALE_DURATION;
if (duration > RETRY_DURATION_MAX) {
return RETRY_DURATION_MAX;
}

return Rx.timer(attempt * scalingDuration);
})
);
return duration;
};

// we can't use a retryWhen here, because we want to propagate the red status and then retry
const propagateRedStatusAndScaleRetry = () => {
let i = 0;
return (err, caught) =>
Rx.concat(
Rx.of({
state: 'red',
message: err.message
}),
Rx.timer(calculateDuration(++i)).pipe(mergeMap(() => caught))
);
};

export function watchStatusAndLicenseToInitialize(xpackMainPlugin, downstreamPlugin, initialize) {
Expand Down Expand Up @@ -58,15 +66,11 @@ export function watchStatusAndLicenseToInitialize(xpackMainPlugin, downstreamPlu

return Rx.defer(() => initialize(license))
.pipe(
retryWhen(retryStrategy({ maxAttempts: 20, scalingDuration: 100 })),
map(() => ({
state: 'green',
message: 'Ready',
})),
catchError(err => Rx.of({
state: 'red',
message: err.message
}))
catchError(propagateRedStatusAndScaleRetry())
);
}),
tap(({ state, message }) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { EventEmitter } from 'events';
import { watchStatusAndLicenseToInitialize } from './watch_status_and_license_to_initialize';
import { watchStatusAndLicenseToInitialize, RETRY_SCALE_DURATION, RETRY_DURATION_MAX } from './watch_status_and_license_to_initialize';

const createMockXpackMainPluginAndFeature = (featureId) => {
const licenseChangeCallbacks = [];
Expand Down Expand Up @@ -62,6 +62,15 @@ const createMockDownstreamPlugin = (id) => {
};
};

const advanceRetry = async (initializeCount) => {
await Promise.resolve();
let duration = initializeCount * RETRY_SCALE_DURATION;
if (duration > RETRY_DURATION_MAX) {
duration = RETRY_DURATION_MAX;
}
jest.advanceTimersByTime(duration);
};

['red', 'yellow', 'disabled'].forEach(state => {
test(`mirrors ${state} immediately`, () => {
const pluginId = 'foo-plugin';
Expand Down Expand Up @@ -115,7 +124,7 @@ test(`sets downstream plugin's status to green when initialize resolves`, (done)
});
});

test(`sets downstream plugin's status to red when initialize rejects 20 times`, (done) => {
test(`sets downstream plugin's status to red when initialize initially rejects, and continually polls initialize`, (done) => {
jest.useFakeTimers();

const pluginId = 'foo-plugin';
Expand All @@ -126,15 +135,25 @@ test(`sets downstream plugin's status to red when initialize rejects 20 times`,
mockFeature.mock.setLicenseCheckResults(licenseCheckResults);
const downstreamPlugin = createMockDownstreamPlugin(pluginId);

let isRed = false;
let initializeCount = 0;
const initializeMock = jest.fn().mockImplementation(() => {
++initializeCount;

// on the second retry, ensure we already set the status to red
if (initializeCount === 2) {
expect(isRed).toBe(true);
}

// this should theoretically continue indefinitely, but we only have so long to run the tests
if (initializeCount === 100) {
done();
}

// everytime this is called, we have to wait for a new promise to be resolved
// allowing the Promise the we return below to run, and then advance the timers
setImmediate(async () => {
await Promise.resolve();
jest.advanceTimersByTime(initializeCount * 100);
setImmediate(() => {
advanceRetry(initializeCount);
});
return Promise.reject(new Error(errorMessage));
});
Expand All @@ -144,9 +163,8 @@ test(`sets downstream plugin's status to red when initialize rejects 20 times`,
expect(initializeMock).toHaveBeenCalledTimes(1);
expect(initializeMock).toHaveBeenCalledWith(licenseCheckResults);
downstreamPlugin.status.red.mockImplementation(message => {
expect(initializeCount).toBe(20);
isRed = true;
expect(message).toBe(errorMessage);
done();
});
});

Expand All @@ -167,9 +185,8 @@ test(`sets downstream plugin's status to green when initialize resolves after re

// everytime this is called, we have to wait for a new promise to be resolved
// allowing the Promise the we return below to run, and then advance the timers
setImmediate(async () => {
await Promise.resolve();
jest.advanceTimersByTime(initializeCount * 100);
setImmediate(() => {
advanceRetry(initializeCount);
});

if (initializeCount >= 10) {
Expand All @@ -183,6 +200,10 @@ test(`sets downstream plugin's status to green when initialize resolves after re

expect(initializeMock).toHaveBeenCalledTimes(1);
expect(initializeMock).toHaveBeenCalledWith(licenseCheckResults);
downstreamPlugin.status.red.mockImplementation(message => {
expect(initializeCount).toBeLessThan(10);
expect(message).toBe(errorMessage);
});
downstreamPlugin.status.green.mockImplementation(message => {
expect(initializeCount).toBe(10);
expect(message).toBe('Ready');
Expand Down

0 comments on commit b8a110b

Please sign in to comment.