Skip to content

Commit

Permalink
feat(postgres): add commitInTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
theodorton committed Feb 10, 2023
1 parent b45a6c1 commit f5c424a
Showing 1 changed file with 27 additions and 2 deletions.
29 changes: 27 additions & 2 deletions packages/@ddes/postgres/lib/PostgresEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {AggregateCommit, EventStore, VersionConflictError} from '@ddes/core'
import {Repeater} from '@repeaterjs/repeater'
import {createHash} from 'crypto'
import {on} from 'events'
import {Pool} from 'pg'
import {Pool, PoolClient} from 'pg'
import QueryStream from 'pg-query-stream'
import {sql} from 'pg-sql'
import {PostgresListener} from './PostgresListener'
Expand Down Expand Up @@ -85,8 +85,33 @@ export class PostgresEventStore extends EventStore {
return commit
}

public async commitInTransaction<TAggregateCommit extends AggregateCommit>(
commits: TAggregateCommit[]
) {
const client = await this.pool.connect()

try {
await client.query('BEGIN')
for (const commit of commits) {
await this.commitOne(client, commit)
}
await client.query('COMMIT')
} catch (error: any) {
await client.query('ROLLBACK')
if (error.code === '23505') {
// TODO: Idenfity what commit caused the conflict
throw new VersionConflictError(commits[0])
}
throw error
} finally {
await client.release()
}

return commits
}

private async commitOne<TAggregateCommit extends AggregateCommit>(
pool: Pool,
pool: Pool | PoolClient,
commit: TAggregateCommit
) {
const {
Expand Down

0 comments on commit f5c424a

Please sign in to comment.