From 46737b838d749a20a2307f22e5eafce6115467ea Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Thu, 16 Mar 2017 11:24:28 +0200 Subject: [PATCH 01/11] Add Expires field --- protocol/message.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/protocol/message.go b/protocol/message.go index 38dbb6df..3652643a 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "time" log "github.com/Sirupsen/logrus" ) @@ -30,6 +31,13 @@ type Message struct { // routes that match the filters Filters map[string]string + // Expires field specifies until when the message is valid to be processed + // If this field is set and the message is expired the connectors should + // consider the message as processed and log the action + // + // RFC3339 format + Expires *time.Time + // The time of publishing, as Unix Timestamp date Time int64 From aaf180223bb428bc8e913ebc32a8ad338e7f3dfa Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 11:06:34 +0200 Subject: [PATCH 02/11] Split notification message struct in separate file --- protocol/message.go | 76 ------------------------ protocol/message_test.go | 41 ------------- protocol/notification_message.go | 83 +++++++++++++++++++++++++++ protocol/notification_message_test.go | 48 ++++++++++++++++ 4 files changed, 131 insertions(+), 117 deletions(-) create mode 100644 protocol/notification_message.go create mode 100644 protocol/notification_message_test.go diff --git a/protocol/message.go b/protocol/message.go index 3652643a..3b56a1e7 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -136,58 +136,6 @@ func (msg *Message) SetFilter(key, value string) { msg.Filters[key] = value } -// Valid constants for the NotificationMessage.Name -const ( - SUCCESS_CONNECTED = "connected" - SUCCESS_SEND = "send" - SUCCESS_FETCH_START = "fetch-start" - SUCCESS_FETCH_END = "fetch-end" - SUCCESS_SUBSCRIBED_TO = "subscribed-to" - SUCCESS_CANCELED = "canceled" - ERROR_SUBSCRIBED_TO = "error-subscribed-to" - ERROR_BAD_REQUEST = "error-bad-request" - ERROR_INTERNAL_SERVER = "error-server-internal" -) - -// NotificationMessage is a representation of a status messages or error message, sent from the server -type NotificationMessage struct { - - // The name of the message - Name string - - // The argument line, following the messageName - Arg string - - // The optional json data supplied with the message - Json string - - // Flag which indicates, if the notification is an error - IsError bool -} - -// Bytes serializes the notification message into a byte slice -func (msg *NotificationMessage) Bytes() []byte { - buff := &bytes.Buffer{} - - if msg.IsError { - buff.WriteString("!") - } else { - buff.WriteString("#") - } - buff.WriteString(msg.Name) - if len(msg.Arg) > 0 { - buff.WriteString(" ") - buff.WriteString(msg.Arg) - } - - if len(msg.Json) > 0 { - buff.WriteString("\n") - buff.WriteString(msg.Json) - } - - return buff.Bytes() -} - // Decode decodes a message, sent from the server to the client. // The decoded messages can have one of the types: *Message or *NotificationMessage func Decode(message []byte) (interface{}, error) { @@ -248,27 +196,3 @@ func ParseMessage(message []byte) (*Message, error) { return msg, nil } - -func parseNotificationMessage(message []byte) (*NotificationMessage, error) { - msg := &NotificationMessage{} - - if len(message) < 2 || (message[0] != '#' && message[0] != '!') { - return nil, fmt.Errorf("message has to start with '#' or '!' and a name, but got '%v'", message) - } - msg.IsError = message[0] == '!' - - parts := strings.SplitN(string(message)[1:], "\n", 2) - firstLine := strings.SplitN(parts[0], " ", 2) - - msg.Name = firstLine[0] - - if len(firstLine) > 1 { - msg.Arg = firstLine[1] - } - - if len(parts) > 1 { - msg.Json = parts[1] - } - - return msg, nil -} diff --git a/protocol/message_test.go b/protocol/message_test.go index c5900aed..1b6556cb 100644 --- a/protocol/message_test.go +++ b/protocol/message_test.go @@ -126,31 +126,6 @@ func TestErrorsOnParsingMessages(t *testing.T) { assert.Error(err) } -func TestParsingNotificationMessage(t *testing.T) { - assert := assert.New(t) - - msgI, err := Decode([]byte(aConnectedNotification)) - assert.NoError(err) - assert.IsType(&NotificationMessage{}, msgI) - msg := msgI.(*NotificationMessage) - - assert.Equal(SUCCESS_CONNECTED, msg.Name) - assert.Equal("You are connected to the server.", msg.Arg) - assert.Equal(`{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, msg.Json) - assert.Equal(false, msg.IsError) -} - -func TestSerializeANotificationMessage(t *testing.T) { - msg := &NotificationMessage{ - Name: SUCCESS_CONNECTED, - Arg: "You are connected to the server.", - Json: `{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, - IsError: false, - } - - assert.Equal(t, aConnectedNotification, string(msg.Bytes())) -} - func TestSerializeAnErrorMessage(t *testing.T) { msg := &NotificationMessage{ Name: ERROR_BAD_REQUEST, @@ -171,22 +146,6 @@ func TestSerializeANotificationMessageWithEmptyArg(t *testing.T) { assert.Equal(t, "#"+SUCCESS_SEND, string(msg.Bytes())) } -func TestParsingErrorNotificationMessage(t *testing.T) { - assert := assert.New(t) - - raw := "!bad-request unknown command 'sdcsd'" - - msgI, err := Decode([]byte(raw)) - assert.NoError(err) - assert.IsType(&NotificationMessage{}, msgI) - msg := msgI.(*NotificationMessage) - - assert.Equal("bad-request", msg.Name) - assert.Equal("unknown command 'sdcsd'", msg.Arg) - assert.Equal("", msg.Json) - assert.Equal(true, msg.IsError) -} - func Test_Message_getPartitionFromTopic(t *testing.T) { a := assert.New(t) a.Equal("foo", Path("/foo/bar/bazz").Partition()) diff --git a/protocol/notification_message.go b/protocol/notification_message.go new file mode 100644 index 00000000..a5342b34 --- /dev/null +++ b/protocol/notification_message.go @@ -0,0 +1,83 @@ +package protocol + +import ( + "bytes" + "fmt" + "strings" +) + +// Valid constants for the NotificationMessage.Name +const ( + SUCCESS_CONNECTED = "connected" + SUCCESS_SEND = "send" + SUCCESS_FETCH_START = "fetch-start" + SUCCESS_FETCH_END = "fetch-end" + SUCCESS_SUBSCRIBED_TO = "subscribed-to" + SUCCESS_CANCELED = "canceled" + ERROR_SUBSCRIBED_TO = "error-subscribed-to" + ERROR_BAD_REQUEST = "error-bad-request" + ERROR_INTERNAL_SERVER = "error-server-internal" +) + +// NotificationMessage is a representation of a status messages or error message, sent from the server +type NotificationMessage struct { + + // The name of the message + Name string + + // The argument line, following the messageName + Arg string + + // The optional json data supplied with the message + Json string + + // Flag which indicates, if the notification is an error + IsError bool +} + +// Bytes serializes the notification message into a byte slice +func (msg *NotificationMessage) Bytes() []byte { + buff := &bytes.Buffer{} + + if msg.IsError { + buff.WriteString("!") + } else { + buff.WriteString("#") + } + buff.WriteString(msg.Name) + if len(msg.Arg) > 0 { + buff.WriteString(" ") + buff.WriteString(msg.Arg) + } + + if len(msg.Json) > 0 { + buff.WriteString("\n") + buff.WriteString(msg.Json) + } + + return buff.Bytes() +} + +func parseNotificationMessage(message []byte) (*NotificationMessage, error) { + msg := &NotificationMessage{} + + if len(message) < 2 || (message[0] != '#' && message[0] != '!') { + return nil, fmt.Errorf("message has to start with '#' or '!' and a name, but got '%v'", message) + } + msg.IsError = message[0] == '!' + + parts := strings.SplitN(string(message)[1:], "\n", 2) + firstLine := strings.SplitN(parts[0], " ", 2) + + msg.Name = firstLine[0] + + if len(firstLine) > 1 { + msg.Arg = firstLine[1] + } + + if len(parts) > 1 { + msg.Json = parts[1] + } + + return msg, nil +} diff --git a/protocol/notification_message_test.go b/protocol/notification_message_test.go new file mode 100644 index 00000000..9eb53505 --- /dev/null +++ b/protocol/notification_message_test.go @@ -0,0 +1,48 @@ +package protocol + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParsingNotificationMessage(t *testing.T) { + assert := assert.New(t) + + msgI, err := Decode([]byte(aConnectedNotification)) + assert.NoError(err) + assert.IsType(&NotificationMessage{}, msgI) + msg := msgI.(*NotificationMessage) + + assert.Equal(SUCCESS_CONNECTED, msg.Name) + assert.Equal("You are connected to the server.", msg.Arg) + assert.Equal(`{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, msg.Json) + assert.Equal(false, msg.IsError) +} + +func TestSerializeANotificationMessage(t *testing.T) { + msg := &NotificationMessage{ + Name: SUCCESS_CONNECTED, + Arg: "You are connected to the server.", + Json: `{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, + IsError: false, + } + + assert.Equal(t, aConnectedNotification, string(msg.Bytes())) +} + +func TestParsingErrorNotificationMessage(t *testing.T) { + assert := assert.New(t) + + raw := "!bad-request unknown command 'sdcsd'" + + msgI, err := Decode([]byte(raw)) + assert.NoError(err) + assert.IsType(&NotificationMessage{}, msgI) + msg := msgI.(*NotificationMessage) + + assert.Equal("bad-request", msg.Name) + assert.Equal("unknown command 'sdcsd'", msg.Arg) + assert.Equal("", msg.Json) + assert.Equal(true, msg.IsError) +} From ec1c5d1cc5404d6f8a055678fe08d138442f492c Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 11:31:38 +0200 Subject: [PATCH 03/11] Expires field with tests decoding and serializing --- protocol/message.go | 43 ++++++++++++++----- protocol/message_test.go | 62 ++++++++++++++++----------- protocol/notification_message_test.go | 20 +++++++++ 3 files changed, 89 insertions(+), 36 deletions(-) diff --git a/protocol/message.go b/protocol/message.go index 3b56a1e7..a23b3c53 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -92,17 +92,28 @@ func (msg *Message) Bytes() []byte { func (msg *Message) writeMetadata(buff *bytes.Buffer) { buff.WriteString(string(msg.Path)) - buff.WriteString(",") + buff.WriteByte(',') + buff.WriteString(strconv.FormatUint(msg.ID, 10)) - buff.WriteString(",") + buff.WriteByte(',') + buff.WriteString(msg.UserID) - buff.WriteString(",") + buff.WriteByte(',') + buff.WriteString(msg.ApplicationID) - buff.WriteString(",") + buff.WriteByte(',') + buff.Write(msg.encodeFilters()) - buff.WriteString(",") + buff.WriteByte(',') + + if msg.Expires != nil { + buff.WriteString(msg.Expires.Format(time.RFC3339)) + } + buff.WriteByte(',') + buff.WriteString(strconv.FormatInt(msg.Time, 10)) - buff.WriteString(",") + buff.WriteByte(',') + buff.WriteString(strconv.FormatUint(uint64(msg.NodeID), 10)) } @@ -153,7 +164,7 @@ func ParseMessage(message []byte) (*Message, error) { meta := strings.Split(parts[0], ",") - if len(meta) != 7 { + if len(meta) != 8 { return nil, fmt.Errorf("message metadata has to have 7 fields, but was %v", parts[0]) } @@ -166,14 +177,23 @@ func ParseMessage(message []byte) (*Message, error) { return nil, fmt.Errorf("message metadata to have an integer (message-id) as second field, but was %v", meta[1]) } - publishingTime, err := strconv.ParseInt(meta[5], 10, 64) + var expiresTime *time.Time + if meta[5] != "" { + if t, err := time.Parse(time.RFC3339, meta[5]); err != nil { + return nil, fmt.Errorf("message metadata expected to have a time string (expiration time) as sixth field, but was %v: %s", meta[5], err.Error()) + } else { + expiresTime = &t + } + } + + publishingTime, err := strconv.ParseInt(meta[6], 10, 64) if err != nil { - return nil, fmt.Errorf("message metadata to have an integer (publishing time) as sixth field, but was %v", meta[5]) + return nil, fmt.Errorf("message metadata to have an integer (publishing time) as seventh field, but was %v:", meta[6]) } - nodeID, err := strconv.ParseUint(meta[6], 10, 8) + nodeID, err := strconv.ParseUint(meta[7], 10, 8) if err != nil { - return nil, fmt.Errorf("message metadata to have an integer (nodeID) as seventh field, but was %v", meta[6]) + return nil, fmt.Errorf("message metadata to have an integer (nodeID) as eighth field, but was %v", meta[7]) } msg := &Message{ @@ -181,6 +201,7 @@ func ParseMessage(message []byte) (*Message, error) { Path: Path(meta[0]), UserID: meta[2], ApplicationID: meta[3], + Expires: expiresTime, Time: publishingTime, NodeID: uint8(nodeID), } diff --git a/protocol/message_test.go b/protocol/message_test.go index 1b6556cb..dc999644 100644 --- a/protocol/message_test.go +++ b/protocol/message_test.go @@ -9,17 +9,23 @@ import ( "github.com/stretchr/testify/assert" ) -var aNormalMessage = `/foo/bar,42,user01,phone01,{"user":"user01"},1420110000,1 +var ( + aNormalMessage = `/foo/bar,42,user01,phone01,{"user":"user01"},2017-03-17T20:04:26+02:00,1420110000,1 {"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"} Hello World` -var aMinimalMessage = "/,42,,,,1420110000,0" + aNormalMessageNoExpires = `/foo/bar,42,user01,phone01,{"user":"user01"},,1420110000,1 +{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"} +Hello World` -var aConnectedNotification = `#connected You are connected to the server. + aMinimalMessage = "/,42,,,,,1420110000,0" + + aConnectedNotification = `#connected You are connected to the server. {"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}` -// 2015-01-01T12:00:00+01:00 is equal to 1420110000 -var unixTime, _ = time.Parse(time.RFC3339, "2015-01-01T12:00:00+01:00") + // 2015-01-01T12:00:00+01:00 is equal to 1420110000 + unixTime, _ = time.Parse(time.RFC3339, "2015-01-01T12:00:00+01:00") +) func TestParsingANormalMessage(t *testing.T) { assert := assert.New(t) @@ -54,6 +60,32 @@ func TestSerializeANormalMessage(t *testing.T) { Body: []byte("Hello World"), } + // then: the serialisation is as expected + assert.Equal(t, aNormalMessageNoExpires, string(msg.Bytes())) + assert.Equal(t, "Hello World", msg.BodyAsString()) + + // and: the first line is as expected + assert.Equal(t, strings.SplitN(aNormalMessageNoExpires, "\n", 2)[0], msg.Metadata()) +} + +func TestSerializeANormalMessageWithExpires(t *testing.T) { + // given: a message + msg := &Message{ + ID: uint64(42), + Path: Path("/foo/bar"), + UserID: "user01", + ApplicationID: "phone01", + Filters: map[string]string{"user": "user01"}, + Time: unixTime.Unix(), + NodeID: 1, + HeaderJSON: `{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}`, + Body: []byte("Hello World"), + } + + expire, err := time.Parse(time.RFC3339, "2017-03-17T20:04:26+02:00") + assert.NoError(t, err) + msg.Expires = &expire + // then: the serialisation is as expected assert.Equal(t, aNormalMessage, string(msg.Bytes())) assert.Equal(t, "Hello World", msg.BodyAsString()) @@ -126,26 +158,6 @@ func TestErrorsOnParsingMessages(t *testing.T) { assert.Error(err) } -func TestSerializeAnErrorMessage(t *testing.T) { - msg := &NotificationMessage{ - Name: ERROR_BAD_REQUEST, - Arg: "you are so bad.", - IsError: true, - } - - assert.Equal(t, "!"+ERROR_BAD_REQUEST+" "+"you are so bad.", string(msg.Bytes())) -} - -func TestSerializeANotificationMessageWithEmptyArg(t *testing.T) { - msg := &NotificationMessage{ - Name: SUCCESS_SEND, - Arg: "", - IsError: false, - } - - assert.Equal(t, "#"+SUCCESS_SEND, string(msg.Bytes())) -} - func Test_Message_getPartitionFromTopic(t *testing.T) { a := assert.New(t) a.Equal("foo", Path("/foo/bar/bazz").Partition()) diff --git a/protocol/notification_message_test.go b/protocol/notification_message_test.go index 9eb53505..553d513a 100644 --- a/protocol/notification_message_test.go +++ b/protocol/notification_message_test.go @@ -46,3 +46,23 @@ func TestParsingErrorNotificationMessage(t *testing.T) { assert.Equal("", msg.Json) assert.Equal(true, msg.IsError) } + +func TestSerializeAnErrorMessage(t *testing.T) { + msg := &NotificationMessage{ + Name: ERROR_BAD_REQUEST, + Arg: "you are so bad.", + IsError: true, + } + + assert.Equal(t, "!"+ERROR_BAD_REQUEST+" "+"you are so bad.", string(msg.Bytes())) +} + +func TestSerializeANotificationMessageWithEmptyArg(t *testing.T) { + msg := &NotificationMessage{ + Name: SUCCESS_SEND, + Arg: "", + IsError: false, + } + + assert.Equal(t, "#"+SUCCESS_SEND, string(msg.Bytes())) +} From b88b67658f906124d58a92c64084ba6838325a13 Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 11:46:48 +0200 Subject: [PATCH 04/11] Skip postgres test if short flag is set --- server/kvstore/postgres_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/kvstore/postgres_test.go b/server/kvstore/postgres_test.go index 073c279d..a2ba9f4e 100644 --- a/server/kvstore/postgres_test.go +++ b/server/kvstore/postgres_test.go @@ -1,8 +1,11 @@ package kvstore import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/cosminrentea/gobbler/testutil" + + "github.com/stretchr/testify/assert" ) func BenchmarkPostgresKVStore_PutGet(b *testing.B) { @@ -12,24 +15,28 @@ func BenchmarkPostgresKVStore_PutGet(b *testing.B) { } func TestPostgresKVStore_PutGetDelete(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(aPostgresConfig()) kvs.Open() CommonTestPutGetDelete(t, kvs, kvs) } func TestPostgresKVStore_Iterate(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(aPostgresConfig()) kvs.Open() CommonTestIterate(t, kvs, kvs) } func TestPostgresKVStore_IterateKeys(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(aPostgresConfig()) kvs.Open() CommonTestIterateKeys(t, kvs, kvs) } func TestPostgresKVStore_Check(t *testing.T) { + testutil.SkipIfShort(t) a := assert.New(t) kvs := NewPostgresKVStore(aPostgresConfig()) @@ -45,6 +52,7 @@ func TestPostgresKVStore_Check(t *testing.T) { } func TestPostgresKVStore_Open(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(invalidPostgresConfig()) err := kvs.Open() assert.NotNil(t, err) From 02f9a05ef16fdde0ec1216f374bf3bbe08848f50 Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 11:47:18 +0200 Subject: [PATCH 05/11] Fix failing tests cause of new Expires field in message metadata --- client/client_test.go | 2 +- server/router/route_test.go | 2 +- server/websocket/receiver_test.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 2bee449d..eb933e08 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" ) -var aNormalMessage = `/foo/bar,42,user01,phone01,{},1420110000,0 +var aNormalMessage = `/foo/bar,42,user01,phone01,{},,1420110000,0 Hello World` diff --git a/server/router/route_test.go b/server/router/route_test.go index 7b1021cd..e08b4c5f 100644 --- a/server/router/route_test.go +++ b/server/router/route_test.go @@ -19,7 +19,7 @@ import ( var ( dummyPath = protocol.Path("/dummy") dummyMessageWithID = &protocol.Message{ID: 1, Path: dummyPath, Body: []byte("dummy body")} - dummyMessageBytes = `/dummy,MESSAGE_ID,user01,phone01,{},1420110000,1 + dummyMessageBytes = `/dummy,MESSAGE_ID,user01,phone01,{},,1420110000,1 {"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"} Hello World` chanSize = 10 diff --git a/server/websocket/receiver_test.go b/server/websocket/receiver_test.go index 9ab032f7..559e3172 100644 --- a/server/websocket/receiver_test.go +++ b/server/websocket/receiver_test.go @@ -133,8 +133,8 @@ func Test_Receiver_Fetch_Subscribe_Fetch_Subscribe(t *testing.T) { "fetch_first2-a", "#"+protocol.SUCCESS_FETCH_END+" /foo", "#"+protocol.SUCCESS_SUBSCRIBED_TO+" /foo", - ",4,,,,1405544146,0\n\nrouter-a", - ",5,,,,1405544146,0\n\nrouter-b", + ",4,,,,,1405544146,0\n\nrouter-a", + ",5,,,,,1405544146,0\n\nrouter-b", "#"+protocol.SUCCESS_FETCH_START+" /foo 1", "fetch_after-a", "#"+protocol.SUCCESS_FETCH_END+" /foo", From cdb803eb3fdb74ce0e2fb07f1f51ba09efafdaa8 Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 11:47:41 +0200 Subject: [PATCH 06/11] Cleanup message struct methods --- guble-cli/main.go | 2 +- protocol/message.go | 80 ++++++++++++++---------------- protocol/message_test.go | 4 +- server/benchmarking_fetch_test.go | 2 +- server/benchmarking_test.go | 2 +- server/cluster_integration_test.go | 9 ++-- 6 files changed, 48 insertions(+), 51 deletions(-) diff --git a/guble-cli/main.go b/guble-cli/main.go index c1a2e260..9a7c0489 100644 --- a/guble-cli/main.go +++ b/guble-cli/main.go @@ -73,7 +73,7 @@ func readLoop(client client.Client) { if *verbose { fmt.Println(string(incomingMessage.Bytes())) } else { - fmt.Printf("%v: %v\n", incomingMessage.UserID, incomingMessage.BodyAsString()) + fmt.Printf("%v: %v\n", incomingMessage.UserID, string(incomingMessage.Body)) } case e := <-client.Errors(): fmt.Println("ERROR: " + string(e.Bytes())) diff --git a/protocol/message.go b/protocol/message.go index a23b3c53..44eb6eac 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -54,97 +54,93 @@ type Message struct { type MessageDeliveryCallback func(*Message) // Metadata returns the first line of a serialized message, without the newline -func (msg *Message) Metadata() string { +func (m *Message) Metadata() string { buff := &bytes.Buffer{} - msg.writeMetadata(buff) + m.writeMetadata(buff) return string(buff.Bytes()) } -func (msg *Message) String() string { - return fmt.Sprintf("%d", msg.ID) -} - -func (msg *Message) BodyAsString() string { - return string(msg.Body) +func (m *Message) String() string { + return fmt.Sprintf("%d: %s", m.ID, string(m.Body)) } // Bytes serializes the message into a byte slice -func (msg *Message) Bytes() []byte { +func (m *Message) Bytes() []byte { buff := &bytes.Buffer{} - msg.writeMetadata(buff) + m.writeMetadata(buff) - if len(msg.HeaderJSON) > 0 || len(msg.Body) > 0 { + if len(m.HeaderJSON) > 0 || len(m.Body) > 0 { buff.WriteString("\n") } - if len(msg.HeaderJSON) > 0 { - buff.WriteString(msg.HeaderJSON) + if len(m.HeaderJSON) > 0 { + buff.WriteString(m.HeaderJSON) } - if len(msg.Body) > 0 { + if len(m.Body) > 0 { buff.WriteString("\n") - buff.Write(msg.Body) + buff.Write(m.Body) } return buff.Bytes() } -func (msg *Message) writeMetadata(buff *bytes.Buffer) { - buff.WriteString(string(msg.Path)) +func (m *Message) writeMetadata(buff *bytes.Buffer) { + buff.WriteString(string(m.Path)) buff.WriteByte(',') - buff.WriteString(strconv.FormatUint(msg.ID, 10)) + buff.WriteString(strconv.FormatUint(m.ID, 10)) buff.WriteByte(',') - buff.WriteString(msg.UserID) + buff.WriteString(m.UserID) buff.WriteByte(',') - buff.WriteString(msg.ApplicationID) + buff.WriteString(m.ApplicationID) buff.WriteByte(',') - buff.Write(msg.encodeFilters()) + buff.Write(m.encodeFilters()) buff.WriteByte(',') - if msg.Expires != nil { - buff.WriteString(msg.Expires.Format(time.RFC3339)) + if m.Expires != nil { + buff.WriteString(m.Expires.Format(time.RFC3339)) } buff.WriteByte(',') - buff.WriteString(strconv.FormatInt(msg.Time, 10)) + buff.WriteString(strconv.FormatInt(m.Time, 10)) buff.WriteByte(',') - buff.WriteString(strconv.FormatUint(uint64(msg.NodeID), 10)) + buff.WriteString(strconv.FormatUint(uint64(m.NodeID), 10)) } -func (msg *Message) encodeFilters() []byte { - if msg.Filters == nil { +func (m *Message) encodeFilters() []byte { + if m.Filters == nil { return []byte{} } - data, err := json.Marshal(msg.Filters) + data, err := json.Marshal(m.Filters) if err != nil { - log.WithError(err).WithField("filters", msg.Filters).Error("Error encoding filters") + log.WithError(err).WithField("filters", m.Filters).Error("Error encoding filters") return []byte{} } return data } -func (msg *Message) decodeFilters(data []byte) { +func (m *Message) decodeFilters(data []byte) { if len(data) == 0 { return } - msg.Filters = make(map[string]string) - err := json.Unmarshal(data, &msg.Filters) + m.Filters = make(map[string]string) + err := json.Unmarshal(data, &m.Filters) if err != nil { log.WithError(err).WithField("data", string(data)).Error("Error decoding filters") } -} -func (msg *Message) SetFilter(key, value string) { - if msg.Filters == nil { - msg.Filters = make(map[string]string, 1) +} +func (m *Message) SetFilter(key, value string) { + if m.Filters == nil { + m.Filters = make(map[string]string, 1) } - msg.Filters[key] = value + m.Filters[key] = value } // Decode decodes a message, sent from the server to the client. @@ -196,7 +192,7 @@ func ParseMessage(message []byte) (*Message, error) { return nil, fmt.Errorf("message metadata to have an integer (nodeID) as eighth field, but was %v", meta[7]) } - msg := &Message{ + m := &Message{ ID: id, Path: Path(meta[0]), UserID: meta[2], @@ -205,15 +201,15 @@ func ParseMessage(message []byte) (*Message, error) { Time: publishingTime, NodeID: uint8(nodeID), } - msg.decodeFilters([]byte(meta[4])) + m.decodeFilters([]byte(meta[4])) if len(parts) >= 2 { - msg.HeaderJSON = parts[1] + m.HeaderJSON = parts[1] } if len(parts) == 3 { - msg.Body = []byte(parts[2]) + m.Body = []byte(parts[2]) } - return msg, nil + return m, nil } diff --git a/protocol/message_test.go b/protocol/message_test.go index dc999644..930b296f 100644 --- a/protocol/message_test.go +++ b/protocol/message_test.go @@ -62,7 +62,7 @@ func TestSerializeANormalMessage(t *testing.T) { // then: the serialisation is as expected assert.Equal(t, aNormalMessageNoExpires, string(msg.Bytes())) - assert.Equal(t, "Hello World", msg.BodyAsString()) + assert.Equal(t, "42: Hello World", msg.String()) // and: the first line is as expected assert.Equal(t, strings.SplitN(aNormalMessageNoExpires, "\n", 2)[0], msg.Metadata()) @@ -88,7 +88,7 @@ func TestSerializeANormalMessageWithExpires(t *testing.T) { // then: the serialisation is as expected assert.Equal(t, aNormalMessage, string(msg.Bytes())) - assert.Equal(t, "Hello World", msg.BodyAsString()) + assert.Equal(t, "42: Hello World", msg.String()) // and: the first line is as expected assert.Equal(t, strings.SplitN(aNormalMessage, "\n", 2)[0], msg.Metadata()) diff --git a/server/benchmarking_fetch_test.go b/server/benchmarking_fetch_test.go index 41cd55f2..8dd85d0d 100644 --- a/server/benchmarking_fetch_test.go +++ b/server/benchmarking_fetch_test.go @@ -52,7 +52,7 @@ func Benchmark_E2E_Fetch_HelloWorld_Messages(b *testing.B) { for i := 1; i <= b.N; i++ { select { case msg := <-c.Messages(): - a.Equal(fmt.Sprintf("Hello %v", i), msg.BodyAsString()) + a.Equal(fmt.Sprintf("Hello %v", i), string(msg.Body)) case e := <-c.Errors(): a.Fail(string(e.Bytes())) return diff --git a/server/benchmarking_test.go b/server/benchmarking_test.go index 1557c334..41eff15d 100644 --- a/server/benchmarking_test.go +++ b/server/benchmarking_test.go @@ -157,7 +157,7 @@ func (tg *testgroup) Start() { select { case msg := <-tg.consumer.Messages(): assert.Equal(tg.t, tg.topic, string(msg.Path)) - if !assert.Equal(tg.t, body, msg.BodyAsString()) { + if !assert.Equal(tg.t, body, string(msg.Body)) { tg.t.FailNow() tg.done <- false } diff --git a/server/cluster_integration_test.go b/server/cluster_integration_test.go index 8938348a..6be0b97c 100644 --- a/server/cluster_integration_test.go +++ b/server/cluster_integration_test.go @@ -1,12 +1,13 @@ package server import ( + "testing" + "time" + log "github.com/Sirupsen/logrus" "github.com/cosminrentea/gobbler/protocol" "github.com/cosminrentea/gobbler/testutil" "github.com/stretchr/testify/assert" - "testing" - "time" ) func Test_Cluster_Subscribe_To_Random_Node(t *testing.T) { @@ -109,12 +110,12 @@ WAIT: "path": incomingMessage.Path, "incomingMsgUserId": incomingMessage.UserID, "headerJson": incomingMessage.HeaderJSON, - "body": incomingMessage.BodyAsString(), + "body": string(incomingMessage.Body), "numReceived": numReceived, }).Info("Client2 received a message") a.Equal(protocol.Path("/testTopic/m"), incomingMessage.Path) - a.Equal("body", incomingMessage.BodyAsString()) + a.Equal("body", string(incomingMessage.Body)) a.True(incomingMessage.ID > 0) idReceived[incomingMessage.ID] = true From 16bf0650269f10c1c73bdf70cb413c108880d70b Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 12:30:13 +0200 Subject: [PATCH 07/11] Remove defer for skipping test --- server/integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/integration_test.go b/server/integration_test.go index 9d18b5e8..e037668c 100644 --- a/server/integration_test.go +++ b/server/integration_test.go @@ -82,8 +82,8 @@ type Subscriber struct { } func TestSubscribersIntegration(t *testing.T) { - defer testutil.SkipIfShort(t) - defer testutil.SkipIfDisabled(t) + testutil.SkipIfShort(t) + testutil.SkipIfDisabled(t) defer testutil.ResetDefaultRegistryHealthCheck() defer testutil.EnableDebugForMethod()() From 1ad2624fea893c9ace7f8f0ebab17ab9ac124f90 Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 12:30:39 +0200 Subject: [PATCH 08/11] Skip only if disabled --- server/integration_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/integration_test.go b/server/integration_test.go index e037668c..29a99e24 100644 --- a/server/integration_test.go +++ b/server/integration_test.go @@ -82,7 +82,6 @@ type Subscriber struct { } func TestSubscribersIntegration(t *testing.T) { - testutil.SkipIfShort(t) testutil.SkipIfDisabled(t) defer testutil.ResetDefaultRegistryHealthCheck() From 06534898f5ddcdbebd413ec51478f98abbd99451 Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 14:31:15 +0200 Subject: [PATCH 09/11] Add IsExpired method on message --- protocol/message.go | 12 ++++++++++++ protocol/message_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/protocol/message.go b/protocol/message.go index 44eb6eac..de3a08c3 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -136,6 +136,7 @@ func (m *Message) decodeFilters(data []byte) { } } + func (m *Message) SetFilter(key, value string) { if m.Filters == nil { m.Filters = make(map[string]string, 1) @@ -143,6 +144,17 @@ func (m *Message) SetFilter(key, value string) { m.Filters[key] = value } +// IsExpired returns true if the message `Expires` field is set and the current time +// has pasted the `Expires` time +// +// Checks are made using `Expires` field timezone +func (m *Message) IsExpired() bool { + if m.Expires == nil { + return true + } + return m.Expires != nil && m.Expires.Before(time.Now().In(m.Expires.Location())) +} + // Decode decodes a message, sent from the server to the client. // The decoded messages can have one of the types: *Message or *NotificationMessage func Decode(message []byte) (interface{}, error) { diff --git a/protocol/message_test.go b/protocol/message_test.go index 930b296f..304cf577 100644 --- a/protocol/message_test.go +++ b/protocol/message_test.go @@ -195,3 +195,35 @@ func TestMessage_decodeFilters(t *testing.T) { a.Equal(msg.Filters["user"], "user01") a.Equal(msg.Filters["device_id"], "ID_DEVICE") } + +func TestMessage_IsExpired(t *testing.T) { + a := assert.New(t) + + n := time.Now().UTC() + loc1, err := time.LoadLocation("Europe/Berlin") + a.NoError(err) + + loc2, err := time.LoadLocation("Europe/Bucharest") + a.NoError(err) + + cases := []struct { + expires time.Time + result bool + }{ + {n.AddDate(0, 0, 1), false}, + {n.AddDate(0, 0, -1), true}, + {n.AddDate(0, 0, 1).In(loc1), false}, + {n.AddDate(0, 0, 1).In(loc2), false}, + {n.AddDate(0, 0, -1).In(loc1), true}, + {n.AddDate(0, 0, -1).In(loc2), true}, + {n.Add(1 * time.Minute).In(loc1), false}, + {n.Add(1 * time.Minute).In(loc2), false}, + {n.Add(-1 * time.Minute).In(loc1), true}, + {n.Add(-1 * time.Minute).In(loc2), true}, + } + + for i, c := range cases { + a.Equal(c.result, (&Message{Expires: &c.expires}).IsExpired(), "Failed IsExpired case: %d", i) + } + +} From 4a3cd885b387118f1eda63caf78d265dcf2e6b00 Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 14:34:16 +0200 Subject: [PATCH 10/11] Change Message method name: Bytes -> Encode --- client/client_test.go | 2 +- guble-cli/main.go | 2 +- protocol/message.go | 4 ++-- protocol/message_test.go | 8 ++++---- server/cluster/cluster.go | 2 +- server/router/router.go | 2 +- server/router/router_test.go | 4 ++-- server/store/dummystore/dummy_message_store.go | 2 +- server/store/filestore/message_store.go | 4 ++-- server/websocket/receiver.go | 2 +- server/websocket/websocket_connector_test.go | 10 +++++----- 11 files changed, 21 insertions(+), 21 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index eb933e08..0b4bb25a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -184,7 +184,7 @@ func TestReceiveAMessage(t *testing.T) { // than we receive the expected message select { case m := <-c.Messages(): - a.Equal(aNormalMessage, string(m.Bytes())) + a.Equal(aNormalMessage, string(m.Encode())) case <-time.After(time.Millisecond * 10): a.Fail("timeout while waiting for message") } diff --git a/guble-cli/main.go b/guble-cli/main.go index 9a7c0489..bf9d3f64 100644 --- a/guble-cli/main.go +++ b/guble-cli/main.go @@ -71,7 +71,7 @@ func readLoop(client client.Client) { select { case incomingMessage := <-client.Messages(): if *verbose { - fmt.Println(string(incomingMessage.Bytes())) + fmt.Println(string(incomingMessage.Encode())) } else { fmt.Printf("%v: %v\n", incomingMessage.UserID, string(incomingMessage.Body)) } diff --git a/protocol/message.go b/protocol/message.go index de3a08c3..1598ea58 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -64,8 +64,8 @@ func (m *Message) String() string { return fmt.Sprintf("%d: %s", m.ID, string(m.Body)) } -// Bytes serializes the message into a byte slice -func (m *Message) Bytes() []byte { +// Encode serializes the message into a byte slice +func (m *Message) Encode() []byte { buff := &bytes.Buffer{} m.writeMetadata(buff) diff --git a/protocol/message_test.go b/protocol/message_test.go index 304cf577..5f3bb6b0 100644 --- a/protocol/message_test.go +++ b/protocol/message_test.go @@ -61,7 +61,7 @@ func TestSerializeANormalMessage(t *testing.T) { } // then: the serialisation is as expected - assert.Equal(t, aNormalMessageNoExpires, string(msg.Bytes())) + assert.Equal(t, aNormalMessageNoExpires, string(msg.Encode())) assert.Equal(t, "42: Hello World", msg.String()) // and: the first line is as expected @@ -87,7 +87,7 @@ func TestSerializeANormalMessageWithExpires(t *testing.T) { msg.Expires = &expire // then: the serialisation is as expected - assert.Equal(t, aNormalMessage, string(msg.Bytes())) + assert.Equal(t, aNormalMessage, string(msg.Encode())) assert.Equal(t, "42: Hello World", msg.String()) // and: the first line is as expected @@ -101,7 +101,7 @@ func TestSerializeAMinimalMessage(t *testing.T) { Time: unixTime.Unix(), } - assert.Equal(t, aMinimalMessage, string(msg.Bytes())) + assert.Equal(t, aMinimalMessage, string(msg.Encode())) } func TestSerializeAMinimalMessageWithBody(t *testing.T) { @@ -112,7 +112,7 @@ func TestSerializeAMinimalMessageWithBody(t *testing.T) { Body: []byte("Hello World"), } - assert.Equal(t, aMinimalMessage+"\n\nHello World", string(msg.Bytes())) + assert.Equal(t, aMinimalMessage+"\n\nHello World", string(msg.Encode())) } func TestParsingAMinimalMessage(t *testing.T) { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3df76d85..fcca9a83 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -169,7 +169,7 @@ func (cluster *Cluster) BroadcastMessage(pMessage *protocol.Message) error { cMessage := &message{ NodeID: cluster.Config.ID, Type: mtGubleMessage, - Body: pMessage.Bytes(), + Body: pMessage.Encode(), } return cluster.broadcastClusterMessage(cMessage) } diff --git a/server/router/router.go b/server/router/router.go index 1dd2b0fb..e8ba9cac 100644 --- a/server/router/router.go +++ b/server/router/router.go @@ -176,7 +176,7 @@ func (router *router) HandleMessage(message *protocol.Message) error { nodeID = router.cluster.Config.ID } - mTotalMessagesIncomingBytes.Add(int64(len(message.Bytes()))) + mTotalMessagesIncomingBytes.Add(int64(len(message.Encode()))) size, err := router.messageStore.StoreMessage(message, nodeID) if err != nil { logger.WithField("error", err.Error()).Error("Error storing message") diff --git a/server/router/router_test.go b/server/router/router_test.go index 606c26c6..49d23c5b 100644 --- a/server/router/router_test.go +++ b/server/router/router_test.go @@ -171,7 +171,7 @@ func TestRouter_HandleMessageNotAllowed(t *testing.T) { m.ID = id m.Time = ts m.NodeID = nodeID - return len(m.Bytes()), nil + return len(m.Encode()), nil }) // sending message @@ -234,7 +234,7 @@ func TestRouter_SimpleMessageSending(t *testing.T) { m.ID = id m.Time = ts m.NodeID = nodeID - return len(m.Bytes()), nil + return len(m.Encode()), nil }) // when i send a message to the route diff --git a/server/store/dummystore/dummy_message_store.go b/server/store/dummystore/dummy_message_store.go index 98d70aea..5bd9088d 100644 --- a/server/store/dummystore/dummy_message_store.go +++ b/server/store/dummystore/dummy_message_store.go @@ -68,7 +68,7 @@ func (dms *DummyMessageStore) StoreMessage(message *protocol.Message, nodeID uin message.ID = nextID message.Time = ts message.NodeID = nodeID - data := message.Bytes() + data := message.Encode() if err := dms.Store(partitionName, nextID, data); err != nil { return 0, err } diff --git a/server/store/filestore/message_store.go b/server/store/filestore/message_store.go index 52058874..2e67ab33 100644 --- a/server/store/filestore/message_store.go +++ b/server/store/filestore/message_store.go @@ -95,9 +95,9 @@ func (fms *FileMessageStore) StoreMessage(message *protocol.Message, nodeID uint }).Debug("Locally generated ID for message") } - data := message.Bytes() + data := message.Encode() - if err := fms.Store(partitionName, message.ID, message.Bytes()); err != nil { + if err := fms.Store(partitionName, message.ID, message.Encode()); err != nil { logger. WithError(err).WithField("partition", partitionName). Error("Error storing locally generated messagein partition") diff --git a/server/websocket/receiver.go b/server/websocket/receiver.go index 573757c7..8a10016a 100644 --- a/server/websocket/receiver.go +++ b/server/websocket/receiver.go @@ -180,7 +180,7 @@ func (rec *Receiver) receiveFromSubscription() { if m.ID > rec.lastSentID { rec.lastSentID = m.ID - rec.sendC <- m.Bytes() + rec.sendC <- m.Encode() } else { logger.WithFields(log.Fields{ "msgId": m.ID, diff --git a/server/websocket/websocket_connector_test.go b/server/websocket/websocket_connector_test.go index 59c97c7f..2cf207cb 100644 --- a/server/websocket/websocket_connector_test.go +++ b/server/websocket/websocket_connector_test.go @@ -80,11 +80,11 @@ func Test_AnIncomingMessageIsDelivered(t *testing.T) { wsconn, routerMock, messageStore := createDefaultMocks([]string{}) - wsconn.EXPECT().Send(aTestMessage.Bytes()) + wsconn.EXPECT().Send(aTestMessage.Encode()) handler := runNewWebSocket(wsconn, routerMock, messageStore, nil) - handler.sendChannel <- aTestMessage.Bytes() + handler.sendChannel <- aTestMessage.Encode() time.Sleep(time.Millisecond * 2) } @@ -106,18 +106,18 @@ func Test_AnIncomingMessageIsNotAllowed(t *testing.T) { }() time.Sleep(time.Millisecond * 2) - handler.sendChannel <- aTestMessage.Bytes() + handler.sendChannel <- aTestMessage.Encode() time.Sleep(time.Millisecond * 2) //nothing shall have been sent //now allow tam.EXPECT().IsAllowed(auth.READ, "testuser", protocol.Path("/foo")).Return(true) - wsconn.EXPECT().Send(aTestMessage.Bytes()) + wsconn.EXPECT().Send(aTestMessage.Encode()) time.Sleep(time.Millisecond * 2) - handler.sendChannel <- aTestMessage.Bytes() + handler.sendChannel <- aTestMessage.Encode() time.Sleep(time.Millisecond * 2) } From 0b2256fd8651a08f824b7d94e9efca477ec129a2 Mon Sep 17 00:00:00 2001 From: "Bogdan I. Bursuc" Date: Fri, 17 Mar 2017 15:12:20 +0200 Subject: [PATCH 11/11] Skip cluster test if disabled --- server/redundancy_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/redundancy_test.go b/server/redundancy_test.go index 31ffcedb..6b89f532 100644 --- a/server/redundancy_test.go +++ b/server/redundancy_test.go @@ -10,6 +10,7 @@ import ( ) func Test_Subscribe_on_random_node(t *testing.T) { + testutil.SkipIfDisabled(t) testutil.SkipIfShort(t) a := assert.New(t)