Skip to content

Commit

Permalink
feat: add stream method
Browse files Browse the repository at this point in the history
  • Loading branch information
gajus committed Mar 1, 2019
1 parent 7067ca3 commit fcc3dd1
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 17 deletions.
2 changes: 2 additions & 0 deletions .README/INTERCEPTORS.md
Expand Up @@ -78,6 +78,8 @@ pool.connect();
Use `afterQuery` to modify the query result.
Note: When query is executed using `stream`, then `afterQuery` is called with empty result set and does not affect the query result.
#### `beforeQueryExecution`
This function can optionally return a direct result of the query which will cause the actual query never to be executed.
Expand Down
40 changes: 40 additions & 0 deletions .README/QUERY_METHODS.md
Expand Up @@ -145,6 +145,46 @@ const foo = await connection.oneFirst(sql`SELECT foo`);

API and the result shape are equivalent to [`pg#query`](https://github.com/brianc/node-postgres).

Example:

```js
await connection.query(sql`SELECT foo`);

// {
// command: 'SELECT',
// fields: [],
// notices: [],
// oid: null,
// rowAsArray: false,
// rowCount: 1,
// rows: [
// {
// foo: 'bar'
// }
// ]
// }

```

### `stream`

Streams query results.

Example:

```js
await connection.stream(sql`SELECT foo`, (stream) => {
stream.on('data', (row) => {
// {
// foo: 'bar'
// }
});
});

```

Note: Implemneted using [`pg-query-stream`](https://github.com/brianc/node-pg-query-stream).

### `transaction`

`transaction` method is used wrap execution of queries in `START TRANSACTION` and `COMMIT` or `ROLLBACK`. `COMMIT` is called if the transaction handler returns a promise that resolves; `ROLLBACK` is called otherwise.
Expand Down
4 changes: 3 additions & 1 deletion package.json
Expand Up @@ -16,10 +16,12 @@
"iso8601-duration": "^1.1.7",
"pg": "^7.8.1",
"pg-connection-string": "^2.0.0",
"pg-query-stream": "^2.0.0",
"pg-types": "^2.0.0",
"postgres-interval": "^1.2.0",
"roarr": "^2.13.0",
"serialize-error": "^3.0.0",
"through2": "^3.0.1",
"ulid": "^2.3.0"
},
"description": "A PostgreSQL client with strict types, detail logging and assertions.",
Expand All @@ -35,7 +37,7 @@
"coveralls": "^3.0.3",
"eslint": "^5.14.1",
"eslint-config-canonical": "^16.2.0",
"flow-bin": "^0.93.0",
"flow-bin": "^0.94.0",
"flow-copy-source": "^2.0.3",
"gitdown": "^2.5.7",
"husky": "^1.3.1",
Expand Down
22 changes: 22 additions & 0 deletions src/binders/bindPool.js
Expand Up @@ -5,6 +5,7 @@ import type {
DatabasePoolType,
InternalDatabasePoolType,
LoggerType,
StreamHandlerType,
TaggedTemplateLiteralInvocationType
} from '../types';
import {
Expand Down Expand Up @@ -65,6 +66,27 @@ export default (
one: mapConnection('one'),
oneFirst: mapConnection('oneFirst'),
query: mapConnection('query'),
stream: async (query: TaggedTemplateLiteralInvocationType, streamHandler: StreamHandlerType) => {
if (typeof query === 'string') {
throw new TypeError('Query must be constructed using `sql` tagged template literal.');
}

await createConnection(
parentLog,
pool,
clientConfiguration,
'IMPLICIT_QUERY',
(connectionLog, connection, boundConnection) => {
return boundConnection.stream(query, streamHandler);
},
(newPool) => {
return newPool.stream(query, streamHandler);
},
query
);

return null;
},
transaction: async (transactionHandler) => {
return createConnection(
parentLog,
Expand Down
2 changes: 2 additions & 0 deletions src/binders/bindPoolConnection.js
Expand Up @@ -20,6 +20,7 @@ import {
one,
oneFirst,
query,
stream,
transaction
} from '../connectionMethods';

Expand All @@ -38,6 +39,7 @@ export default (
one: mapTaggedTemplateLiteralInvocation(one.bind(null, parentLog, connection, clientConfiguration)),
oneFirst: mapTaggedTemplateLiteralInvocation(oneFirst.bind(null, parentLog, connection, clientConfiguration)),
query: mapTaggedTemplateLiteralInvocation(query.bind(null, parentLog, connection, clientConfiguration)),
stream: mapTaggedTemplateLiteralInvocation(stream.bind(null, parentLog, connection, clientConfiguration)),
transaction: async (handler: TransactionFunctionType) => {
return transaction(parentLog, connection, clientConfiguration, handler);
}
Expand Down
1 change: 1 addition & 0 deletions src/connectionMethods/index.js
Expand Up @@ -10,4 +10,5 @@ export {default as nestedTransaction} from './nestedTransaction';
export {default as one} from './one';
export {default as oneFirst} from './oneFirst';
export {default as query} from './query';
export {default as stream} from './stream';
export {default as transaction} from './transaction';
63 changes: 63 additions & 0 deletions src/connectionMethods/stream.js
@@ -0,0 +1,63 @@
// @flow

import QueryStream from 'pg-query-stream';
import through from 'through2';
import {
executeQuery
} from '../routines';
import type {
InternalStreamFunctionType
} from '../types';

const stream: InternalStreamFunctionType = async (connectionLogger, connection, clientConfiguration, rawSql, values, streamHandler) => {
return executeQuery(
connectionLogger,
connection,
clientConfiguration,
rawSql,
values,
undefined,
(finalConnection, finsalSql, finalValues, executionContext, actualQuery) => {
const query = new QueryStream(finsalSql, finalValues);

const queryStream = finalConnection.query(query);

const rowTransformers = [];

for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.transformRow) {
rowTransformers.push(interceptor.transformRow);
}
}

return new Promise((resolve, reject) => {
queryStream.on('error', (error) => {
reject(error);
});

queryStream.on('end', () => {
resolve({});
});

streamHandler(
queryStream.pipe(through.obj(function (row, enc, callback) {
let finalRow = row;

if (rowTransformers.length) {
for (const rowTransformer of rowTransformers) {
finalRow = rowTransformer(executionContext, actualQuery, finalRow);
}
}

// eslint-disable-next-line fp/no-this, babel/no-invalid-this
this.push(finalRow);

callback();
}))
);
});
}
);
};

export default stream;
6 changes: 4 additions & 2 deletions src/factories/createConnection.js
@@ -1,6 +1,7 @@
// @flow

import type {
MaybePromiseType,
ClientConfigurationType,
ConnectionTypeType,
DatabasePoolType,
Expand All @@ -25,7 +26,8 @@ type ConnectionHandlerType = (
connection: InternalDatabaseConnectionType,
boundConnection: DatabasePoolConnectionType,
clientConfiguration: ClientConfigurationType
) => Promise<*>;
) => MaybePromiseType<*>;

type PoolHandlerType = (pool: DatabasePoolType) => Promise<*>;

const createConnection = async (
Expand All @@ -36,7 +38,7 @@ const createConnection = async (
connectionHandler: ConnectionHandlerType,
poolHandler: PoolHandlerType,
query?: TaggedTemplateLiteralInvocationType | null = null
): Promise<*> => {
) => {
for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.beforePoolConnection) {
const maybeNewPool = await interceptor.beforePoolConnection({
Expand Down
38 changes: 29 additions & 9 deletions src/routines/executeQuery.js
Expand Up @@ -17,19 +17,32 @@ import {
UniqueIntegrityConstraintViolationError
} from '../errors';
import type {
QueryContextType
ClientConfigurationType,
InternalDatabaseConnectionType,
LoggerType,
PrimitiveValueExpressionType,
QueryContextType,
QueryIdType,
QueryResultRowType,
QueryType
} from '../types';

type ExecutionRoutineType = (connection: InternalDatabaseConnectionType, sql: string, values: $ReadOnlyArray<PrimitiveValueExpressionType>) => Promise<*>;
type ExecutionRoutineType = (
connection: InternalDatabaseConnectionType,
sql: string,
values: $ReadOnlyArray<PrimitiveValueExpressionType>,
queryContext: QueryContextType,
query: QueryType
) => Promise<*>;

// eslint-disable-next-line complexity
export default async (
connectionLogger: LoggerType,
connection: InternalDatabaseConnectionType,
clientConfiguration: ClientConfigurationType,
rawSql: string,
values: $ReadOnlyArray<PrimitiveValueExpressionType>,
inheritedQueryId: QueryIdType,
values: $ReadOnlyArray<PrimitiveValueExpressionType> = [],
inheritedQueryId?: QueryIdType,
executionRoutine: ExecutionRoutineType
) => {
if (connection.connection.slonik.terminated) {
Expand Down Expand Up @@ -81,7 +94,7 @@ export default async (

for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.transformQuery) {
actualQuery = await interceptor.transformQuery(executionContext, actualQuery);
actualQuery = interceptor.transformQuery(executionContext, actualQuery);
}
}

Expand All @@ -106,7 +119,7 @@ export default async (
connection.on('notice', noticeListener);

try {
result = await executionRoutine(connection, actualQuery.sql, actualQuery.values);
result = await executionRoutine(connection, actualQuery.sql, actualQuery.values, executionContext, actualQuery);
} catch (error) {
// 'Connection terminated' refers to node-postgres error.
// @see https://github.com/brianc/node-postgres/blob/eb076db5d47a29c19d3212feac26cd7b6d257a95/lib/client.js#L199
Expand Down Expand Up @@ -150,15 +163,22 @@ export default async (

for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.afterQueryExecution) {
result = interceptor.afterQueryExecution(executionContext, actualQuery, result);
result = await interceptor.afterQueryExecution(executionContext, actualQuery, result);
}
}

for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.transformRow) {
result.rows = result.rows.map((row) => {
return interceptor.transformRow(executionContext, actualQuery, row);
const transformRow = interceptor.transformRow;

const rows: $ReadOnlyArray<QueryResultRowType> = result.rows.map((row) => {
return transformRow(executionContext, actualQuery, row);
});

result = {
...result,
rows
};
}
}

Expand Down
32 changes: 27 additions & 5 deletions src/types.js
Expand Up @@ -2,6 +2,9 @@

/* eslint-disable no-use-before-define, import/exports-last, flowtype/require-types-at-top */

import type {
Readable
} from 'stream';
import type {
LoggerType
} from 'roarr';
Expand All @@ -22,7 +25,9 @@ export type {

export opaque type QueryIdType = string;

type MaybePromiseType<T> = T | Promise<T>;
export type MaybePromiseType<T> = T | Promise<T>;

export type StreamHandlerType = (stream: Readable) => void;

export type ConnectionTypeType = 'EXPLICIT' | 'IMPLICIT_QUERY' | 'IMPLICIT_TRANSACTION';

Expand Down Expand Up @@ -111,6 +116,8 @@ export type TransactionFunctionType = (connection: DatabaseTransactionConnection

export type DatabasePoolConnectionType = {|
...CommonQueryMethodsType,

+stream: (sql: TaggedTemplateLiteralInvocationType, streamHandler: StreamHandlerType) => Promise<null>,
+transaction: (handler: TransactionFunctionType) => Promise<*>
|};

Expand All @@ -119,6 +126,9 @@ export type ConnectionRoutineType = (connection: DatabasePoolConnectionType) =>
export type DatabasePoolType = {|
...CommonQueryMethodsType,
+connect: (connectionRoutine: ConnectionRoutineType) => Promise<*>,

// $FlowFixMe
+stream: (sql: TaggedTemplateLiteralInvocationType, streamHandler: StreamHandlerType) => Promise<null>,
+transaction: (handler: TransactionFunctionType) => Promise<*>
|};

Expand All @@ -132,15 +142,15 @@ export type DatabaseConnectionType =
...DatabasePoolType
}>;

type QueryResultRowColumnType = string | number;
type QueryResultRowColumnType = string | number | null;

export type QueryResultRowType = {
[key: string]: QueryResultRowColumnType
+[key: string]: QueryResultRowColumnType
};

export type QueryType = {|
+sql: string,
+values?: $ReadOnlyArray<PrimitiveValueExpressionType>
+values: $ReadOnlyArray<PrimitiveValueExpressionType>
|};

export type SqlFragmentType = {|
Expand Down Expand Up @@ -313,7 +323,7 @@ export type InternalQueryMethodType<R> = (
connection: InternalDatabaseConnectionType,
clientConfiguration: ClientConfigurationType,
sql: string,
values?: $ReadOnlyArray<PrimitiveValueExpressionType>,
values: $ReadOnlyArray<PrimitiveValueExpressionType>,
uid?: QueryIdType
) => Promise<R>;

Expand All @@ -327,6 +337,18 @@ export type InternalQueryMaybeOneFunctionType = InternalQueryMethodType<QueryRes
export type InternalQueryOneFirstFunctionType = InternalQueryMethodType<QueryResultRowColumnType>;
export type InternalQueryOneFunctionType = InternalQueryMethodType<QueryResultRowType>;

export type InternalStreamFunctionType = (
log: LoggerType,
connection: InternalDatabaseConnectionType,
clientConfiguration: ClientConfigurationType,
sql: string,
values: $ReadOnlyArray<PrimitiveValueExpressionType>,
streamHandler: StreamHandlerType,
uid?: QueryIdType

// eslint-disable-next-line flowtype/no-weak-types
) => Promise<Object>;

export type InternalTransactionFunctionType = (
log: LoggerType,
connection: InternalDatabaseConnectionType,
Expand Down

0 comments on commit fcc3dd1

Please sign in to comment.