Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/packages/dumbo/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@event-driven-io/dumbo",
"version": "0.4.1",
"version": "0.5.0",
"description": "Dumbo - tools for dealing with PostgreSQL",
"type": "module",
"scripts": {
Expand Down
59 changes: 59 additions & 0 deletions src/packages/dumbo/src/execute/connections.e2e.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import {
PostgreSqlContainer,
type StartedPostgreSqlContainer,
} from '@testcontainers/postgresql';
import { after, before, describe, it } from 'node:test';
import pg from 'pg';
import { executeSQL } from '.';
import { rawSql } from '../sql';

void describe('PostgreSQL connection', () => {
let postgres: StartedPostgreSqlContainer;
let connectionString: string;

before(async () => {
postgres = await new PostgreSqlContainer().start();
connectionString = postgres.getConnectionUri();
});

after(async () => {
await postgres.stop();
});

void describe('executeSQL', () => {
void it('connects using pool', async () => {
const pool = new pg.Pool({ connectionString });

try {
await executeSQL(pool, rawSql('SELECT 1'));
} catch (error) {
console.log(error);
} finally {
await pool.end();
}
});

void it('connects using connected pool client', async () => {
const pool = new pg.Pool({ connectionString });
const poolClient = await pool.connect();

try {
await executeSQL(poolClient, rawSql('SELECT 1'));
} finally {
poolClient.release();
await pool.end();
}
});

void it('connects using connected client', async () => {
const client = new pg.Client({ connectionString });
await client.connect();

try {
await executeSQL(client, rawSql('SELECT 1'));
} finally {
await client.end();
}
});
});
});
45 changes: 31 additions & 14 deletions src/packages/dumbo/src/execute/index.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,44 @@
import type pg from 'pg';
import pg from 'pg';
import type { SQL } from '../sql';

export const isPgPool = (
poolOrClient: pg.Pool | pg.PoolClient | pg.Client,
): poolOrClient is pg.Pool => {
return poolOrClient instanceof pg.Pool;
};

export const isPgClient = (
poolOrClient: pg.Pool | pg.PoolClient | pg.Client,
): poolOrClient is pg.Client => poolOrClient instanceof pg.Client;

export const isPgPoolClient = (
poolOrClient: pg.Pool | pg.PoolClient | pg.Client,
): poolOrClient is pg.PoolClient =>
'release' in poolOrClient && typeof poolOrClient.release === 'function';

export const execute = async <Result = void>(
pool: pg.Pool,
handle: (client: pg.PoolClient) => Promise<Result>,
poolOrClient: pg.Pool | pg.PoolClient | pg.Client,
handle: (client: pg.PoolClient | pg.Client) => Promise<Result>,
) => {
const client = await pool.connect();
const client = isPgPool(poolOrClient)
? await poolOrClient.connect()
: poolOrClient;

try {
return await handle(client);
} finally {
client.release();
// release only if client wasn't injected externally
if (isPgPool(poolOrClient) && isPgPoolClient(client)) client.release();
}
};

export const executeInTransaction = async <Result = void>(
pool: pg.Pool,
poolOrClient: pg.Pool | pg.PoolClient | pg.Client,
handle: (
client: pg.PoolClient,
client: pg.PoolClient | pg.Client,
) => Promise<{ success: boolean; result: Result }>,
): Promise<Result> =>
execute(pool, async (client) => {
execute(poolOrClient, async (client) => {
try {
await client.query('BEGIN');

Expand All @@ -38,17 +57,15 @@ export const executeInTransaction = async <Result = void>(
export const executeSQL = async <
Result extends pg.QueryResultRow = pg.QueryResultRow,
>(
poolOrClient: pg.Pool | pg.PoolClient,
poolOrClient: pg.Pool | pg.PoolClient | pg.Client,
sql: SQL,
): Promise<pg.QueryResult<Result>> =>
'totalCount' in poolOrClient
? execute(poolOrClient, (client) => client.query<Result>(sql))
: poolOrClient.query<Result>(sql);
execute(poolOrClient, (client) => client.query<Result>(sql));

export const executeSQLInTransaction = async <
Result extends pg.QueryResultRow = pg.QueryResultRow,
>(
pool: pg.Pool,
pool: pg.Pool | pg.PoolClient | pg.Client,
sql: SQL,
) => {
console.log(sql);
Expand All @@ -61,7 +78,7 @@ export const executeSQLInTransaction = async <
export const executeSQLBatchInTransaction = async <
Result extends pg.QueryResultRow = pg.QueryResultRow,
>(
pool: pg.Pool,
pool: pg.Pool | pg.PoolClient | pg.Client,
...sqls: SQL[]
) =>
executeInTransaction(pool, async (client) => {
Expand Down
4 changes: 2 additions & 2 deletions src/packages/pongo/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@event-driven-io/pongo",
"version": "0.7.0",
"version": "0.8.0",
"description": "Pongo - Mongo with strong consistency on top of Postgres",
"type": "module",
"scripts": {
Expand Down Expand Up @@ -47,7 +47,7 @@
"dist"
],
"peerDependencies": {
"@event-driven-io/dumbo": "^0.4.1",
"@event-driven-io/dumbo": "^0.5.0",
"@types/mongodb": "^4.0.7",
"@types/pg": "^8.11.6",
"@types/pg-format": "^1.0.5",
Expand Down
85 changes: 85 additions & 0 deletions src/packages/pongo/src/main/pongoClient.connections.e2e.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { isPgPool } from '@event-driven-io/dumbo';
import {
PostgreSqlContainer,
type StartedPostgreSqlContainer,
} from '@testcontainers/postgresql';
import { randomUUID } from 'node:crypto';
import { after, before, describe, it } from 'node:test';
import pg from 'pg';
import { pongoClient } from './pongoClient';

type User = {
_id?: string;
name: string;
};

void describe('Pongo collection', () => {
let postgres: StartedPostgreSqlContainer;
let connectionString: string;

before(async () => {
postgres = await new PostgreSqlContainer().start();
connectionString = postgres.getConnectionUri();
});

after(async () => {
await postgres.stop();
});

const insertDocumentUsingPongo = async (
poolOrClient: pg.Pool | pg.PoolClient | pg.Client,
) => {
const pongo = pongoClient(
connectionString,
isPgPool(poolOrClient)
? undefined
: {
client: poolOrClient,
},
);

try {
const pongoCollection = pongo.db().collection<User>('connections');
await pongoCollection.insertOne({ name: randomUUID() });
} finally {
await pongo.close();
}
};

void describe('Pool', () => {
void it('connects using pool', async () => {
const pool = new pg.Pool({ connectionString });

try {
await insertDocumentUsingPongo(pool);
} catch (error) {
console.log(error);
} finally {
await pool.end();
}
});

void it('connects using connected pool client', async () => {
const pool = new pg.Pool({ connectionString });
const poolClient = await pool.connect();

try {
await insertDocumentUsingPongo(poolClient);
} finally {
poolClient.release();
await pool.end();
}
});

void it('connects using connected client', async () => {
const client = new pg.Client({ connectionString });
await client.connect();

try {
await insertDocumentUsingPongo(client);
} finally {
await client.end();
}
});
});
});
2 changes: 1 addition & 1 deletion src/packages/pongo/src/main/pongoClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { PongoClient, PongoDb } from './typing/operations';

export const pongoClient = (
connectionString: string,
options: { client?: pg.PoolClient } = {},
options: { client?: pg.PoolClient | pg.Client } = {},
): PongoClient => {
const defaultDbName = getDatabaseNameOrDefault(connectionString);
const dbClients: Map<string, DbClient> = new Map();
Expand Down
2 changes: 1 addition & 1 deletion src/packages/pongo/src/mongo/mongoClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class MongoClient {

constructor(
connectionString: string,
options: { client?: pg.PoolClient } = {},
options: { client?: pg.PoolClient | pg.Client } = {},
) {
this.pongoClient = pongoClient(connectionString, options);
}
Expand Down
2 changes: 1 addition & 1 deletion src/packages/pongo/src/postgres/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { postgresCollection } from './postgresCollection';
export type PongoClientOptions = {
connectionString: string;
dbName?: string | undefined;
client?: pg.PoolClient | undefined;
client?: pg.PoolClient | pg.Client | undefined;
};

export const postgresClient = (options: PongoClientOptions): DbClient => {
Expand Down
2 changes: 1 addition & 1 deletion src/packages/pongo/src/postgres/postgresCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const postgresCollection = <T extends PongoDocument>(
{
dbName,
poolOrClient: clientOrPool,
}: { dbName: string; poolOrClient: pg.Pool | pg.PoolClient },
}: { dbName: string; poolOrClient: pg.Pool | pg.PoolClient | pg.Client },
): PongoCollection<T> => {
const execute = <T extends pg.QueryResultRow = pg.QueryResultRow>(sql: SQL) =>
executeSQL<T>(clientOrPool, sql);
Expand Down