Skip to content

Commit

Permalink
fix(cli): stack monitor reads complete stack history every 5 seconds (#…
Browse files Browse the repository at this point in the history
…9795)

The stack monitor used to read all stack events on every tick (every 1-5
seconds).

That's right, every 5 seconds it would page through all events that had
ever happened to the stack, only to get to the last page to see if there
were new events. We were quadratic in our `DescribeStackEvents` API
call.

This behavior has probably contributed to our users getting throttled by
CloudFormation in the past, and only really shows pathological
behavior on stacks with a long history (which we ourselves don't
routinely manage using `cdk deploy`), which is why it had eluded
detection for so long.

The fix is to remember the token of the last page between calls,
so that we don't have to read through all pages just to get to the last
one.

Fixes #9470.


----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
rix0rrr committed Aug 18, 2020
1 parent 97ef371 commit cace51a
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 87 deletions.
4 changes: 2 additions & 2 deletions packages/aws-cdk/lib/api/deploy-stack.ts
Expand Up @@ -265,7 +265,7 @@ export async function deployStack(options: DeployStackOptions): Promise<DeploySt
debug('Initiating execution of changeset %s on stack %s', changeSetName, deployName);
await cfn.executeChangeSet({ StackName: deployName, ChangeSetName: changeSetName }).promise();
// eslint-disable-next-line max-len
const monitor = options.quiet ? undefined : new StackActivityMonitor(cfn, deployName, stackArtifact, {
const monitor = options.quiet ? undefined : StackActivityMonitor.withDefaultPrinter(cfn, deployName, stackArtifact, {
resourcesTotal: (changeSetDescription.Changes ?? []).length,
}).start();
debug('Execution of changeset %s on stack %s has started; waiting for the update to complete...', changeSetName, deployName);
Expand Down Expand Up @@ -364,7 +364,7 @@ export async function destroyStack(options: DestroyStackOptions) {
if (!currentStack.exists) {
return;
}
const monitor = options.quiet ? undefined : new StackActivityMonitor(cfn, deployName, options.stack).start();
const monitor = options.quiet ? undefined : StackActivityMonitor.withDefaultPrinter(cfn, deployName, options.stack).start();

try {
await cfn.deleteStack({ StackName: deployName, RoleARN: options.roleArn }).promise();
Expand Down
195 changes: 110 additions & 85 deletions packages/aws-cdk/lib/api/util/cloudformation/stack-activity-monitor.ts
Expand Up @@ -6,18 +6,17 @@ import * as colors from 'colors/safe';
import { error, logLevel, LogLevel, setLogLevel } from '../../../logging';
import { RewritableBlock } from '../display';

interface StackActivity {
export interface StackActivity {
readonly event: aws.CloudFormation.StackEvent;
readonly metadata?: ResourceMetadata;
flushed: boolean;
}

interface ResourceMetadata {
export interface ResourceMetadata {
entry: cxschema.MetadataEntry;
constructPath: string;
}

export interface StackActivityMonitorProps {
export interface WithDefaultPrinterProps {
/**
* Total number of resources to update
*
Expand Down Expand Up @@ -47,13 +46,39 @@ export interface StackActivityMonitorProps {
}

export class StackActivityMonitor {
private active = false;
private activity: { [eventId: string]: StackActivity } = { };

/**
* Number of ms to wait between pagination calls
* Create a Stack Activity Monitor using a default printer, based on context clues
*/
private readonly pageSleep = 500;
public static withDefaultPrinter(
cfn: aws.CloudFormation,
stackName: string,
stackArtifact: cxapi.CloudFormationStackArtifact, options: WithDefaultPrinterProps = {}) {
const stream = process.stderr;

const props: PrinterProps = {
resourceTypeColumnWidth: calcMaxResourceTypeLength(stackArtifact.template),
resourcesTotal: options.resourcesTotal,
stream,
};

const isWindows = process.platform === 'win32';
const verbose = options.logLevel ?? logLevel;
// On some CI systems (such as CircleCI) output still reports as a TTY so we also
// need an individual check for whether we're running on CI.
// see: https://discuss.circleci.com/t/circleci-terminal-is-a-tty-but-term-is-not-set/9965
const fancyOutputAvailable = !isWindows && stream.isTTY && !options.ci;

const printer = fancyOutputAvailable && !verbose
? new CurrentActivityPrinter(props)
: new HistoryActivityPrinter(props);

return new StackActivityMonitor(cfn, stackName, printer, stackArtifact);
}


private active = false;
private activity: { [eventId: string]: StackActivity } = { };

/**
* Determines which events not to display
Expand All @@ -68,40 +93,27 @@ export class StackActivityMonitor {
/**
* Set to the activity of reading the current events
*/
private readPromise?: Promise<AWS.CloudFormation.StackEvent[]>;
private readPromise?: Promise<ReadEventsResult>;

private readonly printer: ActivityPrinterBase;
/**
* Pagination token for the next page of stack events
*
* Retained between ticks in order to avoid being O(N^2) in the length of
* the stack event log
*/
private nextToken?: string;

constructor(
private readonly cfn: aws.CloudFormation,
private readonly stackName: string,
private readonly stack: cxapi.CloudFormationStackArtifact,
options: StackActivityMonitorProps = {}) {

const stream = process.stderr;

const props: PrinterProps = {
resourceTypeColumnWidth: calcMaxResourceTypeLength(this.stack.template),
resourcesTotal: options.resourcesTotal,
stream,
};

const isWindows = process.platform === 'win32';
const verbose = options.logLevel ?? logLevel;
// On some CI systems (such as CircleCI) output still reports as a TTY so we also
// need an individual check for whether we're running on CI.
// see: https://discuss.circleci.com/t/circleci-terminal-is-a-tty-but-term-is-not-set/9965
const fancyOutputAvailable = !isWindows && stream.isTTY && !options.ci;

this.printer = fancyOutputAvailable && !verbose
? new CurrentActivityPrinter(props)
: new HistoryActivityPrinter(props);
}
private readonly printer: IActivityPrinter,
private readonly stack?: cxapi.CloudFormationStackArtifact,
) {}

public start() {
this.active = true;
this.printer.start();
this.scheduleNextTick();
this.scheduleNextTick(true);
return this;
}

Expand All @@ -115,56 +127,42 @@ export class StackActivityMonitor {
if (this.readPromise) {
// We're currently reading events, wait for it to finish and print them before continuing.
await this.readPromise;
this.flushEvents();
this.printer.print();
}
}

private scheduleNextTick() {
private scheduleNextTick(newPageAvailable: boolean) {
if (!this.active) {
return;
}
this.tickTimer = setTimeout(() => this.tick().then(), this.printer.updateSleep);

const sleepMs = newPageAvailable ? 0 : this.printer.updateSleep;
this.tickTimer = setTimeout(() => void(this.tick()), sleepMs);
}

private async tick() {
if (!this.active) {
return;
}

let newPage = false;
try {
this.readPromise = this.readEvents();
await this.readPromise;
this.readPromise = this.readNewEvents((a) => this.printer.addActivity(a));
newPage = (await this.readPromise).nextPage;
this.readPromise = undefined;

// We might have been stop()ped while the network call was in progress.
if (!this.active) { return; }

this.flushEvents();
this.printer.print();
} catch (e) {
error('Error occurred while monitoring stack: %s', e);
}
this.scheduleNextTick();
}

/**
* Flushes all unflushed events sorted by timestamp.
*/
private flushEvents() {
Object.keys(this.activity)
.map(a => this.activity[a])
.filter(a => a.event.Timestamp.valueOf() > this.startTime)
.filter(a => !a.flushed)
.sort((lhs, rhs) => lhs.event.Timestamp.valueOf() - rhs.event.Timestamp.valueOf())
.forEach(a => {
a.flushed = true;
this.printer.addActivity(a);
});

this.printer.print();
this.scheduleNextTick(newPage);
}

private findMetadataFor(logicalId: string | undefined): ResourceMetadata | undefined {
const metadata = this.stack.manifest.metadata;
const metadata = this.stack?.manifest?.metadata;
if (!logicalId || !metadata) { return undefined; }
for (const path of Object.keys(metadata)) {
const entry = metadata[path]
Expand All @@ -180,39 +178,53 @@ export class StackActivityMonitor {
return undefined;
}

private async readEvents(nextToken?: string): Promise<AWS.CloudFormation.StackEvent[]> {
const output = await this.cfn.describeStackEvents({ StackName: this.stackName, NextToken: nextToken }).promise()
.catch( e => {
if (e.code === 'ValidationError' && e.message === `Stack [${this.stackName}] does not exist`) {
return undefined;
}
throw e;
});

let events = output && output.StackEvents || [];
let allNew = true;
/**
* Reads a single page stack events from the stack, invoking a callback on the new ones (in order)
*
* Returns whether there is a next page of events availablle.
*/
private async readNewEvents(cb: (x: StackActivity) => void): Promise<ReadEventsResult> {
let response;

// merge events into the activity and dedup by event id
for (const e of events) {
if (e.EventId in this.activity) {
allNew = false;
break;
try {
response = await this.cfn.describeStackEvents({ StackName: this.stackName, NextToken: this.nextToken }).promise();
} catch (e) {
if (e.code === 'ValidationError' && e.message === `Stack [${this.stackName}] does not exist`) {
return { nextPage: false };
}
throw e;
}

this.activity[e.EventId] = {
flushed: false,
event: e,
metadata: this.findMetadataFor(e.LogicalResourceId),
};
for (const event of response?.StackEvents ?? []) {
// Already seen this one
if (event.EventId in this.activity) { continue; }

// Event from before we were interested in 'em
if (event.Timestamp.valueOf() < this.startTime) { continue; }

// Invoke callback
cb(this.activity[event.EventId] = {
event: event,
metadata: this.findMetadataFor(event.LogicalResourceId),
});
}

// only read next page if all the events we read are new events. otherwise, we can rest.
if (allNew && output && output.NextToken) {
await new Promise(cb => setTimeout(cb, this.pageSleep));
events = events.concat(await this.readEvents(output.NextToken));
// Replace the "next token" if we have one, so that we start paginating from the next
// page on the next call.
//
// Crucially -- this token will ONLY be returned if there is a next page to
// read already. If not, we're at the end of the list of events.
//
// If there's no next page to read, we don't reset our paging token though. There might
// be a new page to read in the future, and we don't want to have to page through
// pages 1-N on the next call just to get page (N+1). In that case simply retain the
// current token, requesting page N again until (N+1) appears.
if (response?.NextToken) {
this.nextToken = response.NextToken;
}

return events;
// Return whether there is a new page available
return { nextPage: response?.NextToken !== undefined };
}

private simplifyConstructPath(path: string) {
Expand All @@ -227,6 +239,10 @@ export class StackActivityMonitor {
}
}

interface ReadEventsResult {
readonly nextPage: boolean;
}

function padRight(n: number, x: string): string {
return x + ' '.repeat(Math.max(0, n - x.length));
}
Expand Down Expand Up @@ -267,7 +283,16 @@ interface PrinterProps {
readonly stream: NodeJS.WriteStream;
}

abstract class ActivityPrinterBase {
export interface IActivityPrinter {
readonly updateSleep: number;

addActivity(activity: StackActivity): void;
print(): void;
start(): void;
stop(): void;
}

abstract class ActivityPrinterBase implements IActivityPrinter {
/**
* Fetch new activity every 5 seconds
*/
Expand Down
57 changes: 57 additions & 0 deletions packages/aws-cdk/test/util/stack-monitor.test.ts
@@ -0,0 +1,57 @@
import { StackActivityMonitor, IActivityPrinter, StackActivity } from '../../lib/api/util/cloudformation/stack-activity-monitor';
import { sleep } from '../integ/cli/aws-helpers';
import { MockSdk } from './mock-sdk';

let sdk: MockSdk;
let printer: FakePrinter;
beforeEach(() => {
sdk = new MockSdk();
printer = new FakePrinter();
});

test('retain page token between ticks', async () => {
let finished = false;
sdk.stubCloudFormation({
describeStackEvents: (jest.fn() as jest.Mock<AWS.CloudFormation.DescribeStackEventsOutput, [AWS.CloudFormation.DescribeStackEventsInput]>)
// First call, return a page token
.mockImplementationOnce((request) => {
expect(request.NextToken).toBeUndefined();
return { NextToken: 'token' };
})
// Second call, expect page token, return no page
.mockImplementationOnce(request => {
expect(request.NextToken).toEqual('token');
return { };
})
// Third call, ensure we still get the same page token
.mockImplementationOnce(request => {
expect(request.NextToken).toEqual('token');
finished = true;
return { };
}),
});

const monitor = new StackActivityMonitor(sdk.cloudFormation(), 'StackName', printer).start();
await waitForCondition(() => finished);
await monitor.stop();
});


class FakePrinter implements IActivityPrinter {
public updateSleep: number = 0;
public readonly activities: StackActivity[] = [];

public addActivity(activity: StackActivity): void {
this.activities.push(activity);
}

public print(): void { }
public start(): void { }
public stop(): void { }
}

async function waitForCondition(cb: () => boolean): Promise<void> {
while (!cb()) {
await sleep(10);
}
}

0 comments on commit cace51a

Please sign in to comment.