Skip to content

Commit

Permalink
refactor: move to separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalWilinski committed Aug 8, 2022
1 parent 0f5a331 commit 19efb4b
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 90 deletions.
42 changes: 42 additions & 0 deletions src/custom-resource-migrations-runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { CloudFormationCustomResourceEvent } from "aws-lambda";
import { Construct } from "constructs";
import { $AWS, Function } from "functionless";

export type CustomResourceMigrationsRunnerProps = {
migrationFiles: string[];
};

export default class CustomResourceMigrationsRunner extends Construct {
constructor(
scope: Construct,
id: string,
props: CustomResourceMigrationsRunnerProps
) {
super(scope, id);

new Function(
scope,
`${id}-MigrationsRunner`,
async (event: CloudFormationCustomResourceEvent) => {
console.log(event);

const migrations = await $AWS.DynamoDB.Scan({
Table: migrationsHistoryTable,
});

console.log({ migrations });

const migrationsToRun = props.migrationFiles.filter(
(migrationFile) =>
!(migrations.Items ?? []).find(
(migration) => migration.id.S === migrationFile
)
);

console.log({ migrationsToRun });

// todo: Start the migrations
}
);
}
}
42 changes: 26 additions & 16 deletions src/examples/migrations/20220801-add-attribute.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
import { Table as cdkTable } from "aws-cdk-lib/aws-dynamodb";
import { Construct } from "constructs";
import { $AWS } from "functionless";
import { ScanOutput } from "typesafe-dynamodb/lib/scan";
import { Migration, MigrationProps } from "../../app";
import { $AWS, Table } from "functionless";
import { unmarshall, marshall } from "typesafe-dynamodb/lib/marshall";
import { MigrationProps, Migration } from "../../migration";

export type MigrationFunction = (
scope: Construct,
id: string,
props: MigrationProps
) => Migration<any>;

export const up: MigrationFunction = (scope, migrationName) =>
// Initialize Migration Stack
new Migration(scope, migrationName, {
tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/SubjectTable",
}).run(async (_table, result: ScanOutput<any, any, any>) => {
// Actual migration code goes here.
// Do something with each item in the table.
const tableArn =
"arn:aws:dynamodb:us-east-1:085108115628:table/TestStack-TableCD117FA1-ZVV3ZWUOWPO";

export const migration: MigrationFunction = (scope, migrationName) => {
const migrationDefinition = new Migration<any>(scope, migrationName, {
tableArn,
migrationName,
});

const table = Table.fromTable(
cdkTable.fromTableArn(scope, "SubjectTable", tableArn)
);

// Actual migration code goes here.
// For each item in the table
migrationDefinition.scan(async ({ result }) => {
for (const i of result.Items as any[]) {
// Do the following:
await $AWS.DynamoDB.PutItem({
Table: _table,
Item: {
id: {
S: `${i}_migrated`,
},
},
Table: table,
Item: marshall({ ...unmarshall(i), migratedAt: Date.now() }),
});
}
});

return migrationDefinition;
};
3 changes: 2 additions & 1 deletion src/examples/test-stack.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { App, CfnOutput, Stack } from "aws-cdk-lib";
import { AttributeType, Table } from "aws-cdk-lib/aws-dynamodb";
import { AttributeType, BillingMode, Table } from "aws-cdk-lib/aws-dynamodb";
import { MigrationsManager } from "../app";

const app = new App();
Expand All @@ -13,6 +13,7 @@ class TestStack extends Stack {
name: "id",
type: AttributeType.STRING,
},
billingMode: BillingMode.PAY_PER_REQUEST,
});

new MigrationsManager(this, "MigrationsManager", {
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./migrations-manager";
export * from "./migration";
96 changes: 96 additions & 0 deletions src/migration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { NestedStack } from "aws-cdk-lib";
import { Table as cdkTable } from "aws-cdk-lib/aws-dynamodb";
import { LogGroup, RetentionDays } from "aws-cdk-lib/aws-logs";
import { LogLevel } from "aws-cdk-lib/aws-stepfunctions";
import { Construct } from "constructs";
import {
ITable,
StepFunction,
Table,
Function,
$AWS,
$SFN,
} from "functionless";
import { ScanOutput } from "typesafe-dynamodb/lib/scan";

export type ScanTableOptions = {
segments: number;
};

export type MigrationProps = {
tableArn: string;
migrationName: string;
};

export type TransformFunctionType<T extends object> = (input: {
result: ScanOutput<T, any, any>;
}) => Promise<any>;

export class Migration<T extends object> extends NestedStack {
public readonly table: ITable<T, any, any>;

public readonly migrationName: string;

constructor(scope: Construct, id: string, props: MigrationProps) {
super(scope, id);

this.migrationName = props.migrationName;
this.table = Table.fromTable(
cdkTable.fromTableArn(this, "SubjectTable", props.tableArn)
);
}

// Creates a state machine scanning whole table in parallel and applying transform function to each item.
public scan(
_transformFn: TransformFunctionType<T>,
options?: ScanTableOptions
) {
const totalSegments = options?.segments ?? 10;
const segments = Array.from({ length: totalSegments }, (_, i) => i);

// "this" cannot be referenced in a Function.
const table = this.table;

const transformFunction = new Function(
this,
"MigrationCallbackFunction",
_transformFn
);

return new StepFunction(
this,
"MigrationStateMachine",
{
stateMachineName: this.migrationName,
logs: {
destination: new LogGroup(this, "MigrationLogGroup", {
retention: RetentionDays.ONE_WEEK,
}),
level: LogLevel.ALL,
},
},
async () => {
return $SFN.map(segments, async (_, index) => {
let lastEvaluatedKey;
let firstRun = true;

while (firstRun || lastEvaluatedKey) {
firstRun = false;

const result = await $AWS.DynamoDB.Scan({
Table: table,
TotalSegments: totalSegments,
Segment: index,
});

if (result.LastEvaluatedKey) {
lastEvaluatedKey = result.LastEvaluatedKey;
}

await transformFunction({ result });
}
});
}
);
}
}
100 changes: 27 additions & 73 deletions src/app.ts → src/migrations-manager.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,15 @@
import * as fs from "fs";
import * as path from "path";
import { aws_dynamodb, Stack } from "aws-cdk-lib";
import { Table as cdkTable } from "aws-cdk-lib/aws-dynamodb";
import { aws_dynamodb, CustomResource } from "aws-cdk-lib";
import { Provider } from "aws-cdk-lib/custom-resources";
import { CloudFormationCustomResourceEvent } from "aws-lambda";
import { Construct } from "constructs";
import {
$AWS,
$SFN,
StepFunction,
Table,
Function,
ITable,
} from "functionless";
import { ScanOutput } from "typesafe-dynamodb/lib/scan";

export type ScanTableOptions = {
segments: number;
};

export type MigrationProps = {
tableArn: string;
};
import { $AWS, Table, Function } from "functionless";

export type MigrationManagerProps = {
/**
* Custom name for the DynamoDB table storing migrations
*/
tableName?: string;
migrationsDir: string;
};
Expand All @@ -37,11 +23,6 @@ export type MigrationHistoryItem = {
completedSegments?: number[];
};

export type TransformFunctionType<T extends object> = (
_table: ITable<T, any, any>,
result: ScanOutput<any, any, any>
) => Promise<any>;

export class MigrationsManager extends Construct {
public readonly migrationsHistoryTable: Table<MigrationHistoryItem, "id">;

Expand All @@ -65,7 +46,17 @@ export class MigrationsManager extends Construct {
const migrationsDir = path.resolve(props.migrationsDir);
const migrationFiles = fs.readdirSync(migrationsDir);

console.log({ migrationFiles });
let migrationStacks = [];

for (const migrationFile of migrationFiles) {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const migrationStack = require(path.resolve(
migrationsDir,
migrationFile
));

migrationStacks.push(migrationStack.migration(this, migrationFile));
}

const onEventHandler = new Function(
this,
Expand All @@ -87,62 +78,25 @@ export class MigrationsManager extends Construct {
);

console.log({ migrationsToRun });

// todo: Start the migrations
}
);

new Provider(this, "MigrationsProvider", {
const migrationsProvider = new Provider(this, "MigrationsProvider", {
onEventHandler: onEventHandler.resource,
});
}
}

export class Migration<T extends object> extends Stack {
public readonly table: ITable<T, any, any>;

constructor(scope: Construct, id: string, props: MigrationProps) {
super(scope, id);

this.table = Table.fromTable(
cdkTable.fromTableArn(this, "SubjectTable", props.tableArn)
migrationStacks.map((stack) =>
migrationsProvider.node.addDependency(stack)
);
}

public run(
transformFn: TransformFunctionType<T>,
options?: ScanTableOptions
) {
const totalSegments = options?.segments ?? 10;
const segments = Array.from({ length: totalSegments }, (_, i) => i);

// todo: add migration entry "in_progress"

new StepFunction(this, "MigrationStepFunction", {}, async () => {
return $SFN.map(segments, async (_, index) => {
let lastEvaluatedKey;
let firstRun = true;

while (firstRun || lastEvaluatedKey) {
firstRun = false;

const result = await $AWS.DynamoDB.Scan({
Table: this.table,
TotalSegments: totalSegments,
Segment: index,
});

result.LastEvaluatedKey = result.LastEvaluatedKey;

new Function(
this,
"MigrationCallbackFunction",
await transformFn(this.table, result)
);
}

// todo: add migration entry "completed" for some segment?
});
new CustomResource(this, "MigrationsTrigger", {
serviceToken: migrationsProvider.serviceToken,
properties: {
// Force re-running the migrations every time the stack is updated
timestamp: Date.now(),
},
});

return this;
}
}

0 comments on commit 19efb4b

Please sign in to comment.