Skip to content

Commit

Permalink
Merge branch 'master' into feature/pn-301-correlation-id
Browse files Browse the repository at this point in the history
  • Loading branch information
cosminrentea committed Mar 21, 2017
2 parents 65524a8 + a74bed7 commit 3688c63
Show file tree
Hide file tree
Showing 53 changed files with 535 additions and 955 deletions.
4 changes: 2 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"
)

var aNormalMessage = `/foo/bar,42,user01,phone01,{},1420110000,0
var aNormalMessage = `/foo/bar,42,user01,phone01,{},,1420110000,0
Hello World`

Expand Down Expand Up @@ -184,7 +184,7 @@ func TestReceiveAMessage(t *testing.T) {
// than we receive the expected message
select {
case m := <-c.Messages():
a.Equal(aNormalMessage, string(m.Bytes()))
a.Equal(aNormalMessage, string(m.Encode()))
case <-time.After(time.Millisecond * 10):
a.Fail("timeout while waiting for message")
}
Expand Down
4 changes: 2 additions & 2 deletions guble-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func readLoop(client client.Client) {
select {
case incomingMessage := <-client.Messages():
if *verbose {
fmt.Println(string(incomingMessage.Bytes()))
fmt.Println(string(incomingMessage.Encode()))
} else {
fmt.Printf("%v: %v\n", incomingMessage.UserID, incomingMessage.BodyAsString())
fmt.Printf("%v: %v\n", incomingMessage.UserID, string(incomingMessage.Body))
}
case e := <-client.Errors():
fmt.Println("ERROR: " + string(e.Bytes()))
Expand Down
209 changes: 85 additions & 124 deletions protocol/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

log "github.com/Sirupsen/logrus"
)
Expand All @@ -30,6 +31,13 @@ type Message struct {
// routes that match the filters
Filters map[string]string

// Expires field specifies until when the message is valid to be processed
// If this field is set and the message is expired the connectors should
// consider the message as processed and log the action
//
// RFC3339 format
Expires *time.Time

// The time of publishing, as Unix Timestamp date
Time int64

Expand All @@ -46,138 +54,105 @@ type Message struct {
type MessageDeliveryCallback func(*Message)

// Metadata returns the first line of a serialized message, without the newline
func (msg *Message) Metadata() string {
func (m *Message) Metadata() string {
buff := &bytes.Buffer{}
msg.writeMetadata(buff)
m.writeMetadata(buff)
return string(buff.Bytes())
}

func (msg *Message) String() string {
return fmt.Sprintf("%d", msg.ID)
}

func (msg *Message) BodyAsString() string {
return string(msg.Body)
func (m *Message) String() string {
return fmt.Sprintf("%d: %s", m.ID, string(m.Body))
}

// Bytes serializes the message into a byte slice
func (msg *Message) Bytes() []byte {
// Encode serializes the message into a byte slice
func (m *Message) Encode() []byte {
buff := &bytes.Buffer{}

msg.writeMetadata(buff)
m.writeMetadata(buff)

if len(msg.HeaderJSON) > 0 || len(msg.Body) > 0 {
if len(m.HeaderJSON) > 0 || len(m.Body) > 0 {
buff.WriteString("\n")
}

if len(msg.HeaderJSON) > 0 {
buff.WriteString(msg.HeaderJSON)
if len(m.HeaderJSON) > 0 {
buff.WriteString(m.HeaderJSON)
}

if len(msg.Body) > 0 {
if len(m.Body) > 0 {
buff.WriteString("\n")
buff.Write(msg.Body)
buff.Write(m.Body)
}

return buff.Bytes()
}

func (msg *Message) writeMetadata(buff *bytes.Buffer) {
buff.WriteString(string(msg.Path))
buff.WriteString(",")
buff.WriteString(strconv.FormatUint(msg.ID, 10))
buff.WriteString(",")
buff.WriteString(msg.UserID)
buff.WriteString(",")
buff.WriteString(msg.ApplicationID)
buff.WriteString(",")
buff.Write(msg.encodeFilters())
buff.WriteString(",")
buff.WriteString(strconv.FormatInt(msg.Time, 10))
buff.WriteString(",")
buff.WriteString(strconv.FormatUint(uint64(msg.NodeID), 10))
func (m *Message) writeMetadata(buff *bytes.Buffer) {
buff.WriteString(string(m.Path))
buff.WriteByte(',')

buff.WriteString(strconv.FormatUint(m.ID, 10))
buff.WriteByte(',')

buff.WriteString(m.UserID)
buff.WriteByte(',')

buff.WriteString(m.ApplicationID)
buff.WriteByte(',')

buff.Write(m.encodeFilters())
buff.WriteByte(',')

if m.Expires != nil {
buff.WriteString(m.Expires.Format(time.RFC3339))
}
buff.WriteByte(',')

buff.WriteString(strconv.FormatInt(m.Time, 10))
buff.WriteByte(',')

buff.WriteString(strconv.FormatUint(uint64(m.NodeID), 10))
}

func (msg *Message) encodeFilters() []byte {
if msg.Filters == nil {
func (m *Message) encodeFilters() []byte {
if m.Filters == nil {
return []byte{}
}
data, err := json.Marshal(msg.Filters)
data, err := json.Marshal(m.Filters)
if err != nil {
log.WithError(err).WithField("filters", msg.Filters).Error("Error encoding filters")
log.WithError(err).WithField("filters", m.Filters).Error("Error encoding filters")
return []byte{}
}
return data
}

func (msg *Message) decodeFilters(data []byte) {
func (m *Message) decodeFilters(data []byte) {
if len(data) == 0 {
return
}
msg.Filters = make(map[string]string)
err := json.Unmarshal(data, &msg.Filters)
m.Filters = make(map[string]string)
err := json.Unmarshal(data, &m.Filters)
if err != nil {
log.WithError(err).WithField("data", string(data)).Error("Error decoding filters")
}
}

func (msg *Message) SetFilter(key, value string) {
if msg.Filters == nil {
msg.Filters = make(map[string]string, 1)
}
msg.Filters[key] = value
}

// Valid constants for the NotificationMessage.Name
const (
SUCCESS_CONNECTED = "connected"
SUCCESS_SEND = "send"
SUCCESS_FETCH_START = "fetch-start"
SUCCESS_FETCH_END = "fetch-end"
SUCCESS_SUBSCRIBED_TO = "subscribed-to"
SUCCESS_CANCELED = "canceled"
ERROR_SUBSCRIBED_TO = "error-subscribed-to"
ERROR_BAD_REQUEST = "error-bad-request"
ERROR_INTERNAL_SERVER = "error-server-internal"
)

// NotificationMessage is a representation of a status messages or error message, sent from the server
type NotificationMessage struct {

// The name of the message
Name string

// The argument line, following the messageName
Arg string

// The optional json data supplied with the message
Json string

// Flag which indicates, if the notification is an error
IsError bool
}

// Bytes serializes the notification message into a byte slice
func (msg *NotificationMessage) Bytes() []byte {
buff := &bytes.Buffer{}

if msg.IsError {
buff.WriteString("!")
} else {
buff.WriteString("#")
}
buff.WriteString(msg.Name)
if len(msg.Arg) > 0 {
buff.WriteString(" ")
buff.WriteString(msg.Arg)
func (m *Message) SetFilter(key, value string) {
if m.Filters == nil {
m.Filters = make(map[string]string, 1)
}
m.Filters[key] = value
}

if len(msg.Json) > 0 {
buff.WriteString("\n")
buff.WriteString(msg.Json)
// IsExpired returns true if the message `Expires` field is set and the current time
// has pasted the `Expires` time
//
// Checks are made using `Expires` field timezone
func (m *Message) IsExpired() bool {
if m.Expires == nil {
return true
}

return buff.Bytes()
return m.Expires != nil && m.Expires.Before(time.Now().In(m.Expires.Location()))
}

// Decode decodes a message, sent from the server to the client.
Expand All @@ -197,7 +172,7 @@ func ParseMessage(message []byte) (*Message, error) {

meta := strings.Split(parts[0], ",")

if len(meta) != 7 {
if len(meta) != 8 {
return nil, fmt.Errorf("message metadata has to have 7 fields, but was %v", parts[0])
}

Expand All @@ -210,57 +185,43 @@ func ParseMessage(message []byte) (*Message, error) {
return nil, fmt.Errorf("message metadata to have an integer (message-id) as second field, but was %v", meta[1])
}

publishingTime, err := strconv.ParseInt(meta[5], 10, 64)
var expiresTime *time.Time
if meta[5] != "" {
if t, err := time.Parse(time.RFC3339, meta[5]); err != nil {
return nil, fmt.Errorf("message metadata expected to have a time string (expiration time) as sixth field, but was %v: %s", meta[5], err.Error())
} else {
expiresTime = &t
}
}

publishingTime, err := strconv.ParseInt(meta[6], 10, 64)
if err != nil {
return nil, fmt.Errorf("message metadata to have an integer (publishing time) as sixth field, but was %v", meta[5])
return nil, fmt.Errorf("message metadata to have an integer (publishing time) as seventh field, but was %v:", meta[6])
}

nodeID, err := strconv.ParseUint(meta[6], 10, 8)
nodeID, err := strconv.ParseUint(meta[7], 10, 8)
if err != nil {
return nil, fmt.Errorf("message metadata to have an integer (nodeID) as seventh field, but was %v", meta[6])
return nil, fmt.Errorf("message metadata to have an integer (nodeID) as eighth field, but was %v", meta[7])
}

msg := &Message{
m := &Message{
ID: id,
Path: Path(meta[0]),
UserID: meta[2],
ApplicationID: meta[3],
Expires: expiresTime,
Time: publishingTime,
NodeID: uint8(nodeID),
}
msg.decodeFilters([]byte(meta[4]))
m.decodeFilters([]byte(meta[4]))

if len(parts) >= 2 {
msg.HeaderJSON = parts[1]
m.HeaderJSON = parts[1]
}

if len(parts) == 3 {
msg.Body = []byte(parts[2])
}

return msg, nil
}

func parseNotificationMessage(message []byte) (*NotificationMessage, error) {
msg := &NotificationMessage{}

if len(message) < 2 || (message[0] != '#' && message[0] != '!') {
return nil, fmt.Errorf("message has to start with '#' or '!' and a name, but got '%v'", message)
}
msg.IsError = message[0] == '!'

parts := strings.SplitN(string(message)[1:], "\n", 2)
firstLine := strings.SplitN(parts[0], " ", 2)

msg.Name = firstLine[0]

if len(firstLine) > 1 {
msg.Arg = firstLine[1]
}

if len(parts) > 1 {
msg.Json = parts[1]
m.Body = []byte(parts[2])
}

return msg, nil
return m, nil
}
Loading

0 comments on commit 3688c63

Please sign in to comment.