From 0c0d781ce45c7e02b5ff23f965c2f195f20d09d6 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sun, 27 Nov 2022 21:04:47 -0500 Subject: [PATCH 1/3] Add server-streaming example in Go Signed-off-by: Byron Ruth --- .../messaging/server-streaming/go/main.go | 73 +++++++++++++++++++ examples/messaging/server-streaming/meta.yaml | 5 ++ 2 files changed, 78 insertions(+) create mode 100644 examples/messaging/server-streaming/go/main.go create mode 100644 examples/messaging/server-streaming/meta.yaml diff --git a/examples/messaging/server-streaming/go/main.go b/examples/messaging/server-streaming/go/main.go new file mode 100644 index 00000000..e3e2d99f --- /dev/null +++ b/examples/messaging/server-streaming/go/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "time" + + "github.com/nats-io/nats.go" +) + +func main() { + // Use the env varibale if running in the container, otherwise use the default. + url := os.Getenv("NATS_URL") + if url == "" { + url = nats.DefaultURL + } + + // Create an unauthenticated connection to NATS. + nc, _ := nats.Connect(url) + defer nc.Drain() + + // Emulate a Random Number Generator service. + nc.Subscribe("rng", func(msg *nats.Msg) { + // Assume the message payload is a valid integer. + n, _ := strconv.ParseInt(string(msg.Data), 10, 64) + + // Get the reply subject to stream messages to. + reply := msg.Header.Get("x-stream-subject") + + // Respond to the initial request with a nil message indicating everything + // is OK and the messages will be streamed. If there was a validation issue + // or other problem, an error response could be returned to tell the client + // to close the subscription. + msg.Respond(nil) + + // Stream the responds to the client. + for i := 0; i < int(n); i++ { + r := rand.Intn(100) + nc.Publish(reply, []byte(fmt.Sprintf("%d", r))) + } + + // Publish nil data to indicate end-of-stream. + nc.Publish(reply, nil) + }) + + // Generate a new random inbox subject for the server stream. + streamSubject := nc.NewInbox() + + // Setup a subscription on that unique subject to buffer messages. + sub, _ := nc.SubscribeSync(streamSubject) + + // Construct the request message, including the stream header and the + // number of random numbers to generate. + msg := nats.NewMsg("rng") + msg.Data = []byte("10") + msg.Header.Add("x-stream-subject", streamSubject) + + // Assume all is ok for the example.. + nc.RequestMsg(msg, time.Second) + + for { + msg, _ := sub.NextMsg(time.Second) + // Indicates the end of the stream. + if len(msg.Data) == 0 { + break + } + + // Print the random number. + fmt.Println(string(msg.Data)) + } +} diff --git a/examples/messaging/server-streaming/meta.yaml b/examples/messaging/server-streaming/meta.yaml new file mode 100644 index 00000000..69a3cfca --- /dev/null +++ b/examples/messaging/server-streaming/meta.yaml @@ -0,0 +1,5 @@ +title: Server-side streaming +description: |- + This messaging pattern is initiated by a client request which will result in one or more messages replied from the server. + + This is achieved by the client generating a one-off, random subject and subscribing to it. The subject is include in the initial request that the service can publish messages back to. From be5b852997937265816c1859a5474c6380225d55 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sun, 27 Nov 2022 21:13:22 -0500 Subject: [PATCH 2/3] Update Go and server versions Signed-off-by: Byron Ruth --- cmd/nbe/go.mod | 2 +- docker/docker-compose.cluster.yaml | 6 +++--- docker/docker-compose.yaml | 2 +- docker/go/Dockerfile | 2 +- docker/go/go.mod | 10 +++------- docker/go/go.sum | 24 ++++-------------------- 6 files changed, 13 insertions(+), 33 deletions(-) diff --git a/cmd/nbe/go.mod b/cmd/nbe/go.mod index fa450c8e..2a2a32a8 100644 --- a/cmd/nbe/go.mod +++ b/cmd/nbe/go.mod @@ -1,6 +1,6 @@ module github.com/bruth/nats-by-example/cmd/nbe -go 1.18 +go 1.19 require ( github.com/alecthomas/chroma v0.10.0 diff --git a/docker/docker-compose.cluster.yaml b/docker/docker-compose.cluster.yaml index 6b787efd..fd140536 100644 --- a/docker/docker-compose.cluster.yaml +++ b/docker/docker-compose.cluster.yaml @@ -1,7 +1,7 @@ version: '3.9' services: nats1: - image: docker.io/nats:2.9.0 + image: docker.io/nats:2.9.8 command: - "--debug" - "--name=nats1" @@ -15,7 +15,7 @@ services: - "18222:8222" nats2: - image: docker.io/nats:2.9.0 + image: docker.io/nats:2.9.8 command: - "--debug" - "--name=nats2" @@ -29,7 +29,7 @@ services: - "28222:8222" nats3: - image: docker.io/nats:2.9.0 + image: docker.io/nats:2.9.8 command: - "--debug" - "--name=nats3" diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 01ffc775..a08ccef6 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -1,7 +1,7 @@ version: '3.9' services: nats: - image: docker.io/nats:2.9.0 + image: docker.io/nats:2.9.8 command: - "--debug" - "--http_port=8222" diff --git a/docker/go/Dockerfile b/docker/go/Dockerfile index 37cce520..65059000 100644 --- a/docker/go/Dockerfile +++ b/docker/go/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.18-alpine AS build +FROM golang:1.19-alpine AS build WORKDIR /opt/app diff --git a/docker/go/go.mod b/docker/go/go.mod index 8087879a..2ef162f6 100644 --- a/docker/go/go.mod +++ b/docker/go/go.mod @@ -1,14 +1,10 @@ module github.com/ConnectEverything/nats-by-example/go -go 1.18 - -require github.com/nats-io/nats.go v1.16.0 +go 1.19 require ( - github.com/golang/protobuf v1.5.2 // indirect - github.com/nats-io/nats-server/v2 v2.8.4 // indirect + github.com/nats-io/nats.go v1.20.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect - golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect - google.golang.org/protobuf v1.28.0 // indirect + golang.org/x/crypto v0.3.0 // indirect ) diff --git a/docker/go/go.sum b/docker/go/go.sum index 40c4c9b6..836f4562 100644 --- a/docker/go/go.sum +++ b/docker/go/go.sum @@ -1,30 +1,14 @@ -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= -github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= -github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= -github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= -github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.20.0 h1:T8JJnQfVSdh1CzGiwAOv5hEobYCBho/0EupGznYw0oM= +github.com/nats-io/nats.go v1.20.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= -golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.3.0 h1:a06MkbcxBrEFc0w0QIZWXrH/9cCX6KJyWbBOIwAn+7A= +golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= -google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= From 3b729b401b2721d85a22f8975b329e7bb0326787 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 28 Nov 2022 08:05:55 -0500 Subject: [PATCH 3/3] Update service streaming example Signed-off-by: Byron Ruth --- .../messaging/server-streaming/go/main.go | 73 ---------------- examples/messaging/server-streaming/meta.yaml | 5 -- .../messaging/service-streaming/go/main.go | 85 +++++++++++++++++++ .../messaging/service-streaming/meta.yaml | 31 +++++++ 4 files changed, 116 insertions(+), 78 deletions(-) delete mode 100644 examples/messaging/server-streaming/go/main.go delete mode 100644 examples/messaging/server-streaming/meta.yaml create mode 100644 examples/messaging/service-streaming/go/main.go create mode 100644 examples/messaging/service-streaming/meta.yaml diff --git a/examples/messaging/server-streaming/go/main.go b/examples/messaging/server-streaming/go/main.go deleted file mode 100644 index e3e2d99f..00000000 --- a/examples/messaging/server-streaming/go/main.go +++ /dev/null @@ -1,73 +0,0 @@ -package main - -import ( - "fmt" - "math/rand" - "os" - "strconv" - "time" - - "github.com/nats-io/nats.go" -) - -func main() { - // Use the env varibale if running in the container, otherwise use the default. - url := os.Getenv("NATS_URL") - if url == "" { - url = nats.DefaultURL - } - - // Create an unauthenticated connection to NATS. - nc, _ := nats.Connect(url) - defer nc.Drain() - - // Emulate a Random Number Generator service. - nc.Subscribe("rng", func(msg *nats.Msg) { - // Assume the message payload is a valid integer. - n, _ := strconv.ParseInt(string(msg.Data), 10, 64) - - // Get the reply subject to stream messages to. - reply := msg.Header.Get("x-stream-subject") - - // Respond to the initial request with a nil message indicating everything - // is OK and the messages will be streamed. If there was a validation issue - // or other problem, an error response could be returned to tell the client - // to close the subscription. - msg.Respond(nil) - - // Stream the responds to the client. - for i := 0; i < int(n); i++ { - r := rand.Intn(100) - nc.Publish(reply, []byte(fmt.Sprintf("%d", r))) - } - - // Publish nil data to indicate end-of-stream. - nc.Publish(reply, nil) - }) - - // Generate a new random inbox subject for the server stream. - streamSubject := nc.NewInbox() - - // Setup a subscription on that unique subject to buffer messages. - sub, _ := nc.SubscribeSync(streamSubject) - - // Construct the request message, including the stream header and the - // number of random numbers to generate. - msg := nats.NewMsg("rng") - msg.Data = []byte("10") - msg.Header.Add("x-stream-subject", streamSubject) - - // Assume all is ok for the example.. - nc.RequestMsg(msg, time.Second) - - for { - msg, _ := sub.NextMsg(time.Second) - // Indicates the end of the stream. - if len(msg.Data) == 0 { - break - } - - // Print the random number. - fmt.Println(string(msg.Data)) - } -} diff --git a/examples/messaging/server-streaming/meta.yaml b/examples/messaging/server-streaming/meta.yaml deleted file mode 100644 index 69a3cfca..00000000 --- a/examples/messaging/server-streaming/meta.yaml +++ /dev/null @@ -1,5 +0,0 @@ -title: Server-side streaming -description: |- - This messaging pattern is initiated by a client request which will result in one or more messages replied from the server. - - This is achieved by the client generating a one-off, random subject and subscribing to it. The subject is include in the initial request that the service can publish messages back to. diff --git a/examples/messaging/service-streaming/go/main.go b/examples/messaging/service-streaming/go/main.go new file mode 100644 index 00000000..e20031a4 --- /dev/null +++ b/examples/messaging/service-streaming/go/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "time" + + "github.com/nats-io/nats.go" +) + +func main() { + // Use the env varibale if running in the container, otherwise use the default. + url := os.Getenv("NATS_URL") + if url == "" { + url = nats.DefaultURL + } + + // Create an unauthenticated connection to NATS. + nc, _ := nats.Connect(url) + defer nc.Drain() + + // Setup a simple random number generator (RNG) service that + // streams numbers. Using a queue group ensures that only one + // of the members will receive the request to process. + nc.QueueSubscribe("rng", "rng", func(msg *nats.Msg) { + // Do request validation, etc. If there are any issues, a response + // can be sent with any error prior to the stream starting. + n, _ := strconv.ParseInt(string(msg.Data), 10, 64) + + // Extract the unique subject to stream messages on. + subject := msg.Header.Get("x-stream-subject") + + // Respond to the initial request with any errors. Otherwise an + // empty reply indicates the stream will start. + msg.Respond(nil) + + // Stream the numbers to the client. A publish is used here, + // however, if ack'ing is desired, a request could be used + // per message. + for i := 0; i < int(n); i++ { + r := rand.Intn(100) + nc.Publish(subject, []byte(fmt.Sprintf("%d", r))) + } + + // Publish empty data to indicate end-of-stream. + nc.Publish(subject, nil) + }) + + // Generate a unique inbox subject for the stream of messages + // and subscribe to it. + inbox := nc.NewInbox() + sub, _ := nc.SubscribeSync(inbox) + + // Prepare the message to initiate the interaction. The inbox + // subject is included in the header for the service to extract + // and publish to. + msg := nats.NewMsg("rng") + msg.Header.Set("x-stream-subject", inbox) + msg.Data = []byte("10") + + // Send the request to initiate the interaction. + nc.RequestMsg(msg, time.Second) + + // Loop to receive all messages over the stream. + for { + msg, err := sub.NextMsg(time.Second) + // Handle error. + if err != nil { + sub.Unsubscribe() + // handle error + break + } + + // Indicates the end of the stream. + if len(msg.Data) == 0 { + sub.Unsubscribe() + break + } + + // Print the random number. + fmt.Println(string(msg.Data)) + } +} diff --git a/examples/messaging/service-streaming/meta.yaml b/examples/messaging/service-streaming/meta.yaml new file mode 100644 index 00000000..074bb028 --- /dev/null +++ b/examples/messaging/service-streaming/meta.yaml @@ -0,0 +1,31 @@ +title: Service streaming pattern +description: |- + This messaging pattern builds upon the built-in [request-reply][req-rep] API. + + Prior to the client sending the request, it must generate a unique subject + to be the dedicated *inbox* for receiving the stream of messages from the + service. + + The simpliest way to achieve this is to use the client connection's + `NewInbox()` method which also ensures any connection-level inbox prefix + is already prepended. + + This unique subject is then added as a header, such as `x-stream-subject`. + Prior to sending the request, the subscription on that subject must be setup + by the client so it can receive messages as soon as the client sends the + request. + + When the service receives the request, it can do any request validation, etc. + and perform the standard reply indicating the streaming will be begin. + Now the service can freely publish messages (or send requests if acks are desired) + on the subject passed within the header. + + To indicate the end-of-stream, the last message provides an empty message body + with an optional set of headers indicating any status, such as an error + had occurred mid-stream. + + For those familiar with gRPC, this is analogous to the + [server streaming RPC][grpc]. + + [req-rep]: /examples/messaging/request-reply/go + [grpc]: https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc