Skip to content

balchua/demo-jetstream

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

32 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Playing with NATS Jetstream

This project demonstrate the use of NATS Jetstream. NATS Jetstream is the persistent storage engine built into core NATS server.

For more information about NATS Jetstream visit this link

This project will demonstrate the publishing of message from a publisher and a consumer based on different subjects. The consumer will then save the records into a simple PostgreSQL database.

Rules

Publisher will publish the following a UserTransaction event.

{
    "TransactionID":1,
    "UserID":1,
    "Status": "OK",
    "Amount": 456.89
}

The Subscriber will ingest this event and verify if the transaction is within the user threshold.

Getting started

Clone this project

Pre-requisites

  • Kubernetes cluster (single node is enough), example MicroK8s.

    sudo snap install microk8s --channel 1.23/stable --classic
  • Default Persistent Volume

    microk8s enable storage`
  • Install NATS using helm with Jetstream enabled.

    Nats Charts Documentation

    Nats Charts

    kubectl apply -f hack/jetstream-pvc.yaml
    helm upgrade --install --namespace nats bnats nats/nats -f hack/values.yaml

    Check out the values.yaml in the hack directory.

Manually create the Stream and Consumers

Refer to the document

Build and Test

go build.
go test -v -p=1 -coverpkg=./... -coverprofile=coverage.txt  ./pkg/...
go tool cover -html=coverage.txt

Auto create the STREAM and CONSUMERS

This project will automatically create the STREAMS and CONSUMERS, when u start the application like below.

demo-jetstream setup  --config "hack/config.yaml" \
  --streamName "USER_TXN" --streamSubjects "USER_TXN.>" \
  --consumerName "GRP_MAKER" \
  --consumerSubject "USER_TXN.maker" 

This will setup the STREAM USER_TXN and the CONSUMER GRP_MAKER, together with the corresponding stream and consumer subjects.

Generate Account and authorization using nkeys

To support multi-tenant NATS, one can setup accounts as well as using the nkeys to secure the NATS cluster.

It should be noted that the values.yaml presented above already set the necessary nkeys and users using the generated values below.

Below are ways on how to generate the nkeys. More information from NATS documentation.

Download the nk tools

go install github.com/nats-io/nkeys/nk

Generate the keys

nk -gen user -pubout
SUAJSNDKKS4SLYV4BWYIF3RHP72PCF7LAXI6SIUIWLZW72DEBGFY6CCSAI
UB6WFVVI6BKTAHT5XGS55BONYOE3TDF47ZD7F75YVPABRXJ7XHWZKX2W

From the output above

Seed (private key) - SUAJSNDKKS4SLYV4BWYIF3RHP72PCF7LAXI6SIUIWLZW72DEBGFY6CCSAI User (public key) - UB6WFVVI6BKTAHT5XGS55BONYOE3TDF47ZD7F75YVPABRXJ7XHWZKX2W

These generated nkeys are stored in the seed.txt, this is used in the code.

nk -gen user -pubout
SUAMKIAMDUJITCXXXTL2XMHTVT3OBSA3KWLIZQ3NFBA4FMD3SQ75GJEF6Y
UD736QEIGXPHB5CLR4UAPCOEXET6WIKDYWELPIFHJJDJRNKH3SDHZTLT

Keep the SUAMKIAMDUJITCXXXTL2XMHTVT3OBSA3KWLIZQ3NFBA4FMD3SQ75GJEF6Y into sys-seed.txt and add the user key to the values.yaml

If you generate new keys, you must set them in the values.yaml auth section. See sample snippet below. You must redeploy the NATS server.

auth:
  enabled: true
  basic:
    accounts:
      demo:
        jetstream: enabled        
        users:
        - nkey: UB6WFVVI6BKTAHT5XGS55BONYOE3TDF47ZD7F75YVPABRXJ7XHWZKX2W
          permission:
            publish: 
            - "USER_TXN.maker"
            - "$JS.>"
            subscribe: 
            - "USER_TXN.maker"
            - "_INBOX.>"
        - nkey: UD736QEIGXPHB5CLR4UAPCOEXET6WIKDYWELPIFHJJDJRNKH3SDHZTLT

Application configuration

In this demo, the sample application configuration is defined in config.yaml.

infra:
  natsUri: "nats://localhost:4220"
  seedPath: "hack/sys-seed.txt"
publish:
  natsUri: "nats://localhost:4220"
  seedPath: "hack/seed.txt"
subscribe:
  natsUri: "nats://localhost:4220"
  seedPath: "hack/seed.txt"
  sleepTimeInMillis: 3000
monitor:
  scheme: "http"
  host: "localhost"
  port: 32822
  account: "demo"
  consumerName: "GRP_MAKER"
  streamName: "USER_TXN"
  pollSeconds: 1
tracing:
  jaeger-url: http://localhost:30268/api/traces
  service-name: natsjs-demo

TODO: Add Jaeger configuration

Port-forwarding

To allow the application to access NATS Jetstream, you must do port-forward to all nats pods.

kubectl -n nats port-forward svc/bnats 4220:4222

To access NATS monitoring endpoint

kubectl -n nats port-forward svc/bnats 32822:8222

Publishing message to NAT Jetstream stream USER_TXN

This will publish 10 messages to the stream on subject USER_TXN.maker

./demo-jetstream publish --config "hack/config.yaml" --streamName "USER_TXN" --messageSubject "USER_TXN.maker" --maxCount "10" --message "{\"TransactionID\":1,\"UserID\":1,\"Status\":\"OK\",\"Amount\": 456.89}"

Monitoring message lag on Consumer

Using the configuration here.

The monitoring will check the message lag of the Consumer USER_TXN.maker in the Stream USER_TXN using the account demo

./demo-jetstream monitor --config "hack/config.yaml"
...
2021-12-28T17:54:30.352+0800	INFO	monitoring/monitor.go:47	total lag is 10100
2021-12-28T17:54:31.354+0800	INFO	monitoring/monitor.go:47	total lag is 10100
2021-12-28T17:54:32.357+0800	INFO	monitoring/monitor.go:47	total lag is 10100
2021-12-28T17:54:33.360+0800	INFO	monitoring/monitor.go:47	total lag is 10100
2021-12-28T17:54:34.365+0800	INFO	monitoring/monitor.go:47	total lag is 10100
2021-12-28T17:54:35.368+0800	INFO	monitoring/monitor.go:47	total lag is 10100
2021-12-28T17:54:36.371+0800	INFO	monitoring/monitor.go:47	total lag is 10100
...

Consume message

./demo-jetstream consume --config "hack/config.yaml" --consumerName "GRP_MAKER" --subscriberSubject "USER_TXN.maker"

Verifier Service

Generate the protos. from project root

protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    pkg/api/verifier/verifier.proto

This service is called by the consumer when it received a message in NATS Jetstream.

The configuration is in config.yaml

api:
  host: "localhost"
  port: 50055

Enabling distributed tracing

Using MicroK8s, enable Jaeger

kubectl create ns observability
microk8s enable jaeger:observability

Accessing Jaeger

To enable Jaeger collector

kubectl -n observability port-forward svc/simplest-collector 30268:14268

To access Jaeger UI

kubectl -n observability port-forward svc/simplest-query 30686:16686

Sample trace

distributed traces distributed traces

Create Postgres DB

TODO

About

demo app to use NATS Jetstream

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages