Skip to content

Commit

Permalink
fix: stores drop method, failed attempt to build a dynamo projector
Browse files Browse the repository at this point in the history
  • Loading branch information
Roger Torres committed Sep 13, 2023
1 parent 6df41d4 commit 53200a3
Show file tree
Hide file tree
Showing 28 changed files with 1,465 additions and 924 deletions.
4 changes: 2 additions & 2 deletions libs/eventually-aws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
"build": "npx tsc --build"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.405.0",
"@aws-sdk/client-dynamodb": "^3.410.0",
"@rotorsoft/eventually": "workspace:^5",
"zod": "^3.22.2"
},
"devDependencies": {
"@rotorsoft/calculator-artifacts": "workspace:^1",
"@types/aws-lambda": "^8.10.119"
"@types/aws-lambda": "^8.10.120"
}
}
355 changes: 292 additions & 63 deletions libs/eventually-aws/src/DynamoProjectorStore.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,292 @@
import type { AggQuery, ProjectorStore, State } from "@rotorsoft/eventually";
import { dispose, log } from "@rotorsoft/eventually";

export const DynamoProjectorStore = <S extends State>(
table: string
): ProjectorStore<S> => {
const store: ProjectorStore<S> = {
name: `DynamoProjectorStore:${table}`,
dispose: () => {
// TODO await dispose resources
throw Error("Not implemented");
},

seed: (schema, indexes) => {
// TODO await seed store
console.log({ schema, indexes });
throw Error("Not implemented");
},

load: (ids) => {
// TODO await loading records by id
console.log({ ids });
throw Error("Not implemented");
},

commit: (map, watermark) => {
// TODO await steps
// - handle filtered deletes
// - handle filtered updates
// - prepare patched records (upserts and deletes)
// - connect
// - open transaction
// - apply patches
// - commit or rollback transaction
// - release connection
console.log({ map, watermark });
throw Error("Not implemented");
},

query: (query) => {
// TODO await query results
console.log({ query });
throw Error("Not implemented");
},

agg: (query: AggQuery<S>) => {
// TODO await query results
console.log({ query });
throw Error("Not implemented");
}
};

log().info(`[${process.pid}] ✨ ${store.name}`);
dispose(() => {
if (store.dispose) {
log().info(`[${process.pid}] ♻️ ${store.name}`);
return store.dispose();
}
return Promise.resolve();
});

return store;
};
/**
* No good compromise found!
*/

// import {
// AttributeValue,
// CreateTableCommand,
// DeleteTableCommand,
// DynamoDBClient,
// QueryCommand,
// TransactWriteItem,
// TransactWriteItemsCommand
// } from "@aws-sdk/client-dynamodb";
// import {
// conditions,
// dispose,
// log,
// type AggQuery,
// type Operator,
// type Projection,
// type ProjectionWhere,
// type ProjectorStore,
// type Schema,
// type State,
// Patch
// } from "@rotorsoft/eventually";
// import { config } from "./config";

// const EQUALS: Operator[] = ["eq", "lte", "gte", "in"];

// const dynamoOperators: Record<Operator, string> = {
// eq: "=",
// neq: "<>",
// lt: "<",
// gt: ">",
// lte: "<=",
// gte: ">=",
// in: "IN",
// nin: "NOT IN"
// };

// const filter = (
// where: ProjectionWhere<State>,
// patch?: Patch<State>
// ): {
// ConditionExpression: string;
// UpdateExpression?: string;
// ExpressionAttributeValues: Record<string, AttributeValue>;
// } => {
// const f = Object.entries(where).flatMap(([key, condition]) =>
// conditions(condition!).map(([operator, value]) => ({
// key,
// operator,
// value
// }))
// );
// const ExpressionAttributeValues = Object.assign(
// Object.fromEntries(f.map(({ key, value }) => [`:F_${key}`, { S: value }])),
// patch
// ? Object.fromEntries(
// Object.entries(patch).map(([k, v]) => [`:P_${k}`, { S: v }])
// )
// : {}
// );
// return {
// ConditionExpression: f
// .map(({ key, operator, value }) => {
// const operation =
// value === null
// ? EQUALS.includes(operator)
// ? "IS NULL"
// : "IS NOT NULL"
// : `${dynamoOperators[operator]} :F_${key}`;
// return `F_${key} ${operation}`;
// })
// .join(" AND "),
// UpdateExpression: patch
// ? "SET " +
// Object.keys(patch)
// .map((k) => `F_${k} = :P_${k}`)
// .join(", ")
// : undefined,
// ExpressionAttributeValues
// };
// };

// export const DynamoProjectorStore = <S extends State>(
// table: string,
// schema: Schema<Projection<S>>
// ): ProjectorStore<S> => {
// const client = new DynamoDBClient({
// region: config.aws.region,
// endpoint: config.aws.dynamo?.endpoint,
// credentials: config.aws.credentials
// });
// const name = `DynamoProjectorStore:${table}`;

// const store: ProjectorStore<S> = {
// name,
// dispose: async () => {
// client.destroy();
// return Promise.resolve();
// },

