Skip to content

Commit f17bfaa

Browse files
committed
Finished the first draft of the new connection management, made code compiling
1 parent d872d8a commit f17bfaa

22 files changed

+757
-458
lines changed

package-lock.json

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/package-lock.json

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@event-driven-io/pongo-core",
3-
"version": "0.7.0",
3+
"version": "0.8.0",
44
"description": "Pongo - Mongo with strong consistency on top of Postgres",
55
"type": "module",
66
"engines": {

src/packages/dumbo/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@event-driven-io/dumbo",
3-
"version": "0.5.0",
3+
"version": "0.6.0",
44
"description": "Dumbo - tools for dealing with PostgreSQL",
55
"type": "module",
66
"scripts": {
Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,12 @@
1-
export type Transaction<
2-
ConnectorType extends string = string,
3-
DbClient = unknown,
4-
> = {
5-
type: ConnectorType;
6-
client: Promise<DbClient>;
7-
begin: () => Promise<void>;
8-
commit: () => Promise<void>;
9-
rollback: () => Promise<void>;
10-
};
1+
import type { WithSQLExecutor } from './execute';
2+
import type { TransactionFactory } from './transaction';
113

124
export type Connection<
135
ConnectorType extends string = string,
146
DbClient = unknown,
157
> = {
168
type: ConnectorType;
17-
open: () => Promise<DbClient>;
9+
connect: () => Promise<DbClient>;
1810
close: () => Promise<void>;
19-
20-
beginTransaction: () => Promise<Transaction<ConnectorType, DbClient>>;
21-
};
11+
} & WithSQLExecutor &
12+
TransactionFactory<ConnectorType>;
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { type SQL } from '../sql';
2+
import type { Connection } from './connection';
3+
4+
export interface QueryResultRow {
5+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
6+
[column: string]: any;
7+
}
8+
9+
export type QueryResult<Result extends QueryResultRow = QueryResultRow> = {
10+
rowCount: number | null;
11+
rows: Result[];
12+
};
13+
14+
export type SQLExecutor<
15+
ConnectorType extends string = string,
16+
DbClient = unknown,
17+
> = {
18+
type: ConnectorType;
19+
query<Result extends QueryResultRow = QueryResultRow>(
20+
client: DbClient,
21+
sql: SQL,
22+
): Promise<QueryResult<Result>>;
23+
};
24+
25+
export type WithSQLExecutor = {
26+
execute: {
27+
query<Result extends QueryResultRow = QueryResultRow>(
28+
sql: SQL,
29+
): Promise<QueryResult<Result>>;
30+
};
31+
};
32+
33+
export const withSqlExecutor = <
34+
DbClient = unknown,
35+
Executor extends SQLExecutor = SQLExecutor,
36+
>(
37+
sqlExecutor: Executor,
38+
options: {
39+
connect: () => Promise<DbClient>;
40+
close?: (client: DbClient, error?: unknown) => Promise<void>;
41+
},
42+
): WithSQLExecutor => ({
43+
execute: {
44+
query: async <Result extends QueryResultRow = QueryResultRow>(sql: SQL) => {
45+
const { connect, close } = options;
46+
const client = await connect();
47+
48+
try {
49+
const result = await sqlExecutor.query<Result>(client, sql);
50+
if (close) await close(client);
51+
return result;
52+
} catch (error) {
53+
if (close) await close(client, error);
54+
55+
throw error;
56+
}
57+
},
58+
},
59+
});
60+
61+
export const queryWithNewConnection = async <
62+
ConnectionType extends Connection,
63+
Result extends QueryResultRow = QueryResultRow,
64+
>(
65+
connectionFactory: {
66+
open: () => Promise<ConnectionType>;
67+
},
68+
sql: SQL,
69+
) => {
70+
const { open } = connectionFactory;
71+
const connection = await open();
72+
73+
try {
74+
return await connection.execute.query<Result>(sql);
75+
} finally {
76+
await connection.close();
77+
}
78+
};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
export * from './connection';
22
export * from './connectionString';
3+
export * from './execute';
34
export * from './pg';
45
export * from './pool';
6+
export * from './transaction';

src/packages/dumbo/src/connections/pg/connection.int.spec.ts

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ import {
44
} from '@testcontainers/postgresql';
55
import { after, before, describe, it } from 'node:test';
66
import pg from 'pg';
7-
import { pgConnection } from '.';
8-
import { executeSQL } from '../../execute';
7+
import { nodePostgresPool } from '.';
98
import { rawSql } from '../../sql';
10-
import { endPool, getPool } from '../pool';
9+
import { endPool, getPool } from './pool';
1110

1211
void describe('PostgreSQL connection', () => {
1312
let postgres: StartedPostgreSqlContainer;
@@ -24,68 +23,63 @@ void describe('PostgreSQL connection', () => {
2423

2524
void describe('executeSQL', () => {
2625
void it('connects using pool', async () => {
27-
const connection = pgConnection({ connectionString });
26+
const pool = nodePostgresPool({ connectionString });
27+
const connection = await pool.open();
2828

2929
try {
30-
await executeSQL(connection.pool, rawSql('SELECT 1'));
30+
await connection.execute.query(rawSql('SELECT 1'));
3131
} catch (error) {
3232
console.log(error);
3333
} finally {
3434
await connection.close();
35-
}
36-
});
37-
38-
void it('connects using connected pool client', async () => {
39-
const connection = pgConnection({ connectionString });
40-
const poolClient = await connection.open();
41-
42-
try {
43-
await executeSQL(poolClient, rawSql('SELECT 1'));
44-
} finally {
45-
await connection.close();
35+
await pool.close();
4636
}
4737
});
4838

4939
void it('connects using existing pool', async () => {
50-
const pool = getPool(connectionString);
51-
const connection = pgConnection({ connectionString, pool });
40+
const nativePool = getPool(connectionString);
41+
const pool = nodePostgresPool({ connectionString, pool: nativePool });
42+
const connection = await pool.open();
5243

5344
try {
54-
await executeSQL(pool, rawSql('SELECT 1'));
45+
await connection.execute.query(rawSql('SELECT 1'));
5546
} finally {
5647
await connection.close();
48+
await pool.close();
5749
await endPool({ connectionString });
5850
}
5951
});
6052

6153
void it('connects using client', async () => {
62-
const connection = pgConnection({
54+
const pool = nodePostgresPool({
6355
connectionString,
6456
type: 'client',
6557
});
66-
const client = await connection.open();
58+
const connection = await pool.open();
6759

6860
try {
69-
await executeSQL(client, rawSql('SELECT 1'));
61+
await connection.execute.query(rawSql('SELECT 1'));
7062
} finally {
7163
await connection.close();
64+
await pool.close();
7265
}
7366
});
7467

7568
void it('connects using connected client', async () => {
7669
const existingClient = new pg.Client({ connectionString });
7770
await existingClient.connect();
7871

79-
const connection = pgConnection({
72+
const pool = nodePostgresPool({
8073
connectionString,
8174
client: existingClient,
8275
});
83-
const client = await connection.open();
76+
const connection = await pool.open();
8477

8578
try {
86-
await executeSQL(client, rawSql('SELECT 1'));
79+
await connection.execute.query(rawSql('SELECT 1'));
8780
} finally {
8881
await connection.close();
82+
await pool.close();
8983
await existingClient.end();
9084
}
9185
});

0 commit comments

Comments
 (0)