diff --git a/client/client.go b/client/client.go index 1b3fa41..2fd0de9 100644 --- a/client/client.go +++ b/client/client.go @@ -17,37 +17,23 @@ const ( connected ) -const CALL_ID = "_CALLID" -const CALL = "CALL" -const LLAC = "LLAC" +const CALL_ID = "_call" type Client struct { status uint32 conn *conn.Conn - opMutex sync.Mutex - sending map[string]*BaseToken - calling map[string]*CallToken - recvs map[string]recv - llacs map[string]llac -} - -type recv struct { - label []string - callback func(nson.Message) -} + opMutex sync.Mutex + sending map[string]*Token + recvChan chan *RecvMessage -type llac struct { - label []string - callback func(nson.Message) nson.Message + onConnectCallback func() } func NewClient(config conn.Config) (*Client, error) { client := &Client{ - sending: make(map[string]*BaseToken), - calling: make(map[string]*CallToken), - recvs: make(map[string]recv), - llacs: make(map[string]llac), + sending: make(map[string]*Token), + recvChan: make(chan *RecvMessage, 64), } config.OnConnect = func() { @@ -72,6 +58,20 @@ func NewClient(config conn.Config) (*Client, error) { func (c *Client) onConnect() { atomic.StoreUint32(&c.status, uint32(connected)) + + c.opMutex.Lock() + callback := c.onConnectCallback + c.opMutex.Unlock() + + if callback != nil { + callback() + } +} + +func (c *Client) OnConnect(callback func()) { + c.opMutex.Lock() + c.onConnectCallback = callback + c.opMutex.Unlock() } func (c *Client) onDisConnect() { @@ -87,15 +87,6 @@ func (c *Client) IsConnect() bool { return false } -func (c *Client) IsDisConnect() bool { - status := atomic.LoadUint32(&c.status) - if status == disconnected { - return true - } - - return false -} - func (c *Client) Close() { c.conn.Close() } @@ -108,11 +99,9 @@ func (c *Client) recv() { return } - // fmt.Println(msg.String()) - if ch, err := msg.GetString(conn.CHAN); err == nil { if code, err := msg.GetI32(conn.CODE); err == nil { - if id, err := msg.GetMessageId(conn.ID); err == nil { + if id, err := msg.GetMessageId(CALL_ID); err == nil { c.opMutex.Lock() sendToken, ok := c.sending[id.Hex()] if ok { @@ -122,119 +111,41 @@ func (c *Client) recv() { if ok { if code == 0 { - sendToken.flowComplete() + sendToken.setMessage(msg) } else { sendToken.setError(fmt.Errorf("error code: %v", code)) } - continue - } - } else if callId, err := msg.GetMessageId(CALL_ID); err == nil { - c.opMutex.Lock() - callToken, ok := c.calling[callId.Hex()] - if ok { - delete(c.calling, callId.Hex()) - } - c.opMutex.Unlock() - - if ok { - if code != 0 { - callToken.setError(fmt.Errorf("error code: %v", code)) - } - continue } } - } - - if ch == LLAC { - if callId, err := msg.GetMessageId(CALL_ID); err == nil { - c.opMutex.Lock() - if callToken, ok := c.calling[callId.Hex()]; ok { - delete(c.calling, callId.Hex()) - callToken.setMessage(msg) - } - c.opMutex.Unlock() - } } else { - c.opMutex.Lock() - if recv, ok := c.recvs[ch]; ok { - go recv.callback(msg) - } else if llac, ok := c.llacs[ch]; ok { - go func() { - if from, err := msg.GetMessageId(conn.FROM); err == nil { - if callId, err := msg.GetMessageId(CALL_ID); err == nil { - res_msg := llac.callback(msg) - - res_msg.Insert(conn.CHAN, nson.String(LLAC)) - res_msg.Insert(conn.TO, from) - res_msg.Insert(CALL_ID, callId) - - c.conn.SendMessage(res_msg) - } - } - }() - } - - c.opMutex.Unlock() + c.recvChan <- &RecvMessage{ch, msg} } + } else { log.Printf("message format error: %s", msg.String()) } } } -func (c *Client) Send( - ch string, - message nson.Message, - label []string, - to []nson.MessageId, - timeout time.Duration, -) error { - if ch == "" { - return errors.New("消息频道不能为空") +func (c *Client) RawSend(msg nson.Message, timeout time.Duration) (nson.Message, error) { + if msg == nil { + return nil, errors.New("消息不能为 nil") } - if message == nil { - return errors.New("消息不能为 nil") + if timeout == time.Duration(0) { + timeout = time.Second * 10 } - // 消息ID - id, err := message.GetMessageId(conn.ID) + id, err := msg.GetMessageId(CALL_ID) if err != nil { id = nson.NewMessageId() - message.Insert(conn.ID, id) - } - - // CHAN - message.Insert(conn.CHAN, nson.String(ch)) - - // LABEL - if label != nil && len(label) > 0 { - array := make(nson.Array, 0) - for _, v := range label { - array = append(array, nson.String(v)) - } - message.Insert(conn.LABEL, nson.Array(array)) - } - - // TO - if to != nil && len(to) > 0 { - array := make(nson.Array, 0) - for _, v := range to { - array = append(array, nson.String(v)) - } - message.Insert(conn.TO, nson.Array(array)) - } - - if timeout == time.Duration(0) { - return c.conn.SendMessage(message) + msg.Insert(CALL_ID, id) } - // message.Insert(conn.ACK, nson.Bool(true)) - - token := newBaseToken() + token := newToken() c.opMutex.Lock() c.sending[id.Hex()] = token c.opMutex.Unlock() @@ -248,33 +159,24 @@ func (c *Client) Send( c.opMutex.Unlock() }() - err = c.conn.SendMessage(message) + err = c.conn.SendMessage(msg) if err != nil { - return err + return nil, err } if !token.WaitTimeout(timeout) { - return errors.New("timeout") + return nil, errors.New("timeout") } - return token.Error() + return token.Message(), token.Error() } -func (c *Client) detach(ch string) error { +func (c *Client) Detach(ch string, label []string, timeout time.Duration) error { msg := nson.Message{ conn.CHAN: nson.String(conn.DETACH), conn.VALUE: nson.String(ch), } - return c.conn.SendMessage(msg) -} - -func (c *Client) attach(ch string, label []string) error { - msg := nson.Message{ - conn.CHAN: nson.String(conn.ATTACH), - conn.VALUE: nson.String(ch), - } - if label != nil && len(label) > 0 { array := make(nson.Array, 0) for _, v := range label { @@ -283,173 +185,42 @@ func (c *Client) attach(ch string, label []string) error { msg.Insert(conn.LABEL, nson.Array(array)) } - return c.conn.SendMessage(msg) + _, err := c.RawSend(msg, timeout) + return err } -func (c *Client) Recv( - ch string, - label []string, - callback func(nson.Message), -) error { - if ch == "" { - return errors.New("消息频道不能为空") - } - - err := c.RemoveRecv(ch) - if err != nil { - return err - } - - c.opMutex.Lock() - c.recvs[ch] = recv{ - label: label, - callback: callback, - } - c.opMutex.Unlock() - - return c.attach(ch, label) -} - -func (c *Client) RemoveRecv(ch string) error { - if ch == "" { - return errors.New("消息频道不能为空") - } - - c.opMutex.Lock() - _, ok := c.recvs[ch] - if ok { - delete(c.recvs, ch) - } - c.opMutex.Unlock() - - if ok { - return c.detach(ch) - } - - return nil -} - -func (c *Client) Call( - ch string, - message nson.Message, - label []string, - to []nson.MessageId, - timeout time.Duration, -) (nson.Message, error) { - if ch == "" { - return nil, errors.New("消息频道不能为空") - } - - if message == nil { - return nil, errors.New("消息不能为 nil") +func (c *Client) Attach(ch string, label []string, timeout time.Duration) error { + msg := nson.Message{ + conn.CHAN: nson.String(conn.ATTACH), + conn.VALUE: nson.String(ch), } - callId := nson.NewMessageId() - - message.Insert(CALL_ID, callId) - - // CHAN - ch2 := CALL + "." + ch - message.Insert(conn.CHAN, nson.String(ch2)) - - // LABEL if label != nil && len(label) > 0 { array := make(nson.Array, 0) for _, v := range label { array = append(array, nson.String(v)) } - message.Insert(conn.LABEL, nson.Array(array)) - } - - // TO - if to != nil && len(to) > 0 { - array := make(nson.Array, 0) - for _, v := range to { - array = append(array, nson.String(v)) - } - message.Insert(conn.TO, nson.Array(array)) - } - - if timeout == time.Duration(0) { - timeout = time.Second * 60 - } - - // share - message.Insert(conn.SHARE, nson.Bool(true)) - - token := newCallToken() - c.opMutex.Lock() - c.calling[callId.Hex()] = token - c.opMutex.Unlock() - defer func() { - c.opMutex.Lock() - if token, ok := c.calling[callId.Hex()]; ok { - token.flowComplete() - delete(c.calling, callId.Hex()) - } - c.opMutex.Unlock() - }() - - err := c.conn.SendMessage(message) - if err != nil { - return nil, err - } - - if !token.WaitTimeout(timeout) { - return nil, errors.New("timeout") - } - - err = token.Error() - if err != nil { - return nil, err + msg.Insert(conn.LABEL, nson.Array(array)) } - return token.Message(), nil + _, err := c.RawSend(msg, timeout) + return err } -func (c *Client) Llac( - ch string, - label []string, - callback func(nson.Message) nson.Message, -) error { - if ch == "" { - return errors.New("消息频道不能为空") - } - - err := c.RemoveLlac(ch) - if err != nil { - return err - } +func (c *Client) Send( + message *SendMessage, + timeout time.Duration, +) (nson.Message, error) { - ch = CALL + "." + ch + msg := message.Build() - c.opMutex.Lock() - c.llacs[ch] = llac{ - label: label, - callback: callback, + if message.Call { + return c.RawSend(msg, timeout) } - c.opMutex.Unlock() - return c.attach(ch, label) + return nil, c.conn.SendMessage(msg) } -func (c *Client) RemoveLlac(ch string) error { - if ch == "" { - return errors.New("消息频道不能为空") - } - - ch = CALL + "." + ch - - c.opMutex.Lock() - _, ok := c.llacs[ch] - if ok { - delete(c.llacs, ch) - } - c.opMutex.Unlock() - - if ok { - return c.detach(ch) - } - - return nil +func (c *Client) Recv() <-chan *RecvMessage { + return c.recvChan } diff --git a/client/message.go b/client/message.go new file mode 100644 index 0000000..8d43f78 --- /dev/null +++ b/client/message.go @@ -0,0 +1,112 @@ +package client + +import ( + "github.com/danclive/nson-go" + "github.com/danclive/queen-go/conn" +) + +type SendMessage struct { + Ch string + Body nson.Message + Id *nson.MessageId + Label []string + To []nson.MessageId + Call bool +} + +func NewSendMessage(ch string, body nson.Message) *SendMessage { + return &SendMessage{ + Ch: ch, + Body: body, + } +} + +func (s *SendMessage) WithId(id nson.MessageId) *SendMessage { + s.Id = &id + return s +} + +func (s *SendMessage) WithLabel(label []string) *SendMessage { + s.Label = label + return s +} + +func (s *SendMessage) WithTo(to []nson.MessageId) *SendMessage { + s.To = to + return s +} + +func (s *SendMessage) WithCall(call bool) *SendMessage { + s.Call = call + return s +} + +func (s *SendMessage) Build() nson.Message { + msg := s.Body + + msg.Insert(conn.CHAN, nson.String(s.Ch)) + + if s.Label != nil && len(s.Label) > 0 { + array := make(nson.Array, 0) + for _, v := range s.Label { + array = append(array, nson.String(v)) + } + msg.Insert(conn.LABEL, nson.Array(array)) + } + + if s.To != nil && len(s.To) > 0 { + array := make(nson.Array, 0) + for _, v := range s.To { + array = append(array, v) + } + msg.Insert(conn.TO, nson.Array(array)) + } + + if s.Call { + msg.Insert(conn.SHARE, nson.Bool(true)) + } + + return msg +} + +type RecvMessage struct { + Ch string + Body nson.Message +} + +func (r *RecvMessage) CallId() (nson.MessageId, bool) { + if id, err := r.Body.GetMessageId(CALL_ID); err == nil { + return id, true + } + + return nil, false +} + +func (r *RecvMessage) FromId() (nson.MessageId, bool) { + if id, err := r.Body.GetMessageId(conn.FROM); err == nil { + return id, true + } + + return nil, false +} + +func (r *RecvMessage) Back() (*SendMessage, bool) { + callId, ok := r.CallId() + if !ok { + return nil, false + } + + fromId, ok := r.FromId() + if !ok { + return nil, false + } + + return &SendMessage{ + Ch: r.Ch, + Body: nson.Message{ + CALL_ID: callId, + conn.CODE: nson.I32(0), + }, + To: []nson.MessageId{fromId}, + }, true +} diff --git a/client/token.go b/client/token.go index 7a01ab6..2be7445 100644 --- a/client/token.go +++ b/client/token.go @@ -53,39 +53,27 @@ func (b *baseToken) setError(e error) { b.m.Unlock() } -type BaseToken struct { - baseToken -} - -func newBaseToken() *BaseToken { - return &BaseToken{ - baseToken: baseToken{ - complete: make(chan struct{}), - }, - } -} - -type CallToken struct { +type Token struct { baseToken msg nson.Message } -func newCallToken() *CallToken { - return &CallToken{ +func newToken() *Token { + return &Token{ baseToken: baseToken{ complete: make(chan struct{}), }, } } -func (c *CallToken) setMessage(msg nson.Message) { +func (c *Token) setMessage(msg nson.Message) { c.m.Lock() c.msg = msg c.flowComplete() c.m.Unlock() } -func (c *CallToken) Message() nson.Message { +func (c *Token) Message() nson.Message { c.m.Lock() defer c.m.Unlock() return c.msg diff --git a/cmd/recv/main.go b/cmd/recv/main.go index 70f199a..a706702 100644 --- a/cmd/recv/main.go +++ b/cmd/recv/main.go @@ -1,10 +1,8 @@ package main import ( - "fmt" "log" - "github.com/danclive/nson-go" "github.com/danclive/queen-go/client" "github.com/danclive/queen-go/conn" "github.com/danclive/queen-go/crypto" @@ -20,38 +18,31 @@ func main() { Debug: false, } - client, err := client.NewClient(config) + c, err := client.NewClient(config) if err != nil { log.Fatalln(err) } - _ = client - // go func() { // time.Sleep(10 * time.Second) // client.Close() // }() - client.Recv("lala", nil, func(ch string, message nson.Message) { - fmt.Println(ch) - fmt.Println(message) - }) + c.OnConnect(func() { + err = c.Attach("hello", nil, 0) + if err != nil { + log.Fatalln(err) + } - client.Llac("lala", nil, func(ch string, message nson.Message) nson.Message { - fmt.Println(ch) - fmt.Println(message) + for recv := range c.Recv() { + log.Println(recv) - return message + if back, ok := recv.Back(); ok { + c.Send(back, 0) + } + } }) - i := 0 - for { - i++ - fmt.Println(i) - err = client.Send("lala", nson.Message{"aaa": nson.String("bbb")}, nil, nil, 0) - log.Println(err) - } - var e = make(chan bool) <-e } diff --git a/cmd/send/main.go b/cmd/send/main.go index b2aa43d..8aee9c6 100644 --- a/cmd/send/main.go +++ b/cmd/send/main.go @@ -6,7 +6,6 @@ import ( "time" "github.com/danclive/nson-go" - "github.com/danclive/queen-go/client" "github.com/danclive/queen-go/conn" "github.com/danclive/queen-go/crypto" @@ -14,7 +13,7 @@ import ( func main() { config := conn.Config{ - Addrs: []string{"danclive.com:8888"}, + Addrs: []string{"snple.com:8888"}, EnableCrypto: true, CryptoMethod: crypto.Aes128Gcm, AccessKey: "fcbd6ea1e8c94dfc6b84405e", @@ -22,33 +21,28 @@ func main() { Debug: false, } - client, err := client.NewClient(config) + c, err := client.NewClient(config) if err != nil { log.Fatalln(err) } - _ = client + // go func() { + // time.Sleep(10 * time.Second) + // client.Close() + // }() + + _ = c + for { + //time.Sleep(1 * time.Second) + time1 := time.Now() - err = client.Send("lala", nson.Message{"aaa": nson.String("bbb")}, nil, nil, time.Second*10) - log.Println(err) + msg := client.NewSendMessage("hello", nson.Message{"aaa": nson.String("bbb")}).WithCall(true) - msg, err := client.Call("lala", nson.Message{"aaa": nson.String("bbb")}, nil, nil, time.Second*10) - log.Println(err) - log.Println(msg) + log.Println(c.Send(msg, 0)) - for { - now := time.Now() - msg, err := client.Call("ping", nson.Message{"aaa": nson.String("bbb")}, nil, nil, time.Second*10) - //log.Println(err) - //log.Println(msg) - _ = err - _ = msg - now2 := time.Now() - - fmt.Println(now2.Sub(now)) - time.Sleep(time.Second) + fmt.Println(time.Now().Sub(time1)) } - // var e = make(chan bool) - // <-e + var e = make(chan bool) + <-e } diff --git a/conn/conn.go b/conn/conn.go index 574ecca..aff2c87 100644 --- a/conn/conn.go +++ b/conn/conn.go @@ -18,7 +18,7 @@ import ( type Config struct { Addrs []string - ClientId nson.MessageId + SlotId nson.MessageId EnableCrypto bool CryptoMethod crypto.Method AccessKey string @@ -39,8 +39,8 @@ func (cfg *Config) init() { cfg.Addrs = []string{"127.0.0.1:8888"} } - if cfg.ClientId == nil { - cfg.ClientId = nson.NewMessageId() + if cfg.SlotId == nil { + cfg.SlotId = nson.NewMessageId() } if cfg.AuthMessage == nil { @@ -48,6 +48,7 @@ func (cfg *Config) init() { } cfg.AuthMessage.Insert(CHAN, nson.String(AUTH)) + cfg.AuthMessage.Insert(SLOT_ID, cfg.SlotId) if cfg.EnableCrypto { if cfg.CryptoMethod == crypto.None || cfg.AccessKey == "" || cfg.SecretKey == "" { @@ -288,15 +289,17 @@ func (c *Conn) heartbeat() { c.trace("heartbeat exit") return case <-time.After(c.config.HeartbeatInterval): - message := nson.Message{ - CHAN: nson.String(PING), - } - - c.SendMessage(message) - d := time.Now().Sub(c.LastRecvTime()) c.trace("heartbeat: %v", d) + if d > c.config.HeartbeatInterval { + message := nson.Message{ + CHAN: nson.String(PING), + } + + c.SendMessage(message) + } + if d > c.config.HeartbeatTimeout { c.TryReconn() } @@ -494,20 +497,6 @@ func (c *Conn) SendMessage(msg nson.Message) (err error) { c.writeMutex.Unlock() }() - // base := c.base - - // err = c._write(base, msg) - // if err == nil { - // c.writeCount += 1 - // return - // } - // base.Close() - - // go c.tryReconn(base) - - // if c.waitReconn('w', c.writeWaitChan) { - // err = nil - // } for { if atomic.LoadInt32(&c.closed) != 0 { return errors.New("conn closed") diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9290c99 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/danclive/queen-go + +go 1.15 + +require ( + github.com/danclive/nson-go v0.3.0 + github.com/stretchr/testify v1.6.1 + golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f920ce9 --- /dev/null +++ b/go.sum @@ -0,0 +1,21 @@ +github.com/danclive/nson-go v0.3.0 h1:skVgeJrbJsQk3HpVOAtZ71QMi+VQuhttgUVsDDMBDAE= +github.com/danclive/nson-go v0.3.0/go.mod h1:L1kLvlMsYMooindbfOBMV33U0S0chCfPyC5g2dimKAQ= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=