Skip to content

Commit

Permalink
docs: improved the Pulsar integration docs
Browse files Browse the repository at this point in the history
  • Loading branch information
arturwojnar committed Mar 21, 2024
1 parent 1076996 commit 4cacfe4
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 110 deletions.
22 changes: 19 additions & 3 deletions docs/pages/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,26 @@ You've written your service that utilizes `Hermes`🌿 and now you want to deplo

It's the issue of accessing the same resource by multiple processes or resource synchronization. One of the solutions is [a mutex (binary variation of a semaphore)](<https://en.wikipedia.org/wiki/Lock_(computer_science)#:~:text=In%20computer%20science%2C%20a%20lock,threads%20of%20execution%20at%20once>).

We can use Apache Pulsar and an Exclusive subscription.
## Mutex based on an Exclusive topic

<<< @/../examples/pulsar-mutex.ts
We can use Apache Pulsar and an Exclusive subscription to simulate a mutex behaviour.

<<< @/../examples/pulsar/pulsar-mutex.ts

## Defining some events

<<< @/../examples/events.ts

## One partition example

<<< @/../examples/pulsar-one-partition-mutex.ts
First, define a function that will continously send events based on `Hermes` instance:

<<< @/../examples/pulsar/do-publishing.ts

Secondly, we want to know whether the events are actually sent to the Pulsar:

<<< @/../examples/pulsar/do-receiving.ts

And the final wrap-up:

<<< @/../examples/pulsar/pulsar-one-partition-mutex.ts
2 changes: 2 additions & 0 deletions docs/pages/rabbitmq.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Under development 🛠️🔧🏗️👷🏼‍♂️

<<< @/../examples/rabbitmq-one-partition-semaphore.ts

<!-- The type is a simple wrapper to ensure the structure's correctness. It defines:
Expand Down
19 changes: 19 additions & 0 deletions examples/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
export type DomainEvent<Name extends string, Data> = Readonly<{
name: Name
data: Data
}>
export type MedicineAssigned = DomainEvent<
'MedicineAssigned',
{
medicineId: string
patientId: string
}
>
export type MedicineFinished = DomainEvent<
'MedicineFinished',
{
medicineId: string
patientId: string
}
>
export type MedicineEvent = MedicineAssigned | MedicineFinished
107 changes: 0 additions & 107 deletions examples/pulsar-one-partition-mutex.ts

This file was deleted.

28 changes: 28 additions & 0 deletions examples/pulsar/do-publishing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { setTimeout } from 'node:timers/promises'
import { OutboxConsumer } from '../../packages/hermes-mongodb/src/typings'
import { MedicineEvent } from '../events'
import { ObjectId } from '../node_modules/mongodb/mongodb'

export const doPublishing = async (outbox: OutboxConsumer<MedicineEvent>) => {
while (true) {
const medicineId = new ObjectId().toString()
const patientId = new ObjectId().toString()

try {
await outbox.publish({
name: 'MedicineAssigned',
data: {
medicineId,
patientId,
},
})
} catch {
await setTimeout(1000)
continue
}

console.info(`Event published for medicine ${medicineId} nad patient ${patientId}.`)

await setTimeout(1000)
}
}
15 changes: 15 additions & 0 deletions examples/pulsar/do-receiving.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { setTimeout } from 'node:timers/promises'
import { Consumer } from '../node_modules/pulsar-client/index'

export const doReceiving = async (subscription: Consumer) => {
while (true) {
try {
const message = await subscription.receive()
const event = JSON.parse(message.getData().toString('utf-8'))

console.info(`Consumed event for medicine ${event.data.medicineId} and patient ${event.data.patientId}`)
} catch {
await setTimeout(1000)
}
}
}
File renamed without changes.
57 changes: 57 additions & 0 deletions examples/pulsar/pulsar-one-partition-mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { createOutboxConsumer } from '../../packages/hermes-mongodb/src/outbox'
import { addDisposeOnSigterm } from '../../packages/hermes/src/addDisposeOnSigterm'
import { swallow } from '../../packages/hermes/src/utils'
import { MedicineEvent } from '../events'
import { MongoClient } from '../node_modules/mongodb/mongodb'
import { Client } from '../node_modules/pulsar-client/index'
import { doPublishing } from './do-publishing'
import { doReceiving } from './do-receiving'
import { PulsarMutex } from './pulsar-mutex'

const MONGODB_URI = `mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true`
const PULSAR_URI = `pulsar://localhost:6650`

const start = async () => {
const pulsarClient = new Client({ serviceUrl: PULSAR_URI })
const mutex = new PulsarMutex(pulsarClient)
const producer = await pulsarClient.createProducer({ topic: `public/default/events` })
const subscription = await pulsarClient.subscribe({ topic: `public/default/events`, subscription: 'test' })

addDisposeOnSigterm(async () => {
await swallow(() => mutex.unlock())
await swallow(() => subscription.close())
await swallow(() => producer.close())
await swallow(() => pulsarClient.close())
})

await mutex.lock()

try {
const client = new MongoClient(MONGODB_URI)
const db = client.db('aid-kit')

await client.connect()

const outbox = createOutboxConsumer<MedicineEvent>({
client,
db,
publish: async (event) => {
// Normally, you should choose a corresponding topic for the given event.
await producer.send({
data: Buffer.from(JSON.stringify(event)),
})
},
})

// Hermes automatically registers the dispose on SIGTERM
await outbox.start()

doPublishing(outbox).catch(console.error)
doReceiving(subscription).catch(console.error)
} catch (error) {
console.error(error)
throw error
}
}

start()

0 comments on commit 4cacfe4

Please sign in to comment.