Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ in this project. The frequency of node rotations is passed into the template as
### Running Manually
Sometimes it's useful to rotate an ES node manually (e.g. during an ES upgrade), you can optionally pass a `targetInstanceId` in the step function input object. It's usually easiest to open an existing execution and click `New Execution` then just edit the input object.

### Rotating nodes into a new ASG

Very occasionally, it is required to migrate a cluster into a new Autoscaling Group. To do this with the node rotation step function by:

1. Follow the setup steps above.
1. Create the new ASG with DesiredCapacity set to 0.
1. Set the MinimumCapacity of the old ASG to 0.
1. Tag the new ASG with `gu:riffraff:new-asg = True`. (This is the tag that is already used by riff-raff for identifying the newer ASG during migrations).
1. Run as normal, either manually or letting the schedule rotate the instances.

The step function will detect and launch new instances in the new ASG, while removing nodes from the old ASG.

> [!WARNING]
> This feature has been developed and tested for Elasticsearch clusters which exist in a single ASG, and the "new" ASG can be
> matched to the "old" one using Stage/Stack/App tags. If your usecase doesn't match this, you'll likely need to do some more testing
> and possibly improve this feature. If you're at all unsure, get in touch in the Elasticsearch chat space and we can figure out
> any potential issues together.


## Implementation

This Step Function triggers a number of TypeScript lambdas, which coordinate the process of replacing a node by:
Expand Down
1 change: 1 addition & 0 deletions cloudformation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Resources:
Action:
- autoscaling:DetachInstances
- autoscaling:AttachInstances
- autoscaling:SetDesiredCapacity
- autoscaling:TerminateInstanceInAutoScalingGroup
Resource:
- !Sub arn:aws:autoscaling:${AWS::Region}:${AWS::AccountId}:autoScalingGroup:*:autoScalingGroupName/*
Expand Down
8 changes: 4 additions & 4 deletions src/addNode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {detachInstance} from './aws/autoscaling';
import {launchNewInstance} from './aws/autoscaling';
import {AddNodeResponse, ClusterStatusResponse} from './utils/handlerInputs';
import {Elasticsearch} from './elasticsearch/elasticsearch';
import {Instance} from './aws/types';
Expand All @@ -7,17 +7,17 @@ import {ElasticsearchClusterStatus} from './elasticsearch/types';
export async function handler(event: ClusterStatusResponse): Promise<AddNodeResponse> {

const targetInstance: Instance = event.targetElasticSearchNode.ec2Instance;
const asg: string = event.asgName;
const asg: string = event.destinationAsgName;
const elasticsearchClient = new Elasticsearch(targetInstance.id)

return new Promise<AddNodeResponse>((resolve, reject) => {

elasticsearchClient.updateRebalancingStatus("none")
.then(() => detachInstance(targetInstance, asg))
.then(() => launchNewInstance(targetInstance, asg))
.then(() => elasticsearchClient.getClusterHealth())
.then((clusterStatus: ElasticsearchClusterStatus) => {
const response: AddNodeResponse = {
"asgName": asg,
"destinationAsgName": asg,
"targetElasticSearchNode": event.targetElasticSearchNode,
"expectedClusterSize": clusterStatus.number_of_nodes + 1
};
Expand Down
2 changes: 1 addition & 1 deletion src/autoScalingGroupCheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {getASG} from "./aws/autoscaling";

export async function handler(event: AsgInput): Promise<AsgInput> {
try {
const asg = await getASG(event.asgName)
const asg = await getASG(event.destinationAsgName)

if (asg.MaxSize <= asg.Instances.length) {
const error = `ASG MaxSize must be greater than Desired Capacity to allow for ReattachTargetInstance step.`
Expand Down
46 changes: 37 additions & 9 deletions src/aws/autoscaling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,50 @@ import {
AutoScaling, AutoScalingGroup, DescribeAutoScalingGroupsCommand, DescribeAutoScalingGroupsCommandOutput,
DetachInstancesCommand,
DetachInstancesCommandOutput,
SetDesiredCapacityCommand,
TerminateInstanceInAutoScalingGroupCommand,
TerminateInstanceInAutoScalingGroupCommandOutput
} from "@aws-sdk/client-auto-scaling";

const awsAutoscaling = new AutoScaling();

export function detachInstance(instance: Instance, asgName: string): Promise<DetachInstancesCommandOutput> {
console.log(`Detaching ${instance.id} from ${asgName}. This should also bring a new instance into the ASG`);
const params = {
InstanceIds: [ instance.id ],
AutoScalingGroupName: asgName,
ShouldDecrementDesiredCapacity: false
};
const req = new DetachInstancesCommand(params);
export async function launchNewInstance(instance: Instance, asgName: string): Promise<DetachInstancesCommandOutput> {
if (instance.autoScalingGroupName === asgName) {
console.log(`Detaching ${instance.id} from ${asgName}. This should also bring a new instance into the ASG`);
const params = {
InstanceIds: [ instance.id ],
AutoScalingGroupName: asgName,
ShouldDecrementDesiredCapacity: false
};
const req = new DetachInstancesCommand(params);

return retry(() => awsAutoscaling.send(req), `detaching instance ${instance.id}`, 5)
return retry(() => awsAutoscaling.send(req), `detaching instance ${instance.id}`, 5)
} else {
console.log(`Launch new instance to new ASG ${asgName}.`);
const asgs = await retry(
() => awsAutoscaling.send(new DescribeAutoScalingGroupsCommand({
AutoScalingGroupNames: [asgName]
})),
`getting current capacity in ${asgName}`,
5,
);
if (!asgs.AutoScalingGroups || asgs.AutoScalingGroups.length === 0) {
throw new Error(`No AutoScalingGroup found with name ${asgName}`);
}
const capacity = asgs.AutoScalingGroups[0].DesiredCapacity;
if (typeof capacity !== 'number' || isNaN(capacity)) {
throw new Error(`DesiredCapacity is not defined or not a number for ASG ${asgName}`);
}

return retry(
() => awsAutoscaling.send(new SetDesiredCapacityCommand({
AutoScalingGroupName: asgName,
DesiredCapacity: capacity + 1,
})),
`launching new instance in ${asgName}`,
5,
);
}
}

export function attachInstance(instance: Instance, asgName: string): Promise<{}> {
Expand Down
4 changes: 2 additions & 2 deletions src/clusterSizeCheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {getASG} from "./aws/autoscaling";

export async function handler(event: AddNodeResponse): Promise<TargetAndNewNodeResponse> {

const asg = await getASG(event.asgName)
const asg = await getASG(event.destinationAsgName)
const instanceIds = asg.Instances.map(i => i.InstanceId)
const newestInstance = await getSpecificInstance(instanceIds, findNewestInstance)
const elasticsearchClient = new Elasticsearch(event.targetElasticSearchNode.ec2Instance.id)
Expand All @@ -27,7 +27,7 @@ export async function handler(event: AddNodeResponse): Promise<TargetAndNewNodeR
})
.then( (newestElasticsearchNode: ElasticsearchNode) => {
const response: TargetAndNewNodeResponse = {
"asgName": event.asgName,
"destinationAsgName": event.destinationAsgName,
"targetElasticSearchNode": event.targetElasticSearchNode,
"newestElasticsearchNode": newestElasticsearchNode
};
Expand Down
2 changes: 1 addition & 1 deletion src/elasticsearch/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class ElasticsearchNode {
nodeId: string;
isMasterEligible: boolean;

constructor(instance: Instance, nodeId: string, isMasterEligible) {
constructor(instance: Instance, nodeId: string, isMasterEligible: boolean) {
this.ec2Instance = instance;
this.nodeId = nodeId;
this.isMasterEligible = isMasterEligible
Expand Down
58 changes: 52 additions & 6 deletions src/getTargetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,48 @@ import {getInstancesByTag} from './aws/ec2Instances';
import {getASGsByTag} from "./aws/autoscaling";
import {Elasticsearch} from "./elasticsearch/elasticsearch";
import {Instance} from "./aws/types";
import { type AutoScalingGroup } from '@aws-sdk/client-auto-scaling';

function asgTagsToRecord(asg: AutoScalingGroup): Record<string, string> {
return Object.fromEntries(
(asg.Tags ?? [])
.filter(tag => tag.Key !== undefined && tag.Value !== undefined)
.map(tag => [tag.Key!, tag.Value!])
);
}

/** attempt to find an ASG with the same tagging as the target instance's,
* but identifies itself as a 'new' ASG (using the 'gu:riffraff:new-asg'
* tag) so should be the destination of the node rotation.
* Only respects the Stage/Stack/App tags, when defined on the target
* instance's ASG.
*/
function findNewAsgMatchingInstanceAsg(
eligibleASGs: AutoScalingGroup[],
targetInstance: Instance,
): AutoScalingGroup | undefined {

const targetInstanceAsg = eligibleASGs
.find(a => a.AutoScalingGroupName === targetInstance.autoScalingGroupName);

if (!targetInstanceAsg) {
throw new Error(`Couldn't find target instance's ASG (${targetInstance.autoScalingGroupName}) in the list of eligible ASGs - that shouldn't happen!`);
}

const targetAsgTags = asgTagsToRecord(targetInstanceAsg);
const expectedTags = ['App', 'Stack', 'Stage'].filter(t => t in targetAsgTags);

const newAsg = eligibleASGs.find(a => {
const asgTags = asgTagsToRecord(a);

return a.AutoScalingGroupName !== targetInstanceAsg.AutoScalingGroupName
&& expectedTags.every(t => asgTags[t] === targetAsgTags[t])
&& 'gu:riffraff:new-asg' in asgTags;
});

return newAsg;

}

