feat: make it work
RafalWilinski committed Aug 9, 2022
1 parent 44960a0 commit 272fb6f
Showing 7 changed files with 429 additions and 83 deletions.
90 changes: 56 additions & 34 deletions README.md
@@ -4,27 +4,34 @@
[Functionless](https://github.com/functionless/functionless)-based mini-framework for DynamoDB migrations in AWS CDK. `dynamodb-migrations` leverages [Step Functions](https://aws.amazon.com/step-functions/) to enable massively parallel reads and writes to DynamoDB making your migrations faster.

`dynamodb-migrations` uses _"Migrations as Infrastructure"_ approach - each migration ends up being a separate State Machine, each one of them is deployed as a separate Stack.
`dynamodb-migrations` uses a _"Migrations as Infrastructure"_ approach: each migration becomes a separate State Machine, and each State Machine is deployed as a separate Nested Stack.

Migrations are run as part of the CloudFormation deployment cycle: each migration, provisioned as a state machine, is invoked via a CloudFormation Custom Resource, which is also part of the stack itself.

```mermaid
sequenceDiagram
participant User
participant CloudFormation
User-->>+CloudFormation: Submits a Template via `cdk deploy`
User-->>+CloudFormation: Start deployment with a migration via `cdk deploy`
Note right of CloudFormation: Update/Create Stack
CloudFormation->>Cloud: Provision Application Resources
CloudFormation->>+Step Functions: Provision Migration State Machine
CloudFormation->>+Step Functions: Provision Migration State Machine as a Nested Stack
Step Functions->>-CloudFormation: Provisioned
CloudFormation->>+Custom Resource: Perform Migration
CloudFormation->>+Custom Resource: Perform Migrations by starting executions
Custom Resource->>+Migrations History Table: Check which migrations have been already ran
Migrations History Table->>-Custom Resource: Here are migrations ran in the past
Note Right of Custom Resource: There's one new migration that hasn't been ran yet
Custom Resource-->>Step Functions: Run State Machine
par Custom Resource to DynamoDB Table
Step Functions->>+DynamoDB Table: Update Items
Note left of Step Functions: State machine is an actual definition of migration
par Perform migration
Step Functions->>+DynamoDB Table: Update / Delete Items
end
par Periodically check if migration is done
Custom Resource->>+Step Functions: Is done?
and State Machine Finished Execution
Custom Resource->>Migrations History Table: Store info about completed migration
Custom Resource->>-CloudFormation: End
end
Custom Resource->>Migrations History Table: Migration X Complete
Custom Resource->>-CloudFormation: End
CloudFormation-->>-User: Update Complete
```

### Questions to answer / notes
Expand Down Expand Up @@ -60,39 +67,54 @@ new MigrationsManager(this, 'MigrationsManager', {

This will create an additional DynamoDB table that will be used to store the migrations history.

3. Initialize a new migration by using the `dynamodb-migrations init` command:

```bash
npx dynamodb-migrations init --dir ./migrations --name my-migration
```

4. Write an actual migration.

Following migration will add a new attribute called `migrated` to every item in the table.
Create a file called `20220101-add-attribute.ts` in the `migrationsDir` and paste in the following contents. This migration adds a new attribute called `migrated` to every item in the table.

```ts
import {
MigrationFunction,
ScanOutput,
$AWS,
} from "@dynobase/dynamodb-migrations";

export const up: MigrationFunction = (scope, migrationName) =>
new Migration(scope, migrationName, {
tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/SubjectTable", // can be also read from StackOutputs
}).run(async (_table, result: ScanOutput<any, any, any>) => {
for (const item of result.Items as any[]) {
import { $AWS } from "functionless";
import { unmarshall, marshall } from "typesafe-dynamodb/lib/marshall";
import { Migration, MigrationFunction } from "../..";

const tableArn =
"arn:aws:dynamodb:us-east-1:085108115628:table/TestStack-TableCD117FA1-ZVV3ZWUOWPO";

export const migration: MigrationFunction = (scope, migrationName) => {
const migrationDefinition = new Migration<any>(scope, migrationName, {
tableArn,
migrationName,
});

const table = migrationDefinition.table;

// Actual migration code goes here.
// For each item in the table
migrationDefinition.scan(async ({ result }) => {
for (const i of result.Items as any[]) {
// Do the following
await $AWS.DynamoDB.PutItem({
Table: _table,
Item: {
...item,
migrated: {
S: `migrated`,
},
},
Table: table,
// Add migratedAt attribute to the item
Item: marshall({ ...unmarshall(i), migratedAt: Date.now() }),
});
}
});

return migrationDefinition;
};
```

And that's it! This migration will be executed as a part of the next `cdk deploy` command fully under CloudFormation's control. After successfully running it, the migration will be marked as `migrated` in the migrations history table.
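
As a rough illustration of what a history record might look like before and after a run, the sketch below reuses the `MigrationHistoryItem` and `MigrationStatus` shapes from `src/migrations-manager.ts` in this commit. The `markFinished` helper and the sample id are hypothetical and are not part of the library. The commit itself only writes the `in_progress` record.

```ts
// Status values and item shape mirror src/migrations-manager.ts in this commit.
type MigrationStatus = "success" | "in_progress" | "failure";

type MigrationHistoryItem = {
  id: string;
  status: MigrationStatus;
  startedAt: string;
  endedAt?: string;
};

// Hypothetical helper (not part of dynamodb-migrations): returns a copy of the
// record with its final status and end timestamp set, leaving the input as-is.
function markFinished(
  item: MigrationHistoryItem,
  succeeded: boolean,
  endedAt: Date
): MigrationHistoryItem {
  return {
    ...item,
    status: succeeded ? "success" : "failure",
    endedAt: endedAt.toISOString(),
  };
}

// Record as written when the execution is started.
const started: MigrationHistoryItem = {
  id: "20220101AddAttribute", // illustrative id
  status: "in_progress",
  startedAt: new Date("2022-08-09T10:00:00.000Z").toISOString(),
};

// Record as it could look once the state machine has finished.
const finished = markFinished(
  started,
  true,
  new Date("2022-08-09T10:05:00.000Z")
);
console.log(finished.status, finished.endedAt);
```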

## Todo

- [x] Do not re-run previously applied migrations
- [ ] Add an option for disposing state machines after running migrations
- [ ] Distinguish up/down migrations - if rollback is being performed, then Custom Resource should call `down` migration which will reverse the effect of the `up` migration
- [ ] Dry runs
- [ ] CLI for creating new migration files
- [ ] Better contract for writing migrations/semantics
- [ ] Reporting progress
- [ ] Storing `executionId` in the migrations history table
- [ ] Package and publish to NPM
- [ ] More examples
3 changes: 2 additions & 1 deletion package.json

56 changes: 36 additions & 20 deletions src/custom-resource-migrations-runner.ts
@@ -1,4 +1,4 @@
// import { Table as cdkTable } from "aws-cdk-lib/aws-dynamodb";
import { SFNClient, StartExecutionCommand } from "@aws-sdk/client-sfn";
import {
CloudFormationCustomResourceEvent,
CloudFormationCustomResourceResponse,
@@ -7,12 +7,16 @@ import {
} from "aws-lambda";
import { Construct } from "constructs";
import { $AWS, Function, Table } from "functionless";
import { Migration } from "./migration";
import { MigrationHistoryItem } from "./migrations-manager";
import { marshall } from "typesafe-dynamodb/lib/marshall";
import { MigrationHistoryItem, MigrationStatus } from "./migrations-manager";

type MigrationIdStateMachineArnPair = {
migrationId: string;
stateMachineArn: string;
};

export type CustomResourceMigrationsRunnerProps = {
migrationFiles: string[];
migrationStacks: Migration<any>[];
migrationIdStateMachinePairs: MigrationIdStateMachineArnPair[];
migrationsHistoryTable: Table<MigrationHistoryItem, "id">;
};

@@ -26,42 +30,54 @@ export default class CustomResourceMigrationsRunner extends Construct {
id: string,
{
migrationsHistoryTable,
migrationStacks,
migrationIdStateMachinePairs,
}: CustomResourceMigrationsRunnerProps
) {
super(scope, id);

// todo/ to think: maybe this can be a step function too?
// I think it cannot because of custom resources provider - it requires a function
this.function = new Function(
scope,
`${id}-MigrationsRunner`,
async (event: CloudFormationCustomResourceEvent) => {
console.log(event);

const client = new SFNClient({});

try {
const migrations = await $AWS.DynamoDB.Scan({
const storedMigrations = await $AWS.DynamoDB.Scan({
Table: migrationsHistoryTable,
});

console.log({ migrations, migrationStacks });
console.log({ storedMigrations, migrationIdStateMachinePairs });

// todo: Ensure chronological order of migrations.
const migrationsToRun = migrationStacks.filter(
(migrationStack) =>
!(migrations.Items ?? []).find(
(migration) => migration.id.S === migrationStack.stackId
const migrationsToRun = migrationIdStateMachinePairs.filter(
(migrationStateMachinePair) =>
!(storedMigrations.Items ?? []).find(
(storedMigration) =>
storedMigration.id.S === migrationStateMachinePair.migrationId
)
);

console.log({ migrationsToRun });

// todo: run in sequence actually
if (migrationsToRun[0].stateMachine) {
await migrationsToRun[0].stateMachine({});
// todo: store migration state
// todo: after finish, mark it as complete
// todo: maybe isCompleteHandler should take care of it?
for (const migration of migrationsToRun) {
// todo: Depending on the cloudformation transition (success/rollback) we could either use Up or Down state machine
const command = new StartExecutionCommand({
stateMachineArn: migration.stateMachineArn,
});
const response = await client.send(command);

console.log({ migration, response });

await $AWS.DynamoDB.PutItem({
Table: migrationsHistoryTable,
Item: marshall({
id: migration.migrationId,
status: "in_progress" as MigrationStatus,
startedAt: response.startDate?.toISOString()!,
}),
});
}

return {
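
The selection logic in this hunk (skip ids already present in the history table, then start each remaining state machine) can be sketched in isolation. This is a minimal sketch, not the runner itself: `StartExecution` is a hypothetical stand-in for the Step Functions call, the ARNs are placeholders, and the ordering step assumes migration ids begin with a sortable date prefix, which the commit leaves as a todo.

```ts
// Same pair shape as in src/custom-resource-migrations-runner.ts.
type MigrationIdStateMachineArnPair = {
  migrationId: string;
  stateMachineArn: string;
};

// Hypothetical stand-in for the Step Functions StartExecution call.
type StartExecution = (stateMachineArn: string) => Promise<{ startDate: Date }>;

// Return the pairs whose id is absent from the history, ordered by id.
// Assumption: ids start with a sortable date prefix such as 20220801.
function pendingMigrations(
  pairs: MigrationIdStateMachineArnPair[],
  storedIds: string[]
): MigrationIdStateMachineArnPair[] {
  const done = new Set(storedIds);
  return pairs
    .filter((pair) => !done.has(pair.migrationId))
    .sort((a, b) =>
      a.migrationId < b.migrationId ? -1 : a.migrationId > b.migrationId ? 1 : 0
    );
}

// Start pending executions one after another and return the started ids.
// This only orders the start calls; it does not wait for completion.
async function runPending(
  pairs: MigrationIdStateMachineArnPair[],
  storedIds: string[],
  start: StartExecution
): Promise<string[]> {
  const startedIds: string[] = [];
  for (const pair of pendingMigrations(pairs, storedIds)) {
    await start(pair.stateMachineArn);
    startedIds.push(pair.migrationId);
  }
  return startedIds;
}

// Usage with an in-memory fake instead of AWS.
const pairs: MigrationIdStateMachineArnPair[] = [
  { migrationId: "20220801AddAttribute", stateMachineArn: "arn:example:b" },
  { migrationId: "20220715RenameKey", stateMachineArn: "arn:example:a" },
  { migrationId: "20220601Init", stateMachineArn: "arn:example:c" },
];
const startedArns: string[] = [];
const fakeStart: StartExecution = async (arn) => {
  startedArns.push(arn);
  return { startDate: new Date() };
};

runPending(pairs, ["20220601Init"], fakeStart).then((ids) =>
  console.log(ids, startedArns)
);
```

Passing the start function in as a parameter keeps the selection and ordering logic testable without AWS credentials.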
8 changes: 2 additions & 6 deletions src/examples/migrations/20220801-add-attribute.ts
@@ -1,5 +1,4 @@
import { Table as cdkTable } from "aws-cdk-lib/aws-dynamodb";
import { $AWS, Table } from "functionless";
import { $AWS } from "functionless";
import { unmarshall, marshall } from "typesafe-dynamodb/lib/marshall";
import { Migration, MigrationFunction } from "../..";

@@ -12,10 +11,7 @@ export const migration: MigrationFunction = (scope, migrationName) => {
migrationName,
});

// todo: just use table from props - const table = props.table;
const table = Table.fromTable(
cdkTable.fromTableArn(scope, "TargetTable", tableArn)
);
const table = migrationDefinition.table;

// Actual migration code goes here.
// For each item in the table
15 changes: 11 additions & 4 deletions src/migration.ts
@@ -1,7 +1,8 @@
import { NestedStack } from "aws-cdk-lib";
import { CfnOutput, NestedStack } from "aws-cdk-lib";
import { Table as cdkTable } from "aws-cdk-lib/aws-dynamodb";
import { LogGroup, RetentionDays } from "aws-cdk-lib/aws-logs";
import { LogLevel } from "aws-cdk-lib/aws-stepfunctions";
import { camelCase } from "change-case";
import { Construct } from "constructs";
import {
ITable,
Expand Down Expand Up @@ -31,12 +32,12 @@ export class Migration<T extends object> extends NestedStack {

public readonly migrationName: string;

public stateMachine?: StepFunction<any, any>;
public stateMachineArn?: CfnOutput;

constructor(scope: Construct, id: string, props: MigrationProps) {
super(scope, id);

this.migrationName = props.migrationName;
this.migrationName = camelCase(props.migrationName.split(".")[0]);
this.table = Table.fromTable(
cdkTable.fromTableArn(this, "SubjectTable", props.tableArn)
);
Expand Down Expand Up @@ -95,7 +96,13 @@ export class Migration<T extends object> extends NestedStack {
}
);

this.stateMachine = stateMachine;
console.log(this.migrationName);

this.stateMachineArn = new CfnOutput(this, "StateMachineArn", {
exportName: `${this.migrationName}StateMachineArn`,
value: stateMachine.resource.stateMachineArn,
});

return stateMachine;
}
}
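
The name derivation in this file (`camelCase(props.migrationName.split(".")[0])`) also determines the CloudFormation export name, so the exporting and importing sides have to compute the same string. The sketch below approximates it with a hand-rolled camel-casing helper. `toMigrationId` and `exportNameFor` are hypothetical names, and the real `change-case` output may differ in edge cases.

```ts
// Hand-rolled approximation of camelCase(name.split(".")[0]).
// The commit uses the change-case package; this stand-in is only illustrative.
function toMigrationId(fileName: string): string {
  // Keep everything before the first dot, as split(".")[0] does.
  const dot = fileName.indexOf(".");
  const base = dot === -1 ? fileName : fileName.slice(0, dot);

  // Split on runs of non-alphanumeric characters and drop empty pieces.
  const words = base.split(/[^A-Za-z0-9]+/).filter((w) => w.length > 0);

  return words
    .map((w, i) =>
      i === 0
        ? w.toLowerCase()
        : w.charAt(0).toUpperCase() + w.slice(1).toLowerCase()
    )
    .join("");
}

// Export name pattern used by the CfnOutput in this commit.
function exportNameFor(fileName: string): string {
  return `${toMigrationId(fileName)}StateMachineArn`;
}

console.log(exportNameFor("20220801-add-attribute.ts"));
// Everything after the first dot is dropped, so dotted names are truncated.
console.log(toMigrationId("v1.2-fix-keys.ts"));
```

One consequence of cutting at the first dot is that two files such as `v1.2-fix-keys.ts` and `v1.3-other.ts` would map to the same id, so file names without extra dots are the safer choice.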
43 changes: 31 additions & 12 deletions src/migrations-manager.ts
@@ -1,6 +1,7 @@
import * as fs from "fs";
import * as path from "path";
import { aws_dynamodb, CustomResource } from "aws-cdk-lib";
import { aws_dynamodb, CustomResource, Fn } from "aws-cdk-lib";
import { PolicyStatement } from "aws-cdk-lib/aws-iam";
import { Provider } from "aws-cdk-lib/custom-resources";
import { Construct } from "constructs";
import { Table } from "functionless";
@@ -12,16 +13,19 @@ export type MigrationManagerProps = {
* Custom name for the DynamoDB table storing migrations
*/
tableName?: string;
/**
* Directory where migration files are stored
*/
migrationsDir: string;
};

export type MigrationStatus = "success" | "in_progress" | "failure";

export type MigrationHistoryItem = {
id: string;
status: "success" | "in_progress" | "failure";
status: MigrationStatus;
startedAt: string;
endedAt: string;
segments: number;
completedSegments?: number[];
endedAt?: string;
};

export class MigrationsManager extends Construct {
Expand Down Expand Up @@ -56,26 +60,41 @@ export class MigrationsManager extends Construct {
migrationFile
)).migration(this, migrationFile);

console.log({ migrationFile, migrationStack });

migrationStacks.push(migrationStack);
}

const migrationIdStateMachinePairs = migrationStacks.map((migration) => ({
stateMachineArn: Fn.importValue(
`${migration.migrationName}StateMachineArn`
).toString(),
migrationId: migration.migrationName,
}));

const onEventHandler = new CustomResourceMigrationsRunner(
this,
"MigrationsRunner",
//todo: For some reason migrationStacks arent' passed properly.
// consider passing just SFN ARNs and recreating them inside migrationsRunner ( need to add a cdk.output?)
{ migrationsHistoryTable, migrationFiles, migrationStacks }
{
migrationsHistoryTable,
migrationIdStateMachinePairs,
}
);

const migrationsProvider = new Provider(this, "MigrationsProvider", {
// todo: add isCompleteHandler
onEventHandler: onEventHandler.function.resource,
});

// Ensure migrations provider is ran after all nested stacks are created
migrationStacks.map((stack) =>
migrationsProvider.node.addDependency(stack)
migrationStacks.map((stack) => {
migrationsProvider.node.addDependency(stack);
});

// Allow custom resource to start execution of the migrations state machine
onEventHandler.function.resource.addToRolePolicy(
new PolicyStatement({
actions: ["states:StartExecution"],
resources: migrationIdStateMachinePairs.map((m) => m.stateMachineArn),
})
);

new CustomResource(this, "MigrationsTrigger", {
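
The `isCompleteHandler` left as a todo here could reduce to a small decision over execution statuses. The following is a sketch under assumptions: the handler is assumed to have already fetched one status per started execution (for example via Step Functions `DescribeExecution`), and `toIsCompleteResponse` is a hypothetical helper name. The `{ IsComplete: boolean }` return shape follows the CDK provider framework's contract for isComplete handlers.

```ts
// Execution statuses considered by this sketch.
type ExecutionStatus =
  | "RUNNING"
  | "SUCCEEDED"
  | "FAILED"
  | "TIMED_OUT"
  | "ABORTED";

// Response shape expected from an isComplete handler.
type IsCompleteResponse = { IsComplete: boolean };

// Hypothetical helper: decide whether all migrations are done.
// Throws if any execution ended unsuccessfully, which would fail the
// custom resource and let CloudFormation roll back.
function toIsCompleteResponse(statuses: ExecutionStatus[]): IsCompleteResponse {
  for (const status of statuses) {
    if (status !== "RUNNING" && status !== "SUCCEEDED") {
      throw new Error(`Migration execution ended with status ${status}`);
    }
  }
  // Complete only when nothing is still running.
  return { IsComplete: statuses.every((s) => s === "SUCCEEDED") };
}

console.log(toIsCompleteResponse(["SUCCEEDED", "RUNNING"]));
console.log(toIsCompleteResponse(["SUCCEEDED", "SUCCEEDED"]));
```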
