Skip to content

Commit

Permalink
rewrite client
Browse files Browse the repository at this point in the history
  • Loading branch information
danclive committed Sep 9, 2020
1 parent 42a418f commit c44470e
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 369 deletions.
343 changes: 57 additions & 286 deletions client/client.go

Large diffs are not rendered by default.

112 changes: 112 additions & 0 deletions client/message.go
@@ -0,0 +1,112 @@
package client

import (
"github.com/danclive/nson-go"
"github.com/danclive/queen-go/conn"
)

type SendMessage struct {
Ch string
Body nson.Message
Id *nson.MessageId
Label []string
To []nson.MessageId
Call bool
}

func NewSendMessage(ch string, body nson.Message) *SendMessage {
return &SendMessage{
Ch: ch,
Body: body,
}
}

func (s *SendMessage) WithId(id nson.MessageId) *SendMessage {
s.Id = &id
return s
}

func (s *SendMessage) WithLabel(label []string) *SendMessage {
s.Label = label
return s
}

func (s *SendMessage) WithTo(to []nson.MessageId) *SendMessage {
s.To = to
return s
}

func (s *SendMessage) WithCall(call bool) *SendMessage {
s.Call = call
return s
}

func (s *SendMessage) Build() nson.Message {
msg := s.Body

msg.Insert(conn.CHAN, nson.String(s.Ch))

if s.Label != nil && len(s.Label) > 0 {
array := make(nson.Array, 0)
for _, v := range s.Label {
array = append(array, nson.String(v))
}
msg.Insert(conn.LABEL, nson.Array(array))
}

if s.To != nil && len(s.To) > 0 {
array := make(nson.Array, 0)
for _, v := range s.To {
array = append(array, v)
}
msg.Insert(conn.TO, nson.Array(array))
}

if s.Call {
msg.Insert(conn.SHARE, nson.Bool(true))
}

return msg
}

type RecvMessage struct {
Ch string
Body nson.Message
}

func (r *RecvMessage) CallId() (nson.MessageId, bool) {
if id, err := r.Body.GetMessageId(CALL_ID); err == nil {
return id, true
}

return nil, false
}

func (r *RecvMessage) FromId() (nson.MessageId, bool) {
if id, err := r.Body.GetMessageId(conn.FROM); err == nil {
return id, true
}

return nil, false
}

func (r *RecvMessage) Back() (*SendMessage, bool) {
callId, ok := r.CallId()
if !ok {
return nil, false
}

fromId, ok := r.FromId()
if !ok {
return nil, false
}

return &SendMessage{
Ch: r.Ch,
Body: nson.Message{
CALL_ID: callId,
conn.CODE: nson.I32(0),
},
To: []nson.MessageId{fromId},
}, true
}
22 changes: 5 additions & 17 deletions client/token.go
Expand Up @@ -53,39 +53,27 @@ func (b *baseToken) setError(e error) {
b.m.Unlock()
}

type BaseToken struct {
baseToken
}

func newBaseToken() *BaseToken {
return &BaseToken{
baseToken: baseToken{
complete: make(chan struct{}),
},
}
}