export async function handler(event: StateMachineInput): Promise<AsgDiscoveryResponse> {
const runningExecutionsPromise = totalRunningExecutions(event.stepFunctionArn)
Expand All @@ -14,14 +56,12 @@ export async function handler(event: StateMachineInput): Promise<AsgDiscoveryRes
return { skipRotation: true };
}

const eligibleASGs = (
await getASGsByTag(event.autoScalingGroupDiscoveryTagKey, "true")
).map(asg => asg.AutoScalingGroupName);
const eligibleASGs = await getASGsByTag(event.autoScalingGroupDiscoveryTagKey, "true");

const eligibleInstances = (
// TODO it would be nice to not need the Tags on the instances as well, but currently used in the ElasticsearchAdminSsmPolicy IAM policy in cloudformation.yaml
await getInstancesByTag(event.autoScalingGroupDiscoveryTagKey, "true")
).filter(i => eligibleASGs.includes(i.autoScalingGroupName));
).filter(i => eligibleASGs.some(a => a.AutoScalingGroupName === i.autoScalingGroupName));

// We can manually run rotation against a particular instance if needed
if(event.targetInstanceId) {
Expand All @@ -38,7 +78,11 @@ export async function handler(event: StateMachineInput): Promise<AsgDiscoveryRes
const elasticsearchClient = new Elasticsearch(targetInstanceId);
const targetElasticSearchNode = await elasticsearchClient.getElasticsearchNode(targetInstance);
console.log(`Instance ${targetInstanceId} (ASG: ${asgName}) specified as input. Moving on...`);
return { asgName, targetElasticSearchNode, skipRotation: false };

const maybeNewAsg = findNewAsgMatchingInstanceAsg(eligibleASGs, targetInstance);

const destinationAsgName = maybeNewAsg?.AutoScalingGroupName ?? asgName;
return { destinationAsgName, targetElasticSearchNode, skipRotation: false };
}

console.log(`Found ${eligibleInstances.length} instances with tag ${event.autoScalingGroupDiscoveryTagKey}`);
Expand All @@ -58,8 +102,10 @@ export async function handler(event: StateMachineInput): Promise<AsgDiscoveryRes
const elasticsearchClient = new Elasticsearch(oldestInstance.id);
const targetElasticSearchNode = await elasticsearchClient.getElasticsearchNode(oldestInstance);
console.log(`Triggering rotation of oldest instance ${oldestInstance.id} (ASG: ${oldestInstance.autoScalingGroupName})`);

const maybeNewAsg = findNewAsgMatchingInstanceAsg(eligibleASGs, oldestInstance);
return {
asgName: oldestInstance.autoScalingGroupName,
destinationAsgName: maybeNewAsg?.AutoScalingGroupName ?? oldestInstance.autoScalingGroupName,
targetElasticSearchNode,
skipRotation: false
}
Expand Down
7 changes: 6 additions & 1 deletion src/reattachTargetInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import {Instance} from './aws/types';
export async function handler(event: TargetAndNewNodeResponse): Promise<TargetAndNewNodeResponse> {

const targetInstance: Instance = event.targetElasticSearchNode.ec2Instance;
const asg: string = event.asgName;
const asg: string = event.destinationAsgName;

if (targetInstance.autoScalingGroupName !== asg) {
console.log(`New instance launched in different ASG than target instance, so nothing to reattach`);
return event;
}

return new Promise<TargetAndNewNodeResponse>((resolve, reject) => {
attachInstance(targetInstance, asg)
Expand Down
5 changes: 4 additions & 1 deletion src/utils/handlerInputs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ export interface StateMachineInput {
}

export interface AsgInput {
asgName: string;
// The ASG a node will be rotated _into_. In certain circumstances this may
// not be the same as the ASG the node was rotated _out of_, which is stored at
// targetElasticSearchNode.ec2Instance.autoScalingGroupName.
destinationAsgName: string;
targetElasticSearchNode: ElasticsearchNode;
}

Expand Down
Loading