Skip to content

Commit a8ae815

Browse files
committed
feat(cbjs): introduce hook onQueryStart and onQueryEnd
1 parent b4a7297 commit a8ae815

File tree

4 files changed

+201
-8
lines changed

4 files changed

+201
-8
lines changed

packages/cbjs/src/cluster.ts

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import {
4343
PingOptions,
4444
PingResult,
4545
} from './diagnosticstypes.js';
46+
import { CouchbaseError } from './errors.js';
4647
import { EventingFunctionManager } from './eventingfunctionmanager.js';
4748
import { QueryExecutor } from './queryexecutor.js';
4849
import { QueryIndexManager } from './queryindexmanager.js';
@@ -159,13 +160,39 @@ export type DnsConfig = {
159160
dnsSrvTimeout?: number;
160161
};
161162

163+
export type Hooks<T extends CouchbaseClusterTypes> = {
164+
/**
165+
* Any error thrown in a hook will be swallowed and this function will be called
166+
* @param err The error thrown by the hook function.
167+
*/
168+
onHookError: (err: unknown) => void;
169+
onQueryStart: (opts: {
170+
statement: string;
171+
options: QueryOptions<T, boolean>;
172+
}) => unknown;
173+
onQueryEnd: (
174+
opts:
175+
| {
176+
statement: string;
177+
options: QueryOptions<T, boolean>;
178+
result: QueryResult<any, boolean>;
179+
}
180+
| {
181+
statement: string;
182+
options: QueryOptions<T, boolean>;
183+
error: Error;
184+
},
185+
returnOfOnQueryStart: unknown
186+
) => void;
187+
};
188+
162189
/**
163190
* Specifies the options which can be specified when connecting
164191
* to a cluster.
165192
*
166193
* @category Core
167194
*/
168-
export type ConnectOptions = {
195+
export type ConnectOptions<T extends CouchbaseClusterTypes = any> = {
169196
/**
170197
* Specifies a username to use for an implicitly created IPasswordAuthenticator
171198
* used for authentication with the cluster.
@@ -207,6 +234,11 @@ export type ConnectOptions = {
207234
*/
208235
queryResultParser?: (value: string) => any;
209236

237+
/**
238+
* Specifies hook functions that allow you to perform actions before and after some Couchbase operations.
239+
*/
240+
hooks?: Partial<Hooks<T>>;
241+
210242
/**
211243
* Specifies the options for transactions.
212244
*/
@@ -253,6 +285,7 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
253285
private _conn: CppConnection;
254286
private _transcoder: Transcoder;
255287
private _queryResultParser: (value: string) => any;
288+
private _hooks?: Partial<Hooks<T>>;
256289
private _txnConfig: TransactionsConfig;
257290
private _transactions?: Transactions<T>;
258291
private readonly _openBuckets: Map<BucketName<T>, Promise<void>>;
@@ -279,6 +312,13 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
279312
return this._queryResultParser;
280313
}
281314

315+
/**
316+
@internal
317+
*/
318+
get hooks(): Partial<Hooks<T>> | undefined {
319+
return this._hooks;
320+
}
321+
282322
/**
283323
@internal
284324
*/
@@ -353,7 +393,7 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
353393
@internal
354394
@deprecated Use the static sdk-level {@link connect} method instead.
355395
*/
356-
constructor(connStr: string, options?: ConnectOptions) {
396+
constructor(connStr: string, options?: ConnectOptions<T>) {
357397
if (!options) {
358398
options = {};
359399
}
@@ -369,7 +409,7 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
369409
this._trustStorePath = options.security.trustStorePath ?? '';
370410

371411
if (options.configProfile) {
372-
connectionProfiles.applyProfile(options.configProfile, options);
412+
connectionProfiles.applyProfile(options.configProfile, options as never);
373413
}
374414
this._kvTimeout = options.timeouts.kvTimeout ?? 2500;
375415
this._kvDurableTimeout = options.timeouts.kvDurableTimeout ?? 10000;
@@ -384,6 +424,7 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
384424

385425
this._transcoder = options.transcoder ?? new DefaultTranscoder();
386426
this._queryResultParser = options.queryResultParser ?? JSON.parse;
427+
this._hooks = options.hooks;
387428

388429
if (options.transactions) {
389430
this._txnConfig = options.transactions;
@@ -434,7 +475,7 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
434475
*/
435476
static async connect<T extends CouchbaseClusterTypes = DefaultClusterTypes>(
436477
connStr: string,
437-
options?: ConnectOptions,
478+
options?: ConnectOptions<T>,
438479
callback?: NodeCallback<Cluster<T>>
439480
): Promise<Cluster<T>> {
440481
return await PromiseHelper.wrapAsync(async () => {

packages/cbjs/src/couchbase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import { NodeCallback } from './utilities.js';
3232
*/
3333
export async function connect<T extends CouchbaseClusterTypes = DefaultClusterTypes>(
3434
connStr: `couchbase://` | (string & NonNullable<unknown>),
35-
options?: ConnectOptions,
35+
options?: ConnectOptions<T>,
3636
callback?: NodeCallback<Cluster<T>>
3737
): Promise<Cluster<T>> {
3838
return await Cluster.connect<T>(connStr, options, callback);

packages/cbjs/src/queryexecutor.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,18 @@ export class QueryExecutor<T extends CouchbaseClusterTypes = CouchbaseClusterTyp
147147
const timeout = options.timeout ?? this._cluster.queryTimeout;
148148
const rowParser = options.queryResultParser ?? this._cluster.queryResultParser;
149149

150-
return QueryExecutor.execute((callback) => {
150+
let hookReturnValue: unknown;
151+
152+
try {
153+
hookReturnValue = this._cluster.hooks?.onQueryStart?.({
154+
statement: query,
155+
options,
156+
});
157+
} catch (err) {
158+
this._cluster.hooks?.onHookError?.(err);
159+
}
160+
161+
const result = QueryExecutor.execute((callback) => {
151162
this._cluster.conn.query(
152163
{
153164
statement: query,
@@ -192,5 +203,39 @@ export class QueryExecutor<T extends CouchbaseClusterTypes = CouchbaseClusterTyp
192203
callback
193204
);
194205
}, rowParser);
206+
207+
if (this._cluster.hooks?.onQueryEnd) {
208+
void result
209+
.then((r) => {
210+
try {
211+
this._cluster.hooks?.onQueryEnd?.(
212+
{
213+
statement: query,
214+
options,
215+
result: r,
216+
},
217+
hookReturnValue
218+
);
219+
} catch (err) {
220+
this._cluster.hooks?.onHookError?.(err);
221+
}
222+
})
223+
.catch((err) => {
224+
try {
225+
this._cluster.hooks?.onQueryEnd?.(
226+
{
227+
statement: query,
228+
options,
229+
error: err,
230+
},
231+
hookReturnValue
232+
);
233+
} catch (err) {
234+
this._cluster.hooks?.onHookError?.(err);
235+
}
236+
});
237+
}
238+
239+
return result;
195240
}
196241
}

tests/cbjs/tests/query.request.spec.ts

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
* limitations under the License.
1616
*/
1717
import JSONBigint from 'json-bigint';
18-
import { describe } from 'vitest';
18+
import { describe, vi } from 'vitest';
1919

2020
import { QueryMetaData, QueryProfileMode, QueryResult, QueryStatus } from '@cbjsdev/cbjs';
2121
import { ServerFeatures } from '@cbjsdev/http-client';
22-
import { getConnectionParams, invariant, keyspacePath } from '@cbjsdev/shared';
22+
import { getConnectionParams, keyspacePath } from '@cbjsdev/shared';
2323
import { createCouchbaseTest, getDefaultServerTestContext } from '@cbjsdev/vitest';
2424

2525
import { useSampleData } from '../fixtures/useSampleData.js';
@@ -239,6 +239,113 @@ describe.runIf(serverSupportsFeatures(ServerFeatures.Query))(
239239
}
240240
);
241241

242+
describe('hooks', () => {
243+
test('should call onQueryStart and onQueryEnd', async ({
244+
expect,
245+
serverTestContext,
246+
useDocumentKey,
247+
}) => {
248+
const mockOnQueryStart = vi.fn().mockReturnValue('mockStartReturn');
249+
const mockOnQueryEnd = vi.fn();
250+
251+
const params = getConnectionParams();
252+
const newConnection = await serverTestContext.newConnection(params, {
253+
hooks: {
254+
onQueryStart: mockOnQueryStart,
255+
onQueryEnd: mockOnQueryEnd,
256+
},
257+
});
258+
259+
const collection = newConnection
260+
.bucket(serverTestContext.bucket.name)
261+
.scope(serverTestContext.scope.name)
262+
.collection(serverTestContext.collection.name);
263+
264+
const docKey = useDocumentKey();
265+
await collection.insert(docKey, {
266+
title: 'cbjs',
267+
});
268+
269+
const statement = `SELECT META().cas AS cas FROM ${serverTestContext.getKeyspacePath()} USE KEYS $1`;
270+
const options = {
271+
parameters: [docKey],
272+
};
273+
const queryResult = await newConnection.query<{ cas: bigint }>(
274+
statement,
275+
options
276+
);
277+
278+
expect(mockOnQueryStart).toHaveBeenCalledWith({
279+
statement,
280+
options,
281+
});
282+
283+
expect(mockOnQueryEnd).toHaveBeenCalledWith(
284+
{
285+
statement,
286+
options,
287+
result: queryResult,
288+
},
289+
'mockStartReturn'
290+
);
291+
});
292+
293+
test('should call onQueryEnd with the error when the query throws', async ({
294+
expect,
295+
serverTestContext,
296+
}) => {
297+
expect.hasAssertions();
298+
const mockOnQueryEnd = vi.fn();
299+
300+
const params = getConnectionParams();
301+
const newConnection = await serverTestContext.newConnection(params, {
302+
hooks: {
303+
onQueryEnd: mockOnQueryEnd,
304+
},
305+
});
306+
307+
const statement = `SELECT INVALID QUERY`;
308+
309+
try {
310+
await newConnection.query(statement);
311+
} catch (err) {
312+
expect(mockOnQueryEnd).toHaveBeenCalledWith(
313+
{
314+
statement,
315+
options: {},
316+
error: err,
317+
},
318+
undefined
319+
);
320+
}
321+
});
322+
323+
test('should call onHookError when either onQueryStart or onQueryEnd throws', async ({
324+
expect,
325+
serverTestContext,
326+
}) => {
327+
const mockOnHookError = vi.fn();
328+
329+
const params = getConnectionParams();
330+
const newConnection = await serverTestContext.newConnection(params, {
331+
hooks: {
332+
onHookError: mockOnHookError,
333+
onQueryStart: () => {
334+
throw new Error('onQueryStart failed');
335+
},
336+
onQueryEnd: () => {
337+
throw new Error('onQueryEnd failed');
338+
},
339+
},
340+
});
341+
342+
const statement = `SELECT 1 as foo`;
343+
await newConnection.query(statement);
344+
345+
expect(mockOnHookError).toHaveBeenCalledTimes(2);
346+
});
347+
});
348+
242349
describe('row parser', () => {
243350
test('should use the cluster custom row parser for cluster level query', async ({
244351
expect,

0 commit comments

Comments
 (0)