Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"@types/mongodb": "^4.0.7",
"@types/node": "20.11.30",
"@types/pg": "^8.11.6",
"@types/pg-connection-string": "^2.0.0",
"@types/pg-format": "^1.0.5",
"@types/uuid": "9.0.8",
"@typescript-eslint/eslint-plugin": "7.9.0",
Expand All @@ -89,6 +90,7 @@
},
"peerDependencies": {
"pg": "^8.12.0",
"pg-connection-string": "^2.6.4",
"pg-format": "^1.0.4",
"testcontainers": "^10.10.1"
},
Expand Down
7 changes: 6 additions & 1 deletion src/packages/dumbo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,19 @@
"dist"
],
"peerDependencies": {
"@types/uuid": "^9.0.8",
"@types/pg": "^8.11.6",
"@types/pg-connection-string": "^2.0.0",
"@types/pg-format": "^1.0.5",
"@types/uuid": "^9.0.8",
"pg": "^8.12.0",
"pg-connection-string": "^2.6.4",
"pg-format": "^1.0.4",
"uuid": "^9.0.1"
},
"devDependencies": {
"@types/node": "20.11.30"
},
"dependencies": {
"@event-driven-io/dumbo": "^0.1.0"
}
}
2 changes: 1 addition & 1 deletion src/packages/dumbo/src/connections/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ export const postgresClient = (

return {
connect: () => pool.connect(),
close: () => endPool(connectionString),
close: () => endPool({ connectionString, database }),
};
};
6 changes: 6 additions & 0 deletions src/packages/dumbo/src/connections/connectionString.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import pgcs from 'pg-connection-string';

export const defaultPostgreSqlDatabase = 'postgres';

export const getDatabaseNameOrDefault = (connectionString: string) =>
pgcs.parse(connectionString).database ?? defaultPostgreSqlDatabase;
1 change: 1 addition & 0 deletions src/packages/dumbo/src/connections/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './client';
export * from './connectionString';
export * from './pool';
62 changes: 54 additions & 8 deletions src/packages/dumbo/src/connections/pool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import pg from 'pg';
import {
defaultPostgreSqlDatabase,
getDatabaseNameOrDefault,
} from './connectionString';

const pools: Map<string, pg.Pool> = new Map();
const usageCounter: Map<string, number> = new Map();

