Skip to content

Commit

Permalink
feat: new option saveTimestamps lets to attach sent timestamps to e…
Browse files Browse the repository at this point in the history
…vents that have been successfully sent
  • Loading branch information
arturwojnar committed Apr 5, 2024
1 parent 00d1fcb commit 45f3b9c
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 19 deletions.
14 changes: 8 additions & 6 deletions packages/hermes-mongodb/jest.config.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ const { compilerOptions } = require('./tsconfig.json')
module.exports = {
preset: 'ts-jest/presets/default-esm',
extensionsToTreatAsEsm: ['.ts'],
globals: {
'ts-jest': {
useESM: true,
tsconfig: 'tsconfig.jest.json',
},
transform: {
'^.+\\.ts$': [
'ts-jest',
{
useESM: true,
tsconfig: 'tsconfig.jest.json',
},
],
},
transform: {},
testMatch: ['**/test/*.test.ts'],
testEnvironment: 'node',
testTimeout: 20000,
Expand Down
14 changes: 7 additions & 7 deletions packages/hermes-mongodb/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@
"@rollup/plugin-node-resolve": "~15.2.3",
"@rollup/plugin-typescript": "~11.1.6",
"@types/jest": "~29.5.12",
"@types/node": "~20.11.30",
"@typescript-eslint/eslint-plugin": "~7.4.0",
"@typescript-eslint/parser": "~7.4.0",
"@types/node": "~20.12.4",
"@typescript-eslint/eslint-plugin": "~7.5.0",
"@typescript-eslint/parser": "~7.5.0",
"ajv": "~8.12.0",
"eslint": "~8.57.0",
"eslint-config-prettier": "~9.1.0",
Expand All @@ -80,17 +80,17 @@
"husky": "~9.0.11",
"jest": "~29.7.0",
"lint-staged": "~15.2.2",
"mongodb-memory-server": "~9.1.7",
"mongoose": "~8.2.3",
"mongodb-memory-server": "~9.1.8",
"mongoose": "~8.3.0",
"nodemon": "~3.1.0",
"prettier": "~3.2.5",
"prettier-eslint": "~16.3.0",
"rollup": "~4.13.0",
"rollup": "~4.14.0",
"ts-essentials": "~9.4.1",
"ts-jest": "~29.1.2",
"ts-node": "~10.9.2",
"typedoc": "~0.25.12",
"typescript": "~5.4.3"
"typescript": "~5.4.4"
},
"dependencies": {
"mongodb": "~6.5.0"
Expand Down
7 changes: 6 additions & 1 deletion packages/hermes-mongodb/rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ export default [
sourcemap: true,
},
],
plugins: [json(), typescript({ tsconfig: './tsconfig.build.json' }), resolve(), commonjs()],
plugins: [
json(),
typescript({ tsconfig: './tsconfig.build.json', outputToFilesystem: false }),
resolve(),
commonjs(),
],
external: ['@arturwojnar/hermes', 'mongodb'],
onwarn(warning, warn) {
// Check the warning code
Expand Down
20 changes: 19 additions & 1 deletion packages/hermes-mongodb/src/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
CancellationPromise,
OutboxMessagesCollectionName,
addDisposeOnSigterm,
assertDate,
isNil,
swallow,
} from '@arturwojnar/hermes'
Expand All @@ -24,6 +25,16 @@ import { type ConsumerCreationParams, type OutboxConsumer, type OutboxMessageMod
export const createOutboxConsumer = <Event>(params: ConsumerCreationParams<Event>): OutboxConsumer<Event> => {
const { client, db, publish: _publish } = params
const partitionKey = params.partitionKey || 'default'
const saveTimestamps = params.saveTimestamps || false
const _now = params.now
const now =
typeof _now === 'function'
? () => {
const value = _now()
assertDate(value)
return value
}
: () => new Date()
const waitAfterFailedPublishMs = params.waitAfterFailedPublishMs || 1000
const shouldDisposeOnSigterm = isNil(params.shouldDisposeOnSigterm) ? true : !!params.shouldDisposeOnSigterm
const onDbError = params.onDbError || noop
Expand Down Expand Up @@ -97,7 +108,14 @@ export const createOutboxConsumer = <Event>(params: ConsumerCreationParams<Event
}

if (await _waitUntilEventIsSent(message.data)) {
await consumer.update(documentKey._id, resumeToken)
if (saveTimestamps) {
await db
.collection<OutboxMessageModel<Event>>(OutboxMessagesCollectionName)
.updateOne({ _id: message._id }, { $set: { sentAt: now() } })
await consumer.update(documentKey._id, resumeToken)
} else {
await consumer.update(documentKey._id, resumeToken)
}
}
}
} catch (error) {
Expand Down
24 changes: 20 additions & 4 deletions packages/hermes-mongodb/src/typings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type OutboxMessageModel<Event> = {
occurredAt: Date
data: Event
partitionKey: string
sentAt?: Date
}

type OutboxConsumerModel = {
Expand Down Expand Up @@ -106,6 +107,11 @@ type OutboxConsumer<Event> = {
*/
type ErrorCallback = (error: unknown) => void

/**
* A function returning current date.
*/
type NowFunction = () => Date

/**
* Creation parameters of `OutboxConsumer`.
*
Expand All @@ -117,14 +123,14 @@ type ErrorCallback = (error: unknown) => void
* <br /><b>The most important is to throw an error on a failed publish. Otherwise, the `OutboxConsumer` won't consider the event as published.</b>
* @param partitionKey - Name of the partition of the `OutboxConsumer`.
* @param waitAfterFailedPublishMs - Time after the `OutboxConsumer` will wait after a failed event publish.
* @param shouldDisposeOnSigterm - Indicates whether the `OutboxConsumer` should register a cleaning callback on `SIGTERM` and `SIGINT`.
* @param onFailedPublish - A callback fired on a failed publish.
* @param shouldDisposeOnSigterm - Indicates whether the `OutboxConsumer` should register a cleaning callback on `SIGTERM` and `SIGINT`.
* @param onFailedPublish - A callback fired on a failed publish.
* @param onDbError - A callback failed on an error related to the database.
* @template Event - Events handled by the `OutboxConsumer`. The type can be limited with a discrimitation union.
* @example
* const outbox = createOutboxConsumer<Event1 | Event2>({
* client,
* client.db('hospital'),
* db: client.db('hospital'),
* publish: async (event) => await eventBus.publish(event),
* })
*/
Expand All @@ -143,7 +149,13 @@ type ConsumerCreationParams<Event> = {
/**
* @defaultValue true
*/
shouldDisposeOnSigterm?: boolean // default true
shouldDisposeOnSigterm?: boolean
/**
* Use with consciously and carefully.
* When `true`, Hermes will be affecting many documents, resulting in much more I/O operations.
* @defaultValue false
*/
saveTimestamps?: boolean
/**
* @defaultValue `noop`
*/
Expand All @@ -152,6 +164,10 @@ type ConsumerCreationParams<Event> = {
* @defaultValue `noop`
*/
onDbError?: ErrorCallback
/**
* @defaultValue `() => new Date()`
*/
now?: NowFunction
}

export {
Expand Down
74 changes: 74 additions & 0 deletions packages/hermes-mongodb/test/events-with-sent-timestamps.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/* eslint-disable @typescript-eslint/no-floating-promises */

import { OutboxConsumersCollectionName, OutboxMessagesCollectionName } from '@arturwojnar/hermes'
import { expect, jest } from '@jest/globals'
import { ObjectId } from 'mongodb'
import nodeTimersPromises from 'node:timers/promises'
import { createOutboxConsumer } from '../src'
import { type MedicineAdded, type MedicineEvent } from './events'
import { mongodb } from './mongodb'

test('If option `saveTimestamps` is on, then sent events are marked with timestamps', async () => {
const publishEventStub = jest.fn<() => Promise<void>>().mockResolvedValue(undefined)

await mongodb(async (db, client, onDispose) => {
const messagesCollection = db.collection(OutboxMessagesCollectionName)
const consumersCollection = db.collection(OutboxConsumersCollectionName)
const outbox = createOutboxConsumer<MedicineEvent>({
client,
db,
publish: publishEventStub,
shouldDisposeOnSigterm: false,
saveTimestamps: true,
now: () => new Date('2024-04-03 15:00:00'),
})
const event: MedicineAdded = {
name: 'MedicineAdded',
data: {
medicineId: 'med1',
patientId: 'patient99',
},
}

const stop = await outbox.start()
onDispose(stop)

expect(await messagesCollection.find().toArray()).toEqual([])
expect(await consumersCollection.find().toArray()).toEqual([
{
_id: expect.any(ObjectId),
lastProcessedId: null,
resumeToken: null,
lastUpdatedAt: null,
createdAt: expect.any(Date),
partitionKey: 'default',
},
])

await outbox.publish(event)

await nodeTimersPromises.setTimeout(200)

const messages = await messagesCollection.find().toArray()
expect(messages).toEqual([
{
_id: expect.any(ObjectId),
occurredAt: expect.any(Date),
data: event,
partitionKey: 'default',
sentAt: new Date('2024-04-03 15:00:00'),
},
])
expect(messages[0].sentAt)
expect(await consumersCollection.find().toArray()).toEqual([
{
_id: expect.any(ObjectId),
lastProcessedId: messages[0]._id,
resumeToken: expect.anything(),
lastUpdatedAt: expect.any(Date),
createdAt: expect.any(Date),
partitionKey: 'default',
},
])
})
})

0 comments on commit 45f3b9c

Please sign in to comment.