Skip to content

Commit

Permalink
Fix message encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed May 15, 2018
1 parent e79b34c commit e388aee
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 20 deletions.
13 changes: 4 additions & 9 deletions azsqs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,6 @@ func (q *Queue) DeleteQueue() *memqueue.Queue {

// Add adds message to the queue.
func (q *Queue) Add(msg *msgqueue.Message) error {
_, err := internal.EncodeArgs(msg.Args, q.opt.Compress)
if err != nil {
return err
}

msg = msgutil.WrapMessage(msg)
return q.addQueue.Add(msg)
}
Expand Down Expand Up @@ -378,9 +373,9 @@ func (q *Queue) addBatch(msgs []*msgqueue.Message) error {
return err
}

body, err := internal.EncodeArgs(msg.Args, q.opt.Compress)
body, err := msg.EncodeBody(q.opt.Compress)
if err != nil {
internal.Logf("azsqs: EncodeArgs failed: %s", err)
internal.Logf("azsqs: EncodeBody failed: %s", err)
continue
}
if body == "" {
Expand Down Expand Up @@ -445,9 +440,9 @@ func (q *Queue) splitAddBatch(msgs []*msgqueue.Message) ([]*msgqueue.Message, []
continue
}

body, err := internal.EncodeArgs(msg.Args, q.opt.Compress)
body, err := msg.EncodeBody(q.opt.Compress)
if err != nil {
internal.Logf("azsqs: EncodeArgs failed: %s", err)
internal.Logf("azsqs: Message.EncodeBody failed: %s", err)
continue
}

Expand Down
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (h *reflectFunc) HandleMessage(msg *Message) error {
var compress bool
if body == "" {
var err error
body, err = internal.EncodeArgs(msg.Args, false)
body, err = msg.EncodeBody(false)
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions ironmq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,6 @@ func (q *Queue) createQueue() error {

// Add adds message to the queue.
func (q *Queue) Add(msg *msgqueue.Message) error {
_, err := internal.EncodeArgs(msg.Args, q.opt.Compress)
if err != nil {
return err
}

msg = msgutil.WrapMessage(msg)
return q.addQueue.Add(msg)
}
Expand Down Expand Up @@ -265,7 +260,7 @@ func (q *Queue) add(msg *msgqueue.Message) error {
return err
}

body, err := internal.EncodeArgs(msg.Args, q.opt.Compress)
body, err := msg.EncodeBody(q.opt.Compress)
if err != nil {
return err
}
Expand Down
21 changes: 17 additions & 4 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"hash/fnv"
"time"

"github.com/go-msgqueue/msgqueue/internal"
"github.com/vmihailenco/msgpack"
)

Expand Down Expand Up @@ -47,10 +48,8 @@ func NewMessage(args ...interface{}) *Message {
}

func (m *Message) String() string {
return fmt.Sprintf(
"Message<Id=%q Name=%q ReservedCount=%d>",
m.Id, m.Name, m.ReservedCount,
)
return fmt.Sprintf("Message<Id=%q Name=%q ReservedCount=%d>",
m.Id, m.Name, m.ReservedCount)
}

// SetDelayName sets delay and generates message name from the args.
Expand All @@ -60,6 +59,20 @@ func (m *Message) SetDelayName(delay time.Duration, args ...interface{}) {
m.Delay = delay
}

func (m *Message) EncodeBody(compress bool) (string, error) {
if m.Body != "" {
return m.Body, nil
}

s, err := internal.EncodeArgs(m.Args, compress)
if err != nil {
return "", err
}

m.Body = s
return s, nil
}

func timeSlot(period time.Duration) int64 {
if period <= 0 {
return 0
Expand Down

0 comments on commit e388aee

Please sign in to comment.