diff --git a/package.json b/package.json index c5deade..b23596e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "demux-postgres", - "version": "0.0.6", + "version": "0.0.7", "description": "Demux-js Action Handler implementation for Postgres databases", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/MassiveActionHandler.test.ts b/src/MassiveActionHandler.test.ts index e8d9ff2..76e0b8d 100644 --- a/src/MassiveActionHandler.test.ts +++ b/src/MassiveActionHandler.test.ts @@ -14,11 +14,6 @@ const dbName = "demuxmassivetest" const dbUser = "docker" const dbPass = "docker" -export function wait(ms: number) { - return new Promise((resolve) => { - setTimeout(resolve, ms) - }) -} jest.setTimeout(30000) @@ -62,7 +57,6 @@ describe("TestMassiveActionHandler", () => { it("populates database correctly", async () => { const [block1, isRollback] = await actionReader.nextBlock() await actionHandler.handleBlock(block1, isRollback, actionReader.isFirstBlock) - await wait(500) const groceries = await db.todo.findOne({ id: 1 }) expect(groceries).toEqual({ @@ -77,7 +71,6 @@ describe("TestMassiveActionHandler", () => { const [block2, isNotRollback] = await actionReader.nextBlock() await actionHandler.handleBlock(block2, isNotRollback, actionReader.isFirstBlock) - await wait(500) const cookies = await db.task.findOne({ name: "cookies" }) expect(cookies).toEqual({ @@ -97,7 +90,6 @@ describe("TestMassiveActionHandler", () => { const [block3, alsoNotRollback] = await actionReader.nextBlock() await actionHandler.handleBlock(block3, alsoNotRollback, actionReader.isFirstBlock) - await wait(500) const milk = await db.task.findOne({ name: "milk" }) const dippedCookies = await db.task.findOne({ name: "cookies" }) @@ -127,12 +119,10 @@ describe("TestMassiveActionHandler", () => { const [block1, isRollback1] = await actionReader.nextBlock() await actionHandler.handleBlock(block1, isRollback1, actionReader.isFirstBlock) expect(actionReader.isFirstBlock).toBe(true) - await wait(500) const [block2, isRollback2] = await actionReader.nextBlock() await actionHandler.handleBlock(block2, isRollback2, actionReader.isFirstBlock) expect(actionReader.isFirstBlock).toBe(false) - await wait(500) actionHandler.reset() const [needToSeek, seekTo] = await actionHandler.handleBlock(block1, isRollback1, true) diff --git a/src/MassiveActionHandler.ts b/src/MassiveActionHandler.ts index 064f5ae..4e12400 100644 --- a/src/MassiveActionHandler.ts +++ b/src/MassiveActionHandler.ts @@ -22,26 +22,22 @@ export class MassiveActionHandler extends AbstractActionHandler { } protected async handleWithState(handle: (state: any, context?: any) => void): Promise { - await new Promise((resolve, reject) => { - this.massiveInstance.withTransaction(async (tx: any) => { - let db - if (this.dbSchema === "public") { - db = tx - } else { - db = tx[this.dbSchema] - } - try { - await handle(db) - resolve(db) - } catch (err) { - console.error(err) - reject() - } - }, { - mode: new this.massiveInstance.pgp.txMode.TransactionMode({ - tiLevel: this.massiveInstance.pgp.txMode.isolationLevel.serializable, - }), - }) + await this.massiveInstance.withTransaction(async (tx: any) => { + let db + if (this.dbSchema === "public") { + db = tx + } else { + db = tx[this.dbSchema] + } + try { + await handle(db) + } catch (err) { + throw err // Throw error to trigger ROLLBACK + } + }, { + mode: new this.massiveInstance.pgp.txMode.TransactionMode({ + tiLevel: this.massiveInstance.pgp.txMode.isolationLevel.serializable, + }), }) }