Support for the Nats message broker in the CAP Node.js runtime.
Install the dependency:
npm i -S cds-nats
Then refer to the Process Environment document to configure the Nats connection options, for example:
CDS_REQUIRES_NATS_SERVERS=127.0.0.1:4222
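As a rough illustration of how such an environment variable relates to the configuration tree, the sketch below turns `CDS_REQUIRES_NATS_SERVERS` into a nested `requires.nats.servers` entry. `envToConfig` is a hypothetical helper written for this example; it is not part of CAP or cds-nats, and the actual resolution logic in CAP may differ in details such as casing.

```typescript
// Hypothetical helper, for illustration only: CAP performs its own env resolution.
type ConfigTree = { [key: string]: string | ConfigTree };

function envToConfig(env: Record<string, string | undefined>, prefix = "CDS_"): ConfigTree {
  const root: ConfigTree = {};
  for (const [name, value] of Object.entries(env)) {
    if (!name.startsWith(prefix) || value === undefined) continue;
    // CDS_REQUIRES_NATS_SERVERS -> ["requires", "nats", "servers"]
    const path = name.slice(prefix.length).toLowerCase().split("_");
    const leaf = path.pop();
    if (leaf === undefined) continue;
    let node = root;
    for (const segment of path) {
      const next = node[segment];
      if (typeof next === "object") {
        node = next;
      } else {
        const created: ConfigTree = {};
        node[segment] = created;
        node = created;
      }
    }
    node[leaf] = value;
  }
  return root;
}

const config = envToConfig({ CDS_REQUIRES_NATS_SERVERS: "127.0.0.1:4222" });
console.log(JSON.stringify(config));
// {"requires":{"nats":{"servers":"127.0.0.1:4222"}}}
```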
Use Nats as a message broker
Unlike the default behavior of the CAP Messaging Service, cds-nats prefers to listen for events on a Queue Group instead of a general Subscription, because in a microservice architecture we usually need the Queue/Consumer Group pattern, in which each message is consumed by only a single service instance.
graph LR
Producer_1[Application A, Instance 1] -->|Message 1| Nats
Producer_2[Application A, Instance 2] -->|Message 2| Nats
Producer_3[Application B, Instance 1] -->|Message 3| Nats
Nats(Nats: Queue XXXX)
Nats -->|Message 3| Consumer_1[Application C, Instance 1]
Nats -->|Message 1| Consumer_2[Application C, Instance 2]
Nats -->|Message 2| Consumer_3[Application C, Instance 3]
Also, if you want to apply the Publisher/Subscriber (Broadcast) pattern, just add the @topic annotation to the event (the @queue annotation is also supported).
graph LR
Publisher_1[Application A, Instance 1] -->|Message 1| Nats
Publisher_2[Application A, Instance 2] -->|Message 2| Nats
Publisher_3[Application B, Instance 1] -->|Message 3| Nats
Nats(Nats: Topic YYYY)
Nats -->|Message 1, Message 2, Message 3| Subscriber_1[Application C, Instance 1]
Nats -->|Message 1, Message 2, Message 3| Subscriber_2[Application C, Instance 2]
Nats -->|Message 1, Message 2, Message 3| Subscriber_3[Application D, Instance 1]
service PeopleService {

  // queue group event, producer/consumer exclusive consume
  // @queue: 'queueName' annotation is also supported
  event changeAmount {
    peopleID : UUID;
    amount   : Decimal;
  }

  // subscription event, publisher/subscriber broadcast consume
  @topic : 'test.app.srv.people.broadcast'
  event updateName {
    peopleID : UUID;
    Name     : String;
    Age      : Integer;
  }

  @topic : 'test.app.srv.people.broadcast'
  event updateAge {
    peopleID : UUID;
    Name     : String;
    Age      : Integer;
  }

}
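The difference between the queue group and broadcast delivery modes can be pictured with a small in-memory model. This is only a sketch: `FakeBroker` and its methods are hypothetical names invented for this example, nothing here talks to a real Nats server, and the round-robin choice of a queue member is an assumption of the sketch rather than a statement about how Nats distributes messages.

```typescript
// In-memory model of the two delivery modes; it does not talk to a real Nats server.
type Handler = (message: string) => void;

class FakeBroker {
  private topicSubscribers = new Map<string, Handler[]>();
  private queueMembers = new Map<string, Handler[]>();
  private nextMember = new Map<string, number>();

  // Broadcast: every subscriber of the topic receives every message.
  subscribeTopic(topic: string, handler: Handler): void {
    this.topicSubscribers.set(topic, [...(this.topicSubscribers.get(topic) ?? []), handler]);
  }

  // Queue group: members share the messages, one member per message.
  joinQueue(queue: string, handler: Handler): void {
    this.queueMembers.set(queue, [...(this.queueMembers.get(queue) ?? []), handler]);
  }

  publish(name: string, message: string): void {
    for (const handler of this.topicSubscribers.get(name) ?? []) handler(message);
    const members = this.queueMembers.get(name) ?? [];
    if (members.length > 0) {
      // Round-robin is an assumption of this sketch.
      const index = (this.nextMember.get(name) ?? 0) % members.length;
      this.nextMember.set(name, index + 1);
      const member = members[index];
      if (member !== undefined) member(message);
    }
  }
}

const broker = new FakeBroker();
const consumer1: string[] = [];
const consumer2: string[] = [];
const subscriber1: string[] = [];
const subscriber2: string[] = [];

broker.joinQueue("XXXX", (m) => consumer1.push(m));
broker.joinQueue("XXXX", (m) => consumer2.push(m));
broker.subscribeTopic("YYYY", (m) => subscriber1.push(m));
broker.subscribeTopic("YYYY", (m) => subscriber2.push(m));

for (const message of ["Message 1", "Message 2", "Message 3"]) {
  broker.publish("XXXX", message); // each message reaches exactly one consumer
  broker.publish("YYYY", message); // each message reaches every subscriber
}

console.log(consumer1.length + consumer2.length); // 3
console.log(subscriber1.length, subscriber2.length); // 3 3
```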
TBD
{
  "cds": {
    "requires": {
      "messaging": {
        "kind": "nats"
      },
      "nats": {
        "impl": "cds-nats"
      }
    }
  }
}
Use Nats as a KV store
This is an experimental feature of Nats; you MUST enable the JetStream feature on the Nats server.
const kv = await cds.connect.to("kv") as NatsKVService;
expect(kv).toBeInstanceOf(NatsKVService)

// a key that was never set resolves to null
const id = cds.utils.uuid()
const v = await kv.get(id)
expect(v).toBeNull()
expect(await kv.keys()).toHaveLength(0)

// set a value, then overwrite it
await kv.set(id, "v1")
expect(await kv.keys()).toHaveLength(1)
expect(await kv.get(id)).toBe("v1")
await kv.set(id, "v2")
expect(await kv.get(id)).toBe("v2")

// remove the key again
await kv.remove(id)
expect(await kv.get(id)).toBeNull()
expect(await kv.keys()).toHaveLength(0)

// get with a provider: the provider result is returned and stored when the key is missing
expect(await kv.get("k3", () => "v4")).toBe("v4")
expect(await kv.get("k3")).toBe("v4")

// clean up all keys
await kv.removeAll()
Configure the kv service:
{
  "cds": {
    "requires": {
      "kv": {
        "kind": "nats-kv",
        "ttl": 100
      },
      "kv5000": {
        "kind": "nats-kv",
        "ttl": 5000
      },
      "nats-kv": {
        "impl": "cds-nats/lib/NatsKVService"
      }
    }
  }
}
- ttl: the maximum validity of each key, in milliseconds
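To make the effect of `ttl` concrete, here is a minimal in-memory sketch of a store whose keys expire after a fixed number of milliseconds and whose `get` accepts an optional provider, mirroring the calls used in the test snippet of this section. `TtlStore` is a hypothetical class for illustration; it is not how NatsKVService is implemented, and the injected clock exists only to make the example deterministic.

```typescript
// In-memory stand-in for illustration; not the NatsKVService implementation.
class TtlStore {
  private entries = new Map<string, { value: string; expiresAt: number }>();
  private ttl: number;
  private now: () => number;

  constructor(ttl: number, now: () => number = () => Date.now()) {
    this.ttl = ttl;
    this.now = now;
  }

  set(key: string, value: string): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttl });
  }

  // Returns null for missing or expired keys; an optional provider fills the gap
  // and its result is stored under the key.
  get(key: string, provider?: () => string): string | null {
    const entry = this.entries.get(key);
    if (entry !== undefined && entry.expiresAt > this.now()) return entry.value;
    this.entries.delete(key);
    if (provider === undefined) return null;
    const value = provider();
    this.set(key, value);
    return value;
  }
}

let clock = 0; // fake clock in milliseconds
const store = new TtlStore(100, () => clock);
store.set("k1", "v1");
clock = 99;
console.log(store.get("k1")); // v1 (still within the 100 ms ttl)
clock = 100;
console.log(store.get("k1")); // null (expired)
console.log(store.get("k1", () => "v2")); // v2 (provider result is stored)
console.log(store.get("k1")); // v2
```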
Use Nats as a distributed lock service
This is an experimental feature of Nats; you MUST enable the JetStream feature on the Nats server.
{
  "cds": {
    "requires": {
      "lock": {
        "kind": "nats-lock",
        "check": {
          "interval": 10
        },
        "lock": {
          "acquire": 10000
        }
      },
      "nats-lock": {
        "impl": "cds-nats/lib/NatsLockService"
      }
    }
  }
}
- check.interval: NatsLockService checks locks in polling mode; this is the polling interval
- lock.acquire: if the lock of the target resource cannot be acquired immediately, NatsLockService will wait; if the lock still cannot be acquired within this timeout, NatsLockService throws an error to avoid waiting too long
- lock.timeout: the maximum time a single lock may be held; if a resource stays locked for too long, the client will forcibly acquire it. The default value is 1 hour
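The interplay of the three options can be sketched with an in-memory lock that polls until it succeeds or gives up. `PollingLock` and its option names are hypothetical and only mirror `check.interval`, `lock.acquire` and `lock.timeout`; the sketch assumes all values are milliseconds, which this section does not state for the first two, and it keeps its state in a local map instead of Nats.

```typescript
// Illustrative in-memory lock; the real NatsLockService keeps its state in Nats.
interface LockOptions {
  checkInterval: number; // polling interval, mirrors check.interval
  acquireTimeout: number; // how long to wait for a lock, mirrors lock.acquire
  lockTimeout: number; // age after which a lock may be taken over, mirrors lock.timeout
}

class PollingLock {
  private heldSince = new Map<string, number>();
  private options: LockOptions;
  private now: () => number;

  constructor(options: LockOptions, now: () => number = () => Date.now()) {
    this.options = options;
    this.now = now;
  }

  // Single attempt: succeeds if the resource is free or the existing lock is too old.
  tryAcquire(resource: string): boolean {
    const since = this.heldSince.get(resource);
    if (since !== undefined && this.now() - since < this.options.lockTimeout) return false;
    this.heldSince.set(resource, this.now());
    return true;
  }

  release(resource: string): void {
    this.heldSince.delete(resource);
  }

  // Poll until the lock is acquired or acquireTimeout has elapsed.
  async acquire(resource: string): Promise<void> {
    const deadline = this.now() + this.options.acquireTimeout;
    while (!this.tryAcquire(resource)) {
      if (this.now() >= deadline) throw new Error(`acquire lock for '${resource}' timed out`);
      await new Promise<void>((resolve) => setTimeout(resolve, this.options.checkInterval));
    }
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const lock = new PollingLock({ checkInterval: 10, acquireTimeout: 50, lockTimeout: 3600000 });
  await lock.acquire("order-1");
  log.push("first acquired");
  setTimeout(() => lock.release("order-1"), 20); // the holder releases shortly after
  await lock.acquire("order-1"); // waits by polling, then succeeds
  log.push("second acquired");
  try {
    await lock.acquire("order-1"); // nobody releases, so this gives up after 50 ms
  } catch {
    log.push("third timed out");
  }
  return log;
}

demo().then((log) => console.log(log.join(", ")));
```

Polling keeps the client simple at the cost of up to one interval of extra latency per acquisition, which is why a short interval such as the `10` in the configuration above is a plausible choice.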
Use Nats as an RFC communication tool
To use the NatsRFCService, you MUST enable the Nats Messaging Service first.
graph LR
ServiceA[Service A Instance 1]
ServiceA -->|1. Run CQN XXX| Nats
Nats(Nats Queue Group - ServiceC)
Nats -->|2. RUN CQN XXX| ServiceC
ServiceC -->|3. Execution Result| Nats
Nats -->|4. Execution Result| ServiceA
ServiceC[Service C instance 1]
const cds = cwdRequireCDS()
const { INSERT } = cds.ql

// connect to the RFC service, then pick the remote app and one of its services
const messaging = await cds.connect.to("rfc") as NatsRFCService
const remoteApp = messaging.app("demo-app-micro-service");
const remotePeopleService = remoteApp.service("test.app.srv.theosun.PeopleService")

// run a CQN statement on the remote service
const newPeople = await remotePeopleService.run(
  INSERT.into("People").entries({ Name: cds.utils.uuid() })
)
expect(newPeople).not.toBeNull()
expect(newPeople.Name).not.toBeUndefined()

// invoke an action/function of the remote service
const updatedPeople = await remotePeopleService.updateWeight(newPeople.ID, 12.3)
expect(updatedPeople.Name).toBe(newPeople.Name)
expect(updatedPeople.Weight).toBe(12.3)

// an unknown operation is rejected with an error
await expect(() => remotePeopleService.notExistFunction())
  .rejects
  .toThrow("method/action/function 'notExistFunction' is not existed on the service 'test.app.srv.theosun.PeopleService'")
{
  "cds": {
    "requires": {
      "messaging": {
        "kind": "nats"
      },
      "rfc": {
        "kind": "nats-rfc",
        "app": {
          "name": "demo-app-micro-service"
        },
        "invoke": {
          "timeout": 180000
        }
      },
      "nats": {
        "impl": "cds-nats"
      },
      "nats-rfc": {
        "impl": "cds-nats/lib/NatsRFCService"
      }
    }
  }
}
- app.name: the name of the current application, an identifier that is used for RFC
- invoke.timeout: the timeout of each invocation, in milliseconds; for example, if the remote server does not respond within 3 minutes (180000), NatsRFCService stops waiting and throws an error
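The timeout behavior can be illustrated with a small in-memory request/reply exchange. `FakeRfcClient`, `register` and `invoke` are hypothetical names for this sketch and do not reflect the NatsRFCService API; the point is only that a caller stops waiting and rejects once the configured number of milliseconds has passed without a reply.

```typescript
// Illustration only: an in-memory request/reply exchange with a per-call timeout.
type Reply = (result: unknown) => void;
type RemoteHandler = (payload: unknown, reply: Reply) => void;

class FakeRfcClient {
  private handlers = new Map<string, RemoteHandler>();
  private timeout: number;

  constructor(timeout: number) {
    this.timeout = timeout;
  }

  // Registers a "remote" application under its app name.
  register(appName: string, handler: RemoteHandler): void {
    this.handlers.set(appName, handler);
  }

  // Sends the payload and waits for a reply, but no longer than the configured timeout.
  invoke(appName: string, payload: unknown): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error(`no response from '${appName}' within ${this.timeout} ms`)),
        this.timeout
      );
      const handler = this.handlers.get(appName);
      // An unknown app never answers, so the call simply runs into the timeout.
      if (handler !== undefined) {
        handler(payload, (result) => {
          clearTimeout(timer);
          resolve(result);
        });
      }
    });
  }
}

const client = new FakeRfcClient(50);
client.register("demo-app-micro-service", (payload, reply) => reply({ echoed: payload }));

client
  .invoke("demo-app-micro-service", "ping")
  .then((result) => console.log(JSON.stringify(result))); // {"echoed":"ping"}

client
  .invoke("missing-app", "ping")
  .catch((error: Error) => console.log(error.message)); // no response from 'missing-app' within 50 ms
```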
- Nats Messaging Service
- Pub/Sub
- complex test case
- Produce/Consume
- basic support and test case
- tenant aware
- tenant recover
- user recover
- user-attr recover
- messaging
- srv.on
- srv.emit
- Outbox enable
- Nats options documentation
- Pub/Sub
- Nats KV Store
- tenant aware
- get
- get with provider
- set
- delete
- Nats options documentation
- Nats Distributed Lock Service
- tenant aware
- 100 values test
- acquire timeout
- lock timeout (dead lock)
- synchronized method (high level API)
- Nats options documentation
- Nats RFC Service
- tenant aware
- OData Service query
- OData Unbounded Function/Action
- Rest Adapter operation
- Error handler
- Demo Micro Service
@sap/cds version | cds-mysql version |
---|---|
5.x | 2.9.x |
6.x | 2.10.x |