Skip to content

Commit

Permalink
feat: ensure migrations are run in a correct order
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalWilinski committed Aug 10, 2022
1 parent 20534ea commit 821ef29
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 21 deletions.
2 changes: 2 additions & 0 deletions package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 13 additions & 9 deletions src/custom-resource-migrations-runner.ts
Expand Up @@ -7,6 +7,7 @@ import {
} from "aws-lambda";
import { Construct } from "constructs";
import { $AWS, Function, Table } from "functionless";
import sortBy from "lodash.sortby";
import { marshall } from "typesafe-dynamodb/lib/marshall";
import { MigrationHistoryItem, MigrationStatus } from "./migrations-manager";

Expand Down Expand Up @@ -50,17 +51,20 @@ export default class CustomResourceMigrationsRunner extends Construct {

console.log({ storedMigrations, migrationIdStateMachinePairs });

// todo: Ensure chronological order of migrations.
const migrationsToRun = migrationIdStateMachinePairs.filter(
(migrationStateMachinePair) =>
!(storedMigrations.Items ?? []).find(
(storedMigration) =>
storedMigration.id.S === migrationStateMachinePair.migrationId
)
const migrationsToRun = sortBy(
migrationIdStateMachinePairs.filter(
(migrationStateMachinePair) =>
!(storedMigrations.Items ?? []).find(
(storedMigration) =>
storedMigration.id.S ===
migrationStateMachinePair.migrationId
)
),
// migrationID starts with date
"migrationId"
);

console.log({ migrationsToRun });

// Run migrations sequentially
for (const migration of migrationsToRun) {
// todo: Depending on the cloudformation transition (success/rollback) we could either use Up or Down state machine
const command = new StartExecutionCommand({
Expand Down
76 changes: 64 additions & 12 deletions src/migration.ts
Expand Up @@ -12,19 +12,16 @@ import {
$AWS,
$SFN,
} from "functionless";
import { ScanOutput } from "typesafe-dynamodb/lib/scan";

export type ScanTableOptions = {
segments: number;
};
import { QueryInput, QueryOutput } from "typesafe-dynamodb/lib/query";
import { ScanOutput, ScanInput } from "typesafe-dynamodb/lib/scan";

export type MigrationProps = {
tableArn: string;
migrationName: string;
};

export type TransformFunctionType<T extends object> = (input: {
result: ScanOutput<T, any, any>;
result: ScanOutput<T, any, any> | QueryOutput<T, any, any>;
}) => Promise<any>;

export class Migration<T extends object> extends NestedStack {
Expand All @@ -43,12 +40,68 @@ export class Migration<T extends object> extends NestedStack {
);
}

public query(
transformFn: TransformFunctionType<T>,
options?: QueryInput<T, string, string, string, keyof T, any>
) {
// "this" cannot be referenced in a Function.
const table = this.table;

const transformFunction = new Function(
this,
"MigrationCallbackFunction",
transformFn
);

const stateMachine = new StepFunction(
this,
"MigrationStateMachine",
{
stateMachineName: this.migrationName,
logs: {
destination: new LogGroup(this, "MigrationLogGroup", {
retention: RetentionDays.ONE_WEEK,
}),
level: LogLevel.ALL,
},
},
async () => {
let lastEvaluatedKey;
let firstRun = true;

while (firstRun || lastEvaluatedKey) {
firstRun = false;

const result: ScanOutput<any, any, any> = await $AWS.DynamoDB.Scan({
Table: table,
...options,
ExclusiveStartKey: lastEvaluatedKey,
});

if (result.LastEvaluatedKey) {
lastEvaluatedKey = result.LastEvaluatedKey;
}

await transformFunction({ result });
}
}
);

this.stateMachineArn = new CfnOutput(this, "StateMachineArn", {
exportName: `${this.migrationName}StateMachineArn`,
value: stateMachine.resource.stateMachineArn,
});

return stateMachine;
}

// Creates a state machine scanning whole table in parallel and applying transform function to each item.
public scan(
_transformFn: TransformFunctionType<T>,
options?: ScanTableOptions
transformFn: TransformFunctionType<T>,
options?: ScanInput<T, string, string, keyof T, any>
) {
const totalSegments = options?.segments ?? 10;
// By default, use factor of 10 for parallelism
const totalSegments = options?.TotalSegments ?? 10;
const segments = Array.from({ length: totalSegments }, (_, i) => i);

// "this" cannot be referenced in a Function.
Expand All @@ -57,7 +110,7 @@ export class Migration<T extends object> extends NestedStack {
const transformFunction = new Function(
this,
"MigrationCallbackFunction",
_transformFn
transformFn
);

const stateMachine = new StepFunction(
Expand All @@ -83,6 +136,7 @@ export class Migration<T extends object> extends NestedStack {
const result: ScanOutput<any, any, any> = await $AWS.DynamoDB.Scan({
Table: table,
TotalSegments: totalSegments,
...options,
Segment: index,
ExclusiveStartKey: lastEvaluatedKey,
});
Expand All @@ -97,8 +151,6 @@ export class Migration<T extends object> extends NestedStack {
}
);

console.log(this.migrationName);

this.stateMachineArn = new CfnOutput(this, "StateMachineArn", {
exportName: `${this.migrationName}StateMachineArn`,
value: stateMachine.resource.stateMachineArn,
Expand Down

0 comments on commit 821ef29

Please sign in to comment.