type CallToken struct {
type Token struct {
baseToken
msg nson.Message
}

func newCallToken() *CallToken {
return &CallToken{
func newToken() *Token {
return &Token{
baseToken: baseToken{
complete: make(chan struct{}),
},
}
}

func (c *CallToken) setMessage(msg nson.Message) {
func (c *Token) setMessage(msg nson.Message) {
c.m.Lock()
c.msg = msg
c.flowComplete()
c.m.Unlock()
}

func (c *CallToken) Message() nson.Message {
func (c *Token) Message() nson.Message {
c.m.Lock()
defer c.m.Unlock()
return c.msg
Expand Down
33 changes: 12 additions & 21 deletions cmd/recv/main.go
@@ -1,10 +1,8 @@
package main

import (
"fmt"
"log"

"github.com/danclive/nson-go"
"github.com/danclive/queen-go/client"
"github.com/danclive/queen-go/conn"
"github.com/danclive/queen-go/crypto"
Expand All @@ -20,38 +18,31 @@ func main() {
Debug: false,
}

client, err := client.NewClient(config)
c, err := client.NewClient(config)
if err != nil {
log.Fatalln(err)
}

_ = client

// go func() {
// time.Sleep(10 * time.Second)
// client.Close()
// }()

client.Recv("lala", nil, func(ch string, message nson.Message) {
fmt.Println(ch)
fmt.Println(message)
})
c.OnConnect(func() {
err = c.Attach("hello", nil, 0)
if err != nil {
log.Fatalln(err)
}

client.Llac("lala", nil, func(ch string, message nson.Message) nson.Message {
fmt.Println(ch)
fmt.Println(message)
for recv := range c.Recv() {
log.Println(recv)

return message
if back, ok := recv.Back(); ok {
c.Send(back, 0)
}
}
})

i := 0
for {
i++
fmt.Println(i)
err = client.Send("lala", nson.Message{"aaa": nson.String("bbb")}, nil, nil, 0)
log.Println(err)
}

var e = make(chan bool)
<-e
}
38 changes: 16 additions & 22 deletions cmd/send/main.go
Expand Up @@ -6,49 +6,43 @@ import (
"time"

"github.com/danclive/nson-go"

"github.com/danclive/queen-go/client"
"github.com/danclive/queen-go/conn"
"github.com/danclive/queen-go/crypto"
)

func main() {
config := conn.Config{
Addrs: []string{"danclive.com:8888"},
Addrs: []string{"snple.com:8888"},
EnableCrypto: true,
CryptoMethod: crypto.Aes128Gcm,
AccessKey: "fcbd6ea1e8c94dfc6b84405e",
SecretKey: "b14cd7bf94f0e3374e7fc4d4",
Debug: false,
}

client, err := client.NewClient(config)
c, err := client.NewClient(config)
if err != nil {
log.Fatalln(err)
}

_ = client
// go func() {
// time.Sleep(10 * time.Second)
// client.Close()
// }()

_ = c
for {
//time.Sleep(1 * time.Second)
time1 := time.Now()

err = client.Send("lala", nson.Message{"aaa": nson.String("bbb")}, nil, nil, time.Second*10)
log.Println(err)
msg := client.NewSendMessage("hello", nson.Message{"aaa": nson.String("bbb")}).WithCall(true)

msg, err := client.Call("lala", nson.Message{"aaa": nson.String("bbb")}, nil, nil, time.Second*10)
log.Println(err)
log.Println(msg)
log.Println(c.Send(msg, 0))

for {
now := time.Now()
msg, err := client.Call("ping", nson.Message{"aaa": nson.String("bbb")}, nil, nil, time.Second*10)
//log.Println(err)
//log.Println(msg)
_ = err
_ = msg
now2 := time.Now()

fmt.Println(now2.Sub(now))
time.Sleep(time.Second)
fmt.Println(time.Now().Sub(time1))
}

// var e = make(chan bool)
// <-e
var e = make(chan bool)
<-e
}
35 changes: 12 additions & 23 deletions conn/conn.go
Expand Up @@ -18,7 +18,7 @@ import (

type Config struct {
Addrs []string
ClientId nson.MessageId
SlotId nson.MessageId
EnableCrypto bool
CryptoMethod crypto.Method
AccessKey string
Expand All @@ -39,15 +39,16 @@ func (cfg *Config) init() {
cfg.Addrs = []string{"127.0.0.1:8888"}
}

if cfg.ClientId == nil {
cfg.ClientId = nson.NewMessageId()
if cfg.SlotId == nil {
cfg.SlotId = nson.NewMessageId()
}

if cfg.AuthMessage == nil {
cfg.AuthMessage = nson.Message{}
}

cfg.AuthMessage.Insert(CHAN, nson.String(AUTH))
cfg.AuthMessage.Insert(SLOT_ID, cfg.SlotId)

if cfg.EnableCrypto {
if cfg.CryptoMethod == crypto.None || cfg.AccessKey == "" || cfg.SecretKey == "" {
Expand Down Expand Up @@ -288,15 +289,17 @@ func (c *Conn) heartbeat() {
c.trace("heartbeat exit")
return
case <-time.After(c.config.HeartbeatInterval):
message := nson.Message{
CHAN: nson.String(PING),
}

c.SendMessage(message)

d := time.Now().Sub(c.LastRecvTime())
c.trace("heartbeat: %v", d)

if d > c.config.HeartbeatInterval {
message := nson.Message{
CHAN: nson.String(PING),
}

c.SendMessage(message)
}

if d > c.config.HeartbeatTimeout {
c.TryReconn()
}
Expand Down Expand Up @@ -494,20 +497,6 @@ func (c *Conn) SendMessage(msg nson.Message) (err error) {
c.writeMutex.Unlock()
}()

// base := c.base

// err = c._write(base, msg)
// if err == nil {
// c.writeCount += 1
// return
// }
// base.Close()

// go c.tryReconn(base)

// if c.waitReconn('w', c.writeWaitChan) {
// err = nil
// }
for {
if atomic.LoadInt32(&c.closed) != 0 {
return errors.New("conn closed")
Expand Down
9 changes: 9 additions & 0 deletions go.mod
@@ -0,0 +1,9 @@
module github.com/danclive/queen-go

go 1.15

require (
github.com/danclive/nson-go v0.3.0
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
)
21 changes: 21 additions & 0 deletions go.sum
@@ -0,0 +1,21 @@
github.com/danclive/nson-go v0.3.0 h1:skVgeJrbJsQk3HpVOAtZ71QMi+VQuhttgUVsDDMBDAE=
github.com/danclive/nson-go v0.3.0/go.mod h1:L1kLvlMsYMooindbfOBMV33U0S0chCfPyC5g2dimKAQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 comments on commit c44470e

Please sign in to comment.