export const getPool = (
connectionStringOrOptions: string | pg.PoolConfig,
Expand All @@ -15,22 +20,63 @@ export const getPool = (
? { connectionString }
: connectionStringOrOptions;

//TODO: this should include database name resolution for key
const database =
poolOptions.database ??
(poolOptions.connectionString
? getDatabaseNameOrDefault(poolOptions.connectionString)
: undefined);

const lookupKey = key(connectionString, database);

updatePoolUsageCounter(lookupKey, 1);

return (
pools.get(connectionString) ??
pools.set(connectionString, new pg.Pool(poolOptions)).get(connectionString)!
pools.get(lookupKey) ??
pools.set(lookupKey, new pg.Pool(poolOptions)).get(lookupKey)!
);
};

export const endPool = async (connectionString: string): Promise<void> => {
const pool = pools.get(connectionString);
if (pool) {
export const endPool = async ({
connectionString,
database,
force,
}: {
connectionString: string;
database?: string | undefined;
force?: boolean;
}): Promise<void> => {
database = database ?? getDatabaseNameOrDefault(connectionString);
const lookupKey = key(connectionString, database);

const pool = pools.get(lookupKey);
if (pool && (updatePoolUsageCounter(lookupKey, -1) <= 0 || force === true)) {
await onEndPool(lookupKey, pool);
}
};

export const onEndPool = async (lookupKey: string, pool: pg.Pool) => {
try {
await pool.end();
pools.delete(connectionString);
pools.delete(lookupKey);
} catch (error) {
console.log(`Error while closing the connection pool: ${lookupKey}`);
console.log(error);
}
};

export const endAllPools = () =>
Promise.all(
[...pools.keys()].map((connectionString) => endPool(connectionString)),
[...pools.entries()].map(([lookupKey, pool]) => onEndPool(lookupKey, pool)),
);

const key = (connectionString: string, database: string | undefined) =>
`${connectionString}|${database ?? defaultPostgreSqlDatabase}`;

const updatePoolUsageCounter = (lookupKey: string, by: 1 | -1): number => {
const currentCounter = usageCounter.get(lookupKey) ?? 0;
const newCounter = currentCounter + by;

usageCounter.set(lookupKey, currentCounter + by);

return newCounter;
};
10 changes: 7 additions & 3 deletions src/packages/pongo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,20 @@
"dist"
],
"peerDependencies": {
"@types/uuid": "^9.0.8",
"@event-driven-io/dumbo": "^0.1.0",
"@types/mongodb": "^4.0.7",
"@types/pg": "^8.11.6",
"@types/pg-format": "^1.0.5",
"@types/mongodb": "^4.0.7",
"@event-driven-io/dumbo": "^0.1.0",
"@types/uuid": "^9.0.8",
"pg": "^8.12.0",
"pg-format": "^1.0.4",
"uuid": "^9.0.1"
},
"devDependencies": {
"@types/node": "20.11.30"
},
"dependencies": {
"@types/pg-connection-string": "^2.0.0",
"pg-connection-string": "^2.6.4"
}
}
3 changes: 1 addition & 2 deletions src/packages/pongo/src/e2e/compatibilityTest.e2e.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { endAllPools } from '@event-driven-io/dumbo';
import {
MongoDBContainer,
type StartedMongoDBContainer,
Expand Down Expand Up @@ -60,7 +59,7 @@ void describe('MongoDB Compatibility Tests', () => {

after(async () => {
try {
await endAllPools();
await pongoClient.close();
await postgres.stop();
} catch (error) {
console.log(error);
Expand Down
18 changes: 0 additions & 18 deletions src/packages/pongo/src/main/client.ts

This file was deleted.

9 changes: 3 additions & 6 deletions src/packages/pongo/src/main/dbClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { postgresClient } from '../postgres';
import { postgresClient, type PongoClientOptions } from '../postgres';
import type { PongoCollection } from './typing/operations';

export interface DbClient {
Expand All @@ -7,10 +7,7 @@ export interface DbClient {
collection: <T>(name: string) => PongoCollection<T>;
}

export const getDbClient = (
connectionString: string,
database?: string,
): DbClient => {
export const getDbClient = (options: PongoClientOptions): DbClient => {
// This is the place where in the future could come resolution of other database types
return postgresClient(connectionString, database);
return postgresClient(options);
};
2 changes: 1 addition & 1 deletion src/packages/pongo/src/main/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './client';
export * from './dbClient';
export * from './pongoClient';
export * from './typing';
35 changes: 35 additions & 0 deletions src/packages/pongo/src/main/pongoClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { getDatabaseNameOrDefault } from '@event-driven-io/dumbo';
import { getDbClient, type DbClient } from './dbClient';
import type { PongoClient, PongoDb } from './typing/operations';

export const pongoClient = (connectionString: string): PongoClient => {
const defaultDbName = getDatabaseNameOrDefault(connectionString);
const dbClients: Map<string, DbClient> = new Map();

const dbClient = getDbClient({ connectionString });
dbClients.set(defaultDbName, dbClient);

const pongoClient: PongoClient = {
connect: async () => {
await dbClient.connect();
return pongoClient;
},
close: async () => {
for (const db of dbClients.values()) {
await db.close();
}
},
db: (dbName?: string): PongoDb => {
if (!dbName) return dbClient;

return (
dbClients.get(dbName) ??
dbClients
.set(dbName, getDbClient({ connectionString, database: dbName }))
.get(dbName)!
);
},
};

return pongoClient;
};
23 changes: 16 additions & 7 deletions src/packages/pongo/src/postgres/client.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
import { endPool, getPool } from '@event-driven-io/dumbo';
import pg from 'pg';
import { type DbClient } from '../main';
import { postgresCollection } from './postgresCollection';

export const postgresClient = (
connectionString: string,
database?: string,
): DbClient => {
const pool = getPool({ connectionString, database });
export type PongoClientOptions = {
connectionString: string;
database?: string | undefined;
client?: pg.PoolClient;
};

export const postgresClient = (options: PongoClientOptions): DbClient => {
const { connectionString, database, client } = options;
const managesPoolLifetime = !client;
const clientOrPool = client ?? getPool({ connectionString, database });

return {
connect: () => Promise.resolve(),
close: () => endPool(connectionString),
collection: <T>(name: string) => postgresCollection<T>(name, pool),
close: () =>
managesPoolLifetime
? endPool({ connectionString, database })
: Promise.resolve(),
collection: <T>(name: string) => postgresCollection<T>(name, clientOrPool),
};
};
2 changes: 1 addition & 1 deletion src/packages/pongo/src/postgres/postgresCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { buildUpdateQuery } from './update';

export const postgresCollection = <T>(
collectionName: string,
pool: pg.Pool,
pool: pg.Pool | pg.PoolClient,
): PongoCollection<T> => {
const execute = (sql: SQL) => executeSQL(pool, sql);
const SqlFor = collectionSQLBuilder(collectionName);
Expand Down