Permalink
Browse files

Add client.Request to support 2-way req/resp

Change-Id: Ie57542de6e0c14e759bdea99891873e7ef09dc70
  • Loading branch information...
1 parent 12499ca commit f2fa3d4e9ea93332e05483e4509093a6a4fa554a @anfernee anfernee committed Sep 14, 2012
Showing with 81 additions and 8 deletions.
  1. +26 −3 client.go
  2. +28 −2 client_test.go
  3. +27 −3 test/server.go
View
@@ -1,8 +1,11 @@
package nats
import (
+ "fmt"
+ "math/rand"
"net"
"sync"
+ "time"
)
type Subscription struct {
@@ -212,6 +215,7 @@ type Client struct {
Stopper
cc chan *Connection
+ r *rand.Rand
}
func NewClient() *Client {
@@ -220,6 +224,7 @@ func NewClient() *Client {
t.subscriptionRegistry.setup(t)
t.cc = make(chan *Connection)
+ t.r = rand.New(rand.NewSource(time.Now().UnixNano()))
return t
}
@@ -254,10 +259,11 @@ func (t *Client) Ping() bool {
return c.Ping()
}
-func (t *Client) publish(s string, m []byte, confirm bool) bool {
+func (t *Client) publish(s string, r string, m []byte, confirm bool) bool {
var o = new(writePublish)
o.Subject = s
+ o.ReplyTo = r
o.Message = m
c := t.AcquireConnection()
@@ -279,11 +285,28 @@ func (t *Client) publish(s string, m []byte, confirm bool) bool {
}
func (t *Client) Publish(s string, m []byte) bool {
- return t.publish(s, m, false)
+ return t.publish(s, "", m, false)
}
func (t *Client) PublishAndConfirm(s string, m []byte) bool {
- return t.publish(s, m, true)
+ return t.publish(s, "", m, true)
+}
+
+func (t *Client) Request(s string, m []byte, f func(*Subscription)) bool {
+ r := t.createInbox()
+
+ sub := t.NewSubscription(r)
+ sub.Subscribe()
+
+ go f(sub)
+
+ return t.publish(s, r, m, false)
+}
+
+func (t *Client) createInbox() string {
+ return fmt.Sprintf("_INBOX.%04x%04x%04x%04x%04x%06x",
+ t.r.Int31n(0x10000), t.r.Int31n(0x10000), t.r.Int31n(0x10000),
+ t.r.Int31n(0x10000), t.r.Int31n(0x10000), t.r.Int31n(0x1000000))
}
func (t *Client) runConnection(n net.Conn, sc chan bool) error {
View
@@ -1,10 +1,10 @@
package nats
import (
+ "nats/test"
"net"
- "testing"
"sync"
- "nats/test"
+ "testing"
)
type testClient struct {
@@ -277,6 +277,32 @@ func TestClientPublish(t *testing.T) {
tc.Teardown()
}
+func TestClientRequest(t *testing.T) {
+ var tc testClient
+
+ tc.Setup(t)
+
+ tc.Add(1)
+ go func() {
+ ok := tc.c.Request("subject", []byte("message"), func(sub *Subscription) {
+ for _ = range sub.Inbox {
+ break
+ }
+ })
+
+ if !ok {
+ t.Error("Expected success")
+ }
+
+ tc.Done()
+ }()
+
+ tc.s.AssertMatch("SUB _INBOX\\.[0-9a-f]{26} 1\r\n")
+ tc.s.AssertMatch("PUB subject _INBOX\\.[0-9a-f]{26} 7\r\nmessage\r\n")
+
+ tc.Teardown()
+}
+
func TestClientPublishAndConfirmSucceeds(t *testing.T) {
var tc testClient
View
@@ -2,12 +2,13 @@ package test
import (
"bytes"
- "net"
- "testing"
"crypto/rsa"
"crypto/tls"
- "math/big"
"encoding/hex"
+ "math/big"
+ "net"
+ "regexp"
+ "testing"
)
var testConfig *tls.Config
@@ -56,6 +57,29 @@ func (s *TestServer) AssertRead(v string) bool {
return true
}
+func (s *TestServer) AssertMatch(v string) bool {
+ var buf []byte
+ var n int
+ var e error
+ var m bool
+
+ buf = make([]byte, 1024)
+ if n, e = s.Conn.Read(buf); e != nil {
+ s.t.Errorf("Error: %#v", e)
+ return false
+ }
+
+ var b []byte = buf[0:n]
+
+ m, e = regexp.Match(v, b)
+ if !m || e != nil {
+ s.t.Errorf("Expected match: %#v, got: %#v", v, string(b))
+ return false
+ }
+
+ return true
+}
+
func (s *TestServer) AssertWrite(v string) bool {
var e error

0 comments on commit f2fa3d4

Please sign in to comment.