Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/pn 310 ttl for messages #7

Merged
merged 16 commits into from
Mar 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"
)

var aNormalMessage = `/foo/bar,42,user01,phone01,{},1420110000,0
var aNormalMessage = `/foo/bar,42,user01,phone01,{},,1420110000,0

Hello World`

Expand Down Expand Up @@ -184,7 +184,7 @@ func TestReceiveAMessage(t *testing.T) {
// than we receive the expected message
select {
case m := <-c.Messages():
a.Equal(aNormalMessage, string(m.Bytes()))
a.Equal(aNormalMessage, string(m.Encode()))
case <-time.After(time.Millisecond * 10):
a.Fail("timeout while waiting for message")
}
Expand Down
4 changes: 2 additions & 2 deletions guble-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func readLoop(client client.Client) {
select {
case incomingMessage := <-client.Messages():
if *verbose {
fmt.Println(string(incomingMessage.Bytes()))
fmt.Println(string(incomingMessage.Encode()))
} else {
fmt.Printf("%v: %v\n", incomingMessage.UserID, incomingMessage.BodyAsString())
fmt.Printf("%v: %v\n", incomingMessage.UserID, string(incomingMessage.Body))
}
case e := <-client.Errors():
fmt.Println("ERROR: " + string(e.Bytes()))
Expand Down
209 changes: 85 additions & 124 deletions protocol/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

log "github.com/Sirupsen/logrus"
)
Expand All @@ -30,6 +31,13 @@ type Message struct {
// routes that match the filters
Filters map[string]string

// Expires field specifies until when the message is valid to be processed
// If this field is set and the message is expired the connectors should
// consider the message as processed and log the action
//
// RFC3339 format
Expires *time.Time
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you using RFC3339 for Expires, but a unix timestamp for Time below?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We require the message to also contain timezone information.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for me asking, but why are you requiring the timezone information? Wouldn't it suffice to know the exact time the message expires? Which is given by a unix timestamp (since it is in UTC).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might be correct, but we had some issues with timezone information checking on other project and would like for the producer to be able to fully control without any uncertainty.


// The time of publishing, as Unix Timestamp date
Time int64

Expand All @@ -46,138 +54,105 @@ type Message struct {
type MessageDeliveryCallback func(*Message)

// Metadata returns the first line of a serialized message, without the newline
func (msg *Message) Metadata() string {
func (m *Message) Metadata() string {
buff := &bytes.Buffer{}
msg.writeMetadata(buff)
m.writeMetadata(buff)
return string(buff.Bytes())
}

func (msg *Message) String() string {
return fmt.Sprintf("%d", msg.ID)
}

func (msg *Message) BodyAsString() string {
return string(msg.Body)
func (m *Message) String() string {
return fmt.Sprintf("%d: %s", m.ID, string(m.Body))
}

// Bytes serializes the message into a byte slice
func (msg *Message) Bytes() []byte {
// Encode serializes the message into a byte slice
func (m *Message) Encode() []byte {
buff := &bytes.Buffer{}

msg.writeMetadata(buff)
m.writeMetadata(buff)

if len(msg.HeaderJSON) > 0 || len(msg.Body) > 0 {
if len(m.HeaderJSON) > 0 || len(m.Body) > 0 {
buff.WriteString("\n")
}

if len(msg.HeaderJSON) > 0 {
buff.WriteString(msg.HeaderJSON)
if len(m.HeaderJSON) > 0 {
buff.WriteString(m.HeaderJSON)
}

if len(msg.Body) > 0 {
if len(m.Body) > 0 {
buff.WriteString("\n")
buff.Write(msg.Body)
buff.Write(m.Body)
}

return buff.Bytes()
}

func (msg *Message) writeMetadata(buff *bytes.Buffer) {
buff.WriteString(string(msg.Path))
buff.WriteString(",")
buff.WriteString(strconv.FormatUint(msg.ID, 10))
buff.WriteString(",")
buff.WriteString(msg.UserID)
buff.WriteString(",")
buff.WriteString(msg.ApplicationID)
buff.WriteString(",")
buff.Write(msg.encodeFilters())
buff.WriteString(",")
buff.WriteString(strconv.FormatInt(msg.Time, 10))
buff.WriteString(",")
buff.WriteString(strconv.FormatUint(uint64(msg.NodeID), 10))
func (m *Message) writeMetadata(buff *bytes.Buffer) {
buff.WriteString(string(m.Path))
buff.WriteByte(',')

buff.WriteString(strconv.FormatUint(m.ID, 10))
buff.WriteByte(',')

buff.WriteString(m.UserID)
buff.WriteByte(',')

buff.WriteString(m.ApplicationID)
buff.WriteByte(',')

buff.Write(m.encodeFilters())
buff.WriteByte(',')

if m.Expires != nil {
buff.WriteString(m.Expires.Format(time.RFC3339))
}
buff.WriteByte(',')

buff.WriteString(strconv.FormatInt(m.Time, 10))
buff.WriteByte(',')

buff.WriteString(strconv.FormatUint(uint64(m.NodeID), 10))
}

func (msg *Message) encodeFilters() []byte {
if msg.Filters == nil {
func (m *Message) encodeFilters() []byte {
if m.Filters == nil {
return []byte{}
}
data, err := json.Marshal(msg.Filters)
data, err := json.Marshal(m.Filters)
if err != nil {
log.WithError(err).WithField("filters", msg.Filters).Error("Error encoding filters")
log.WithError(err).WithField("filters", m.Filters).Error("Error encoding filters")
return []byte{}
}
return data
}

func (msg *Message) decodeFilters(data []byte) {
func (m *Message) decodeFilters(data []byte) {
if len(data) == 0 {
return
}
msg.Filters = make(map[string]string)
err := json.Unmarshal(data, &msg.Filters)
m.Filters = make(map[string]string)
err := json.Unmarshal(data, &m.Filters)
if err != nil {
log.WithError(err).WithField("data", string(data)).Error("Error decoding filters")
}
}

func (msg *Message) SetFilter(key, value string) {
if msg.Filters == nil {
msg.Filters = make(map[string]string, 1)
}
msg.Filters[key] = value
}

// Valid constants for the NotificationMessage.Name
const (
SUCCESS_CONNECTED = "connected"
SUCCESS_SEND = "send"
SUCCESS_FETCH_START = "fetch-start"
SUCCESS_FETCH_END = "fetch-end"
SUCCESS_SUBSCRIBED_TO = "subscribed-to"
SUCCESS_CANCELED = "canceled"
ERROR_SUBSCRIBED_TO = "error-subscribed-to"
ERROR_BAD_REQUEST = "error-bad-request"
ERROR_INTERNAL_SERVER = "error-server-internal"
)

// NotificationMessage is a representation of a status messages or error message, sent from the server
type NotificationMessage struct {

// The name of the message
Name string

// The argument line, following the messageName
Arg string

// The optional json data supplied with the message
Json string

// Flag which indicates, if the notification is an error
IsError bool
}

// Bytes serializes the notification message into a byte slice
func (msg *NotificationMessage) Bytes() []byte {
buff := &bytes.Buffer{}

if msg.IsError {
buff.WriteString("!")
} else {
buff.WriteString("#")
}
buff.WriteString(msg.Name)
if len(msg.Arg) > 0 {
buff.WriteString(" ")
buff.WriteString(msg.Arg)
func (m *Message) SetFilter(key, value string) {
if m.Filters == nil {
m.Filters = make(map[string]string, 1)
}
m.Filters[key] = value
}

if len(msg.Json) > 0 {
buff.WriteString("\n")
buff.WriteString(msg.Json)
// IsExpired returns true if the message `Expires` field is set and the current time
// has pasted the `Expires` time
//
// Checks are made using `Expires` field timezone
func (m *Message) IsExpired() bool {
if m.Expires == nil {
return true
}

return buff.Bytes()
return m.Expires != nil && m.Expires.Before(time.Now().In(m.Expires.Location()))
}

// Decode decodes a message, sent from the server to the client.
Expand All @@ -197,7 +172,7 @@ func ParseMessage(message []byte) (*Message, error) {

meta := strings.Split(parts[0], ",")

if len(meta) != 7 {
if len(meta) != 8 {
return nil, fmt.Errorf("message metadata has to have 7 fields, but was %v", parts[0])
}

Expand All @@ -210,57 +185,43 @@ func ParseMessage(message []byte) (*Message, error) {
return nil, fmt.Errorf("message metadata to have an integer (message-id) as second field, but was %v", meta[1])
}

publishingTime, err := strconv.ParseInt(meta[5], 10, 64)
var expiresTime *time.Time
if meta[5] != "" {
if t, err := time.Parse(time.RFC3339, meta[5]); err != nil {
return nil, fmt.Errorf("message metadata expected to have a time string (expiration time) as sixth field, but was %v: %s", meta[5], err.Error())
} else {
expiresTime = &t
}
}

publishingTime, err := strconv.ParseInt(meta[6], 10, 64)
if err != nil {
return nil, fmt.Errorf("message metadata to have an integer (publishing time) as sixth field, but was %v", meta[5])
return nil, fmt.Errorf("message metadata to have an integer (publishing time) as seventh field, but was %v:", meta[6])
}

nodeID, err := strconv.ParseUint(meta[6], 10, 8)
nodeID, err := strconv.ParseUint(meta[7], 10, 8)
if err != nil {
return nil, fmt.Errorf("message metadata to have an integer (nodeID) as seventh field, but was %v", meta[6])
return nil, fmt.Errorf("message metadata to have an integer (nodeID) as eighth field, but was %v", meta[7])
}

msg := &Message{
m := &Message{
ID: id,
Path: Path(meta[0]),
UserID: meta[2],
ApplicationID: meta[3],
Expires: expiresTime,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field should be ExpirationTime not Expires.

Time: publishingTime,
NodeID: uint8(nodeID),
}
msg.decodeFilters([]byte(meta[4]))
m.decodeFilters([]byte(meta[4]))

if len(parts) >= 2 {
msg.HeaderJSON = parts[1]
m.HeaderJSON = parts[1]
}

if len(parts) == 3 {
msg.Body = []byte(parts[2])
}

return msg, nil
}

func parseNotificationMessage(message []byte) (*NotificationMessage, error) {
msg := &NotificationMessage{}

if len(message) < 2 || (message[0] != '#' && message[0] != '!') {
return nil, fmt.Errorf("message has to start with '#' or '!' and a name, but got '%v'", message)
}
msg.IsError = message[0] == '!'

parts := strings.SplitN(string(message)[1:], "\n", 2)
firstLine := strings.SplitN(parts[0], " ", 2)

msg.Name = firstLine[0]

if len(firstLine) > 1 {
msg.Arg = firstLine[1]
}

if len(parts) > 1 {
msg.Json = parts[1]
m.Body = []byte(parts[2])
}

return msg, nil
return m, nil
}
Loading