// seed: async () => {
// /**
// * Notes:
// * - Dummy partition Projector - just to get it working but not recommended
// * - Id is a regular field - secondary indexes not implemented
// */
// const response = await client.send(
// new CreateTableCommand({
// TableName: table,
// KeySchema: [{ AttributeName: "Projector", KeyType: "HASH" }],
// AttributeDefinitions: [
// { AttributeName: "Projector", AttributeType: "S" }
// ],
// ProvisionedThroughput: {
// ReadCapacityUnits: 5,
// WriteCapacityUnits: 5
// }
// })
// );
// log().info(`${name}.seed`, response);
// },

// drop: async (): Promise<void> => {
// try {
// await client.send(new DeleteTableCommand({ TableName: table }));
// } catch {
// //ignore when not found
// }
// },

// load: async (ids) => {
// /**
// * Notes:
// * - Needs Zod schema with "coerce" util
// */
// const response = await client.send(
// new QueryCommand({
// TableName: table,
// KeyConditionExpression: "Projector = :Projector",
// FilterExpression: "F_id IN (:F_ids)",
// ExpressionAttributeValues: {
// [":Projector"]: { S: "Projector" },
// [":F_ids"]: { SS: ids }
// }
// })
// );
// return (
// response.Items?.map((item) => ({
// watermark: Number.parseInt(item.Watermark.N ?? "-1"),
// state: schema.parse(
// Object.fromEntries(
// Object.entries(item).map(([k, v]) => [k.substring(2), v.S ?? v.N])
// )
// )
// })) ?? []
// );
// },

// commit: async (map, watermark) => {
// const items: TransactWriteItem[] = [];

// /**
// * Notes:
// * - Returned counters are not accurate on filtered operations
// * - Cannot commit batches due to the lack of PK (ID)
// */
// let upserted = 0,
// deleted = 0;

// // filtered deletes
// map.deletes.forEach((del) => {
// deleted++;
// const { ConditionExpression, ExpressionAttributeValues } = filter(del);
// ExpressionAttributeValues[":Watermark"] = { N: watermark.toString() };
// items.push({
// Delete: {
// TableName: table,
// Key: { Projector: { S: "Projector" } },
// ConditionExpression: ConditionExpression.concat(
// " AND Watermark < :Watermark"
// ),
// ExpressionAttributeValues
// }
// });
// });

// // filtered updates
// map.updates.forEach(({ where, ...patch }) => {
// upserted++;
// const {
// ConditionExpression,
// UpdateExpression,
// ExpressionAttributeValues
// } = filter(where, patch);
// ExpressionAttributeValues[":Watermark"] = { N: watermark.toString() };
// if (where) {
// items.push({
// Update: {
// TableName: table,
// Key: { Projector: { S: "Projector" } },
// ConditionExpression: ConditionExpression.concat(
// " AND Watermark < :Watermark"
// ),
// UpdateExpression: UpdateExpression!.concat(
// ", Watermark = :Watermark"
// ),
// ExpressionAttributeValues
// }
// });
// }
// });

// // patched records
// map.records.forEach((rec, id) => {
// // upserts when multiple keys are found in patch
// if (Object.keys(rec).length) {
// upserted++;
// const ExpressionAttributeValues = Object.fromEntries<AttributeValue>(
// Object.entries(rec).map(([k, v]) => [
// `:F_${k}`,
// { S: v.toString() }
// ])
// );
// ExpressionAttributeValues[":F_id"] = { S: id };
// ExpressionAttributeValues[":Watermark"] = { N: watermark.toString() };
// items.push({
// Update: {
// TableName: table,
// Key: { Projector: { S: "Projector" } },
// UpdateExpression:
// "SET F_id = :F_id, Watermark = :Watermark, " +
// Object.keys(rec)
// .map((k) => `F_${k} = :F_${k}`)
// .join(", "),
// ExpressionAttributeValues
// }
// });
// } else {
// deleted++;
// items.push({
// Delete: {
// TableName: table,
// Key: { Projector: { S: "Projector" } },
// ConditionExpression: "F_id = :F_id",
// ExpressionAttributeValues: { [":F_id"]: { S: id } }
// }
// });
// }
// });

// try {
// console.log(JSON.stringify(items, null, 2));
// const tx = new TransactWriteItemsCommand({
// TransactItems: items
// });
// await client.send(tx);
// return Promise.resolve({ upserted, deleted, watermark });
// } catch (error) {
// console.log(JSON.stringify(items, null, 2), error);
// log().error(error);
// }
// return Promise.resolve({ upserted: 0, deleted: 0, watermark });
// },

// query: (query) => {
// // TODO await query results
// console.log({ query });
// throw Error("Not implemented");
// },

// agg: (query: AggQuery<S>) => {
// // TODO await query results
// console.log({ query });
// throw Error("Not implemented");
// }
// };

// log().info(`[${process.pid}] ✨ ${store.name}`);
// dispose(() => {
// if (store.dispose) {
// log().info(`[${process.pid}] ♻️ ${store.name}`);
// return store.dispose();
// }
// return Promise.resolve();
// });

// return store;
// };
Loading

0 comments on commit 53200a3

Please sign in to comment.