diff --git a/client/client_test.go b/client/client_test.go index 2bee449d..0b4bb25a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" ) -var aNormalMessage = `/foo/bar,42,user01,phone01,{},1420110000,0 +var aNormalMessage = `/foo/bar,42,user01,phone01,{},,1420110000,0 Hello World` @@ -184,7 +184,7 @@ func TestReceiveAMessage(t *testing.T) { // than we receive the expected message select { case m := <-c.Messages(): - a.Equal(aNormalMessage, string(m.Bytes())) + a.Equal(aNormalMessage, string(m.Encode())) case <-time.After(time.Millisecond * 10): a.Fail("timeout while waiting for message") } diff --git a/guble-cli/main.go b/guble-cli/main.go index c1a2e260..bf9d3f64 100644 --- a/guble-cli/main.go +++ b/guble-cli/main.go @@ -71,9 +71,9 @@ func readLoop(client client.Client) { select { case incomingMessage := <-client.Messages(): if *verbose { - fmt.Println(string(incomingMessage.Bytes())) + fmt.Println(string(incomingMessage.Encode())) } else { - fmt.Printf("%v: %v\n", incomingMessage.UserID, incomingMessage.BodyAsString()) + fmt.Printf("%v: %v\n", incomingMessage.UserID, string(incomingMessage.Body)) } case e := <-client.Errors(): fmt.Println("ERROR: " + string(e.Bytes())) diff --git a/protocol/message.go b/protocol/message.go index 38dbb6df..1598ea58 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "time" log "github.com/Sirupsen/logrus" ) @@ -30,6 +31,13 @@ type Message struct { // routes that match the filters Filters map[string]string + // Expires field specifies until when the message is valid to be processed + // If this field is set and the message is expired the connectors should + // consider the message as processed and log the action + // + // RFC3339 format + Expires *time.Time + // The time of publishing, as Unix Timestamp date Time int64 @@ -46,138 +54,105 @@ type Message struct { type MessageDeliveryCallback func(*Message) // Metadata returns the first line of a serialized message, without the newline -func (msg *Message) Metadata() string { +func (m *Message) Metadata() string { buff := &bytes.Buffer{} - msg.writeMetadata(buff) + m.writeMetadata(buff) return string(buff.Bytes()) } -func (msg *Message) String() string { - return fmt.Sprintf("%d", msg.ID) -} - -func (msg *Message) BodyAsString() string { - return string(msg.Body) +func (m *Message) String() string { + return fmt.Sprintf("%d: %s", m.ID, string(m.Body)) } -// Bytes serializes the message into a byte slice -func (msg *Message) Bytes() []byte { +// Encode serializes the message into a byte slice +func (m *Message) Encode() []byte { buff := &bytes.Buffer{} - msg.writeMetadata(buff) + m.writeMetadata(buff) - if len(msg.HeaderJSON) > 0 || len(msg.Body) > 0 { + if len(m.HeaderJSON) > 0 || len(m.Body) > 0 { buff.WriteString("\n") } - if len(msg.HeaderJSON) > 0 { - buff.WriteString(msg.HeaderJSON) + if len(m.HeaderJSON) > 0 { + buff.WriteString(m.HeaderJSON) } - if len(msg.Body) > 0 { + if len(m.Body) > 0 { buff.WriteString("\n") - buff.Write(msg.Body) + buff.Write(m.Body) } return buff.Bytes() } -func (msg *Message) writeMetadata(buff *bytes.Buffer) { - buff.WriteString(string(msg.Path)) - buff.WriteString(",") - buff.WriteString(strconv.FormatUint(msg.ID, 10)) - buff.WriteString(",") - buff.WriteString(msg.UserID) - buff.WriteString(",") - buff.WriteString(msg.ApplicationID) - buff.WriteString(",") - buff.Write(msg.encodeFilters()) - buff.WriteString(",") - buff.WriteString(strconv.FormatInt(msg.Time, 10)) - buff.WriteString(",") - buff.WriteString(strconv.FormatUint(uint64(msg.NodeID), 10)) +func (m *Message) writeMetadata(buff *bytes.Buffer) { + buff.WriteString(string(m.Path)) + buff.WriteByte(',') + + buff.WriteString(strconv.FormatUint(m.ID, 10)) + buff.WriteByte(',') + + buff.WriteString(m.UserID) + buff.WriteByte(',') + + buff.WriteString(m.ApplicationID) + buff.WriteByte(',') + + buff.Write(m.encodeFilters()) + buff.WriteByte(',') + + if m.Expires != nil { + buff.WriteString(m.Expires.Format(time.RFC3339)) + } + buff.WriteByte(',') + + buff.WriteString(strconv.FormatInt(m.Time, 10)) + buff.WriteByte(',') + + buff.WriteString(strconv.FormatUint(uint64(m.NodeID), 10)) } -func (msg *Message) encodeFilters() []byte { - if msg.Filters == nil { +func (m *Message) encodeFilters() []byte { + if m.Filters == nil { return []byte{} } - data, err := json.Marshal(msg.Filters) + data, err := json.Marshal(m.Filters) if err != nil { - log.WithError(err).WithField("filters", msg.Filters).Error("Error encoding filters") + log.WithError(err).WithField("filters", m.Filters).Error("Error encoding filters") return []byte{} } return data } -func (msg *Message) decodeFilters(data []byte) { +func (m *Message) decodeFilters(data []byte) { if len(data) == 0 { return } - msg.Filters = make(map[string]string) - err := json.Unmarshal(data, &msg.Filters) + m.Filters = make(map[string]string) + err := json.Unmarshal(data, &m.Filters) if err != nil { log.WithError(err).WithField("data", string(data)).Error("Error decoding filters") } -} - -func (msg *Message) SetFilter(key, value string) { - if msg.Filters == nil { - msg.Filters = make(map[string]string, 1) - } - msg.Filters[key] = value -} - -// Valid constants for the NotificationMessage.Name -const ( - SUCCESS_CONNECTED = "connected" - SUCCESS_SEND = "send" - SUCCESS_FETCH_START = "fetch-start" - SUCCESS_FETCH_END = "fetch-end" - SUCCESS_SUBSCRIBED_TO = "subscribed-to" - SUCCESS_CANCELED = "canceled" - ERROR_SUBSCRIBED_TO = "error-subscribed-to" - ERROR_BAD_REQUEST = "error-bad-request" - ERROR_INTERNAL_SERVER = "error-server-internal" -) - -// NotificationMessage is a representation of a status messages or error message, sent from the server -type NotificationMessage struct { - - // The name of the message - Name string - - // The argument line, following the messageName - Arg string - - // The optional json data supplied with the message - Json string - // Flag which indicates, if the notification is an error - IsError bool } -// Bytes serializes the notification message into a byte slice -func (msg *NotificationMessage) Bytes() []byte { - buff := &bytes.Buffer{} - - if msg.IsError { - buff.WriteString("!") - } else { - buff.WriteString("#") - } - buff.WriteString(msg.Name) - if len(msg.Arg) > 0 { - buff.WriteString(" ") - buff.WriteString(msg.Arg) +func (m *Message) SetFilter(key, value string) { + if m.Filters == nil { + m.Filters = make(map[string]string, 1) } + m.Filters[key] = value +} - if len(msg.Json) > 0 { - buff.WriteString("\n") - buff.WriteString(msg.Json) +// IsExpired returns true if the message `Expires` field is set and the current time +// has pasted the `Expires` time +// +// Checks are made using `Expires` field timezone +func (m *Message) IsExpired() bool { + if m.Expires == nil { + return true } - - return buff.Bytes() + return m.Expires != nil && m.Expires.Before(time.Now().In(m.Expires.Location())) } // Decode decodes a message, sent from the server to the client. @@ -197,7 +172,7 @@ func ParseMessage(message []byte) (*Message, error) { meta := strings.Split(parts[0], ",") - if len(meta) != 7 { + if len(meta) != 8 { return nil, fmt.Errorf("message metadata has to have 7 fields, but was %v", parts[0]) } @@ -210,57 +185,43 @@ func ParseMessage(message []byte) (*Message, error) { return nil, fmt.Errorf("message metadata to have an integer (message-id) as second field, but was %v", meta[1]) } - publishingTime, err := strconv.ParseInt(meta[5], 10, 64) + var expiresTime *time.Time + if meta[5] != "" { + if t, err := time.Parse(time.RFC3339, meta[5]); err != nil { + return nil, fmt.Errorf("message metadata expected to have a time string (expiration time) as sixth field, but was %v: %s", meta[5], err.Error()) + } else { + expiresTime = &t + } + } + + publishingTime, err := strconv.ParseInt(meta[6], 10, 64) if err != nil { - return nil, fmt.Errorf("message metadata to have an integer (publishing time) as sixth field, but was %v", meta[5]) + return nil, fmt.Errorf("message metadata to have an integer (publishing time) as seventh field, but was %v:", meta[6]) } - nodeID, err := strconv.ParseUint(meta[6], 10, 8) + nodeID, err := strconv.ParseUint(meta[7], 10, 8) if err != nil { - return nil, fmt.Errorf("message metadata to have an integer (nodeID) as seventh field, but was %v", meta[6]) + return nil, fmt.Errorf("message metadata to have an integer (nodeID) as eighth field, but was %v", meta[7]) } - msg := &Message{ + m := &Message{ ID: id, Path: Path(meta[0]), UserID: meta[2], ApplicationID: meta[3], + Expires: expiresTime, Time: publishingTime, NodeID: uint8(nodeID), } - msg.decodeFilters([]byte(meta[4])) + m.decodeFilters([]byte(meta[4])) if len(parts) >= 2 { - msg.HeaderJSON = parts[1] + m.HeaderJSON = parts[1] } if len(parts) == 3 { - msg.Body = []byte(parts[2]) - } - - return msg, nil -} - -func parseNotificationMessage(message []byte) (*NotificationMessage, error) { - msg := &NotificationMessage{} - - if len(message) < 2 || (message[0] != '#' && message[0] != '!') { - return nil, fmt.Errorf("message has to start with '#' or '!' and a name, but got '%v'", message) - } - msg.IsError = message[0] == '!' - - parts := strings.SplitN(string(message)[1:], "\n", 2) - firstLine := strings.SplitN(parts[0], " ", 2) - - msg.Name = firstLine[0] - - if len(firstLine) > 1 { - msg.Arg = firstLine[1] - } - - if len(parts) > 1 { - msg.Json = parts[1] + m.Body = []byte(parts[2]) } - return msg, nil + return m, nil } diff --git a/protocol/message_test.go b/protocol/message_test.go index c5900aed..5f3bb6b0 100644 --- a/protocol/message_test.go +++ b/protocol/message_test.go @@ -9,17 +9,23 @@ import ( "github.com/stretchr/testify/assert" ) -var aNormalMessage = `/foo/bar,42,user01,phone01,{"user":"user01"},1420110000,1 +var ( + aNormalMessage = `/foo/bar,42,user01,phone01,{"user":"user01"},2017-03-17T20:04:26+02:00,1420110000,1 {"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"} Hello World` -var aMinimalMessage = "/,42,,,,1420110000,0" + aNormalMessageNoExpires = `/foo/bar,42,user01,phone01,{"user":"user01"},,1420110000,1 +{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"} +Hello World` -var aConnectedNotification = `#connected You are connected to the server. + aMinimalMessage = "/,42,,,,,1420110000,0" + + aConnectedNotification = `#connected You are connected to the server. {"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}` -// 2015-01-01T12:00:00+01:00 is equal to 1420110000 -var unixTime, _ = time.Parse(time.RFC3339, "2015-01-01T12:00:00+01:00") + // 2015-01-01T12:00:00+01:00 is equal to 1420110000 + unixTime, _ = time.Parse(time.RFC3339, "2015-01-01T12:00:00+01:00") +) func TestParsingANormalMessage(t *testing.T) { assert := assert.New(t) @@ -55,8 +61,34 @@ func TestSerializeANormalMessage(t *testing.T) { } // then: the serialisation is as expected - assert.Equal(t, aNormalMessage, string(msg.Bytes())) - assert.Equal(t, "Hello World", msg.BodyAsString()) + assert.Equal(t, aNormalMessageNoExpires, string(msg.Encode())) + assert.Equal(t, "42: Hello World", msg.String()) + + // and: the first line is as expected + assert.Equal(t, strings.SplitN(aNormalMessageNoExpires, "\n", 2)[0], msg.Metadata()) +} + +func TestSerializeANormalMessageWithExpires(t *testing.T) { + // given: a message + msg := &Message{ + ID: uint64(42), + Path: Path("/foo/bar"), + UserID: "user01", + ApplicationID: "phone01", + Filters: map[string]string{"user": "user01"}, + Time: unixTime.Unix(), + NodeID: 1, + HeaderJSON: `{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}`, + Body: []byte("Hello World"), + } + + expire, err := time.Parse(time.RFC3339, "2017-03-17T20:04:26+02:00") + assert.NoError(t, err) + msg.Expires = &expire + + // then: the serialisation is as expected + assert.Equal(t, aNormalMessage, string(msg.Encode())) + assert.Equal(t, "42: Hello World", msg.String()) // and: the first line is as expected assert.Equal(t, strings.SplitN(aNormalMessage, "\n", 2)[0], msg.Metadata()) @@ -69,7 +101,7 @@ func TestSerializeAMinimalMessage(t *testing.T) { Time: unixTime.Unix(), } - assert.Equal(t, aMinimalMessage, string(msg.Bytes())) + assert.Equal(t, aMinimalMessage, string(msg.Encode())) } func TestSerializeAMinimalMessageWithBody(t *testing.T) { @@ -80,7 +112,7 @@ func TestSerializeAMinimalMessageWithBody(t *testing.T) { Body: []byte("Hello World"), } - assert.Equal(t, aMinimalMessage+"\n\nHello World", string(msg.Bytes())) + assert.Equal(t, aMinimalMessage+"\n\nHello World", string(msg.Encode())) } func TestParsingAMinimalMessage(t *testing.T) { @@ -126,67 +158,6 @@ func TestErrorsOnParsingMessages(t *testing.T) { assert.Error(err) } -func TestParsingNotificationMessage(t *testing.T) { - assert := assert.New(t) - - msgI, err := Decode([]byte(aConnectedNotification)) - assert.NoError(err) - assert.IsType(&NotificationMessage{}, msgI) - msg := msgI.(*NotificationMessage) - - assert.Equal(SUCCESS_CONNECTED, msg.Name) - assert.Equal("You are connected to the server.", msg.Arg) - assert.Equal(`{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, msg.Json) - assert.Equal(false, msg.IsError) -} - -func TestSerializeANotificationMessage(t *testing.T) { - msg := &NotificationMessage{ - Name: SUCCESS_CONNECTED, - Arg: "You are connected to the server.", - Json: `{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, - IsError: false, - } - - assert.Equal(t, aConnectedNotification, string(msg.Bytes())) -} - -func TestSerializeAnErrorMessage(t *testing.T) { - msg := &NotificationMessage{ - Name: ERROR_BAD_REQUEST, - Arg: "you are so bad.", - IsError: true, - } - - assert.Equal(t, "!"+ERROR_BAD_REQUEST+" "+"you are so bad.", string(msg.Bytes())) -} - -func TestSerializeANotificationMessageWithEmptyArg(t *testing.T) { - msg := &NotificationMessage{ - Name: SUCCESS_SEND, - Arg: "", - IsError: false, - } - - assert.Equal(t, "#"+SUCCESS_SEND, string(msg.Bytes())) -} - -func TestParsingErrorNotificationMessage(t *testing.T) { - assert := assert.New(t) - - raw := "!bad-request unknown command 'sdcsd'" - - msgI, err := Decode([]byte(raw)) - assert.NoError(err) - assert.IsType(&NotificationMessage{}, msgI) - msg := msgI.(*NotificationMessage) - - assert.Equal("bad-request", msg.Name) - assert.Equal("unknown command 'sdcsd'", msg.Arg) - assert.Equal("", msg.Json) - assert.Equal(true, msg.IsError) -} - func Test_Message_getPartitionFromTopic(t *testing.T) { a := assert.New(t) a.Equal("foo", Path("/foo/bar/bazz").Partition()) @@ -224,3 +195,35 @@ func TestMessage_decodeFilters(t *testing.T) { a.Equal(msg.Filters["user"], "user01") a.Equal(msg.Filters["device_id"], "ID_DEVICE") } + +func TestMessage_IsExpired(t *testing.T) { + a := assert.New(t) + + n := time.Now().UTC() + loc1, err := time.LoadLocation("Europe/Berlin") + a.NoError(err) + + loc2, err := time.LoadLocation("Europe/Bucharest") + a.NoError(err) + + cases := []struct { + expires time.Time + result bool + }{ + {n.AddDate(0, 0, 1), false}, + {n.AddDate(0, 0, -1), true}, + {n.AddDate(0, 0, 1).In(loc1), false}, + {n.AddDate(0, 0, 1).In(loc2), false}, + {n.AddDate(0, 0, -1).In(loc1), true}, + {n.AddDate(0, 0, -1).In(loc2), true}, + {n.Add(1 * time.Minute).In(loc1), false}, + {n.Add(1 * time.Minute).In(loc2), false}, + {n.Add(-1 * time.Minute).In(loc1), true}, + {n.Add(-1 * time.Minute).In(loc2), true}, + } + + for i, c := range cases { + a.Equal(c.result, (&Message{Expires: &c.expires}).IsExpired(), "Failed IsExpired case: %d", i) + } + +} diff --git a/protocol/notification_message.go b/protocol/notification_message.go new file mode 100644 index 00000000..a5342b34 --- /dev/null +++ b/protocol/notification_message.go @@ -0,0 +1,83 @@ +package protocol + +import ( + "bytes" + "fmt" + "strings" +) + +// Valid constants for the NotificationMessage.Name +const ( + SUCCESS_CONNECTED = "connected" + SUCCESS_SEND = "send" + SUCCESS_FETCH_START = "fetch-start" + SUCCESS_FETCH_END = "fetch-end" + SUCCESS_SUBSCRIBED_TO = "subscribed-to" + SUCCESS_CANCELED = "canceled" + ERROR_SUBSCRIBED_TO = "error-subscribed-to" + ERROR_BAD_REQUEST = "error-bad-request" + ERROR_INTERNAL_SERVER = "error-server-internal" +) + +// NotificationMessage is a representation of a status messages or error message, sent from the server +type NotificationMessage struct { + + // The name of the message + Name string + + // The argument line, following the messageName + Arg string + + // The optional json data supplied with the message + Json string + + // Flag which indicates, if the notification is an error + IsError bool +} + +// Bytes serializes the notification message into a byte slice +func (msg *NotificationMessage) Bytes() []byte { + buff := &bytes.Buffer{} + + if msg.IsError { + buff.WriteString("!") + } else { + buff.WriteString("#") + } + buff.WriteString(msg.Name) + if len(msg.Arg) > 0 { + buff.WriteString(" ") + buff.WriteString(msg.Arg) + } + + if len(msg.Json) > 0 { + buff.WriteString("\n") + buff.WriteString(msg.Json) + } + + return buff.Bytes() +} + +func parseNotificationMessage(message []byte) (*NotificationMessage, error) { + msg := &NotificationMessage{} + + if len(message) < 2 || (message[0] != '#' && message[0] != '!') { + return nil, fmt.Errorf("message has to start with '#' or '!' and a name, but got '%v'", message) + } + msg.IsError = message[0] == '!' + + parts := strings.SplitN(string(message)[1:], "\n", 2) + firstLine := strings.SplitN(parts[0], " ", 2) + + msg.Name = firstLine[0] + + if len(firstLine) > 1 { + msg.Arg = firstLine[1] + } + + if len(parts) > 1 { + msg.Json = parts[1] + } + + return msg, nil +} diff --git a/protocol/notification_message_test.go b/protocol/notification_message_test.go new file mode 100644 index 00000000..553d513a --- /dev/null +++ b/protocol/notification_message_test.go @@ -0,0 +1,68 @@ +package protocol + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParsingNotificationMessage(t *testing.T) { + assert := assert.New(t) + + msgI, err := Decode([]byte(aConnectedNotification)) + assert.NoError(err) + assert.IsType(&NotificationMessage{}, msgI) + msg := msgI.(*NotificationMessage) + + assert.Equal(SUCCESS_CONNECTED, msg.Name) + assert.Equal("You are connected to the server.", msg.Arg) + assert.Equal(`{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, msg.Json) + assert.Equal(false, msg.IsError) +} + +func TestSerializeANotificationMessage(t *testing.T) { + msg := &NotificationMessage{ + Name: SUCCESS_CONNECTED, + Arg: "You are connected to the server.", + Json: `{"ApplicationId": "phone1", "UserId": "user01", "Time": "1420110000"}`, + IsError: false, + } + + assert.Equal(t, aConnectedNotification, string(msg.Bytes())) +} + +func TestParsingErrorNotificationMessage(t *testing.T) { + assert := assert.New(t) + + raw := "!bad-request unknown command 'sdcsd'" + + msgI, err := Decode([]byte(raw)) + assert.NoError(err) + assert.IsType(&NotificationMessage{}, msgI) + msg := msgI.(*NotificationMessage) + + assert.Equal("bad-request", msg.Name) + assert.Equal("unknown command 'sdcsd'", msg.Arg) + assert.Equal("", msg.Json) + assert.Equal(true, msg.IsError) +} + +func TestSerializeAnErrorMessage(t *testing.T) { + msg := &NotificationMessage{ + Name: ERROR_BAD_REQUEST, + Arg: "you are so bad.", + IsError: true, + } + + assert.Equal(t, "!"+ERROR_BAD_REQUEST+" "+"you are so bad.", string(msg.Bytes())) +} + +func TestSerializeANotificationMessageWithEmptyArg(t *testing.T) { + msg := &NotificationMessage{ + Name: SUCCESS_SEND, + Arg: "", + IsError: false, + } + + assert.Equal(t, "#"+SUCCESS_SEND, string(msg.Bytes())) +} diff --git a/scripts/generate_mocks.sh b/scripts/generate_mocks.sh index 9336ab32..a141711e 100755 --- a/scripts/generate_mocks.sh +++ b/scripts/generate_mocks.sh @@ -48,11 +48,6 @@ $MOCKGEN -self_package router -package router \ github.com/cosminrentea/gobbler/server/kvstore \ KVStore & -$MOCKGEN -self_package router -package router \ - -destination server/router/mocks_auth_gen_test.go \ - github.com/cosminrentea/gobbler/server/auth \ - AccessManager & - $MOCKGEN -self_package router -package router \ -destination server/router/mocks_checker_gen_test.go \ github.com/docker/distribution/health \ @@ -113,11 +108,6 @@ $MOCKGEN -package server \ github.com/cosminrentea/gobbler/server/router \ Router & -$MOCKGEN -self_package server -package server \ - -destination server/mocks_auth_gen_test.go \ - github.com/cosminrentea/gobbler/server/auth \ - AccessManager & - $MOCKGEN -self_package server -package server \ -destination server/mocks_store_gen_test.go \ github.com/cosminrentea/gobbler/server/store \ @@ -128,14 +118,6 @@ $MOCKGEN -package server \ github.com/cosminrentea/gobbler/server/apns \ Pusher & -# server/auth mocks -$MOCKGEN -self_package auth -package auth \ - -destination server/auth/mocks_auth_gen_test.go \ - github.com/cosminrentea/gobbler/server/auth \ - AccessManager -replace "server/auth/mocks_auth_gen_test.go" \ - "auth \"github.com\/cosminrentea\/gobbler\/server\/auth\"" \ - "auth\." # server/connector mocks $MOCKGEN -self_package connector -package connector \ @@ -175,11 +157,6 @@ $MOCKGEN -self_package websocket -package websocket \ github.com/cosminrentea/gobbler/server/store \ MessageStore & -$MOCKGEN -self_package websocket -package websocket \ - -destination server/websocket/mocks_auth_gen_test.go \ - github.com/cosminrentea/gobbler/server/auth \ - AccessManager & - # server/rest Mocks $MOCKGEN -package rest \ -destination server/rest/mocks_router_gen_test.go \ diff --git a/server/apns/apns.go b/server/apns/apns.go index cf3e7f2b..a0be264d 100644 --- a/server/apns/apns.go +++ b/server/apns/apns.go @@ -107,6 +107,7 @@ func (a *apns) HandleResponse(request connector.Request, responseIface interface r, ok := responseIface.(*apns2.Response) if !ok { mTotalResponseErrors.Add(1) + pResponseErrors.Inc() return fmt.Errorf("Response could not be converted to an APNS Response") } messageID := request.Message().ID @@ -115,6 +116,7 @@ func (a *apns) HandleResponse(request connector.Request, responseIface interface if err := a.Manager().Update(subscriber); err != nil { logger.WithField("error", err.Error()).Error("Manager could not update subscription") mTotalResponseInternalErrors.Add(1) + pResponseInternalErrors.Inc() return err } if r.Sent() { @@ -137,6 +139,7 @@ func (a *apns) HandleResponse(request connector.Request, responseIface interface logger.WithField("id", r.ApnsID).Info("trying to remove subscriber because a relevant error was received from APNS") mTotalResponseRegistrationErrors.Add(1) + pResponseRegistrationErrors.Inc() err := a.Manager().Remove(subscriber) if err != nil { logger.WithField("id", r.ApnsID).Error("could not remove subscriber") @@ -144,6 +147,7 @@ func (a *apns) HandleResponse(request connector.Request, responseIface interface default: logger.Error("handling other APNS errors") mTotalResponseOtherErrors.Add(1) + pResponseOtherErrors.Inc() } return nil } diff --git a/server/apns/apns_prometheus.go b/server/apns/apns_prometheus.go index 9b61f4a7..633e55d2 100644 --- a/server/apns/apns_prometheus.go +++ b/server/apns/apns_prometheus.go @@ -14,11 +14,53 @@ var ( Name: "apns_send_errors", Help: "Number of errors when trying to send messages to APNS", }) + + pResponseErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "apns_response_errors", + Help: "Number of errors received after sending messages to APNS", + }) + + pResponseInternalErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "apns_response_internal_errors", + Help: "Number of internal errors related to handling responses from APNS", + }) + + pResponseRegistrationErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "apns_response_registration_errors", + Help: "Number of errors related to APNS registrations", + }) + + pResponseOtherErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "apns_response_other_errors", + Help: "Number of other APNS errors", + }) + + pSendNetworkErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "apns_send_network_errors", + Help: "Number of errors related to network when sending to APNS", + }) + + pSendRetryCloseTLS = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "apns_send_retry_close_tls", + Help: "Number of retries related to closing TLS in the APNS connector", + }) + + pSendRetryUnrecoverable = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "apns_send_retry_unrecoverable", + Help: "Number of unrecoverable retries in the APNS connector", + }) ) func init() { prometheus.MustRegister( pSentMessages, pSendErrors, + pResponseErrors, + pResponseInternalErrors, + pResponseRegistrationErrors, + pResponseOtherErrors, + pSendNetworkErrors, + pSendRetryCloseTLS, + pSendRetryUnrecoverable, ) } diff --git a/server/apns/apns_sender.go b/server/apns/apns_sender.go index 355fde86..67b1d931 100644 --- a/server/apns/apns_sender.go +++ b/server/apns/apns_sender.go @@ -69,10 +69,12 @@ func (s sender) Send(request connector.Request) (interface{}, error) { if closable, ok := s.client.(closable); ok { logger.Warn("Close TLS and retry again") mTotalSendRetryCloseTLS.Add(1) + pSendRetryCloseTLS.Inc() closable.CloseTLS() return push() } else { mTotalSendRetryUnrecoverable.Add(1) + pSendRetryUnrecoverable.Inc() logger.Error("Cannot Close TLS. Unrecoverable state") } } @@ -92,6 +94,7 @@ func (r *retryable) execute(op func() (interface{}, error)) (interface{}, error) // retry on network errors if _, ok := opError.(net.Error); ok { mTotalSendNetworkErrors.Add(1) + pSendNetworkErrors.Inc() if tryCounter >= r.maxTries { return "", ErrRetryFailed } diff --git a/server/apns/mocks_router_gen_test.go b/server/apns/mocks_router_gen_test.go index 33a666dd..f935254f 100644 --- a/server/apns/mocks_router_gen_test.go +++ b/server/apns/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package apns import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/auth/accessmanager.go b/server/auth/accessmanager.go deleted file mode 100644 index 0d2c700b..00000000 --- a/server/auth/accessmanager.go +++ /dev/null @@ -1,21 +0,0 @@ -package auth - -import ( - "github.com/cosminrentea/gobbler/protocol" -) - -// AccessType permission required by the user -type AccessType int - -const ( - // READ permission - READ AccessType = iota - - // WRITE permission - WRITE -) - -// AccessManager interface allows to provide a custom authentication mechanism -type AccessManager interface { - IsAllowed(accessType AccessType, userID string, path protocol.Path) bool -} diff --git a/server/auth/accessmanager_test.go b/server/auth/accessmanager_test.go deleted file mode 100644 index 949f1db5..00000000 --- a/server/auth/accessmanager_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package auth - -import ( - "github.com/stretchr/testify/assert" - "net/http" - "net/http/httptest" - "testing" -) - -func Test_AllowAllAccessManager(t *testing.T) { - a := assert.New(t) - am := AccessManager(NewAllowAllAccessManager(true)) - a.True(am.IsAllowed(READ, "userid", "/path")) - - am = AccessManager(NewAllowAllAccessManager(false)) - a.False(am.IsAllowed(READ, "userid", "/path")) -} - -func Test_RestAccessManagerAllowed(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("true")) - })) - - defer ts.Close() - a := assert.New(t) - am := NewRestAccessManager(ts.URL) - a.True(am.IsAllowed(READ, "foo", "/foo")) - a.True(am.IsAllowed(WRITE, "foo", "/foo")) -} - -func Test_RestAccessManagerNotAllowed(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("false")) - })) - - defer ts.Close() - am := NewRestAccessManager(ts.URL) - a := assert.New(t) - a.False(am.IsAllowed(READ, "user", "/foo")) -} - -func Test_RestAccessManagerNotAllowedWithServerNotStarted(t *testing.T) { - ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("false")) - })) - - defer ts.Close() - am := NewRestAccessManager(ts.URL) - a := assert.New(t) - a.False(am.IsAllowed(READ, "user", "/foo")) -} - -func Test_RestAccessManagerNotAllowedHttpReturningStatusForbidden(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusForbidden) - })) - - defer ts.Close() - a := assert.New(t) - am := NewRestAccessManager(ts.URL) - a.False(am.IsAllowed(READ, "foo", "/foo")) - a.False(am.IsAllowed(WRITE, "foo", "/foo")) -} diff --git a/server/auth/allow_all_accessmanager.go b/server/auth/allow_all_accessmanager.go deleted file mode 100644 index aa057efb..00000000 --- a/server/auth/allow_all_accessmanager.go +++ /dev/null @@ -1,18 +0,0 @@ -package auth - -import ( - "github.com/cosminrentea/gobbler/protocol" -) - -//AllowAllAccessManager is a dummy implementation that grants access for everything. -type AllowAllAccessManager bool - -//NewAllowAllAccessManager returns a new AllowAllAccessManager (depending on the passed parameter, always true or always false) -func NewAllowAllAccessManager(allowAll bool) AllowAllAccessManager { - return AllowAllAccessManager(allowAll) -} - -//IsAllowed returns always the same value, given at construction time (true or false). -func (am AllowAllAccessManager) IsAllowed(accessType AccessType, userID string, path protocol.Path) bool { - return bool(am) -} diff --git a/server/auth/logger.go b/server/auth/logger.go deleted file mode 100644 index fe1b963b..00000000 --- a/server/auth/logger.go +++ /dev/null @@ -1,9 +0,0 @@ -package auth - -import ( - log "github.com/Sirupsen/logrus" -) - -var logger = log.WithFields(log.Fields{ - "module": "accessManager", -}) diff --git a/server/auth/mocks_auth_gen_test.go b/server/auth/mocks_auth_gen_test.go deleted file mode 100644 index 715c66ee..00000000 --- a/server/auth/mocks_auth_gen_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Automatically generated by MockGen. DO NOT EDIT! -// Source: github.com/cosminrentea/gobbler/server/auth (interfaces: AccessManager) - -package auth - -import ( - protocol "github.com/cosminrentea/gobbler/protocol" - - gomock "github.com/golang/mock/gomock" -) - -// Mock of AccessManager interface -type MockAccessManager struct { - ctrl *gomock.Controller - recorder *_MockAccessManagerRecorder -} - -// Recorder for MockAccessManager (not exported) -type _MockAccessManagerRecorder struct { - mock *MockAccessManager -} - -func NewMockAccessManager(ctrl *gomock.Controller) *MockAccessManager { - mock := &MockAccessManager{ctrl: ctrl} - mock.recorder = &_MockAccessManagerRecorder{mock} - return mock -} - -func (_m *MockAccessManager) EXPECT() *_MockAccessManagerRecorder { - return _m.recorder -} - -func (_m *MockAccessManager) IsAllowed(_param0 AccessType, _param1 string, _param2 protocol.Path) bool { - ret := _m.ctrl.Call(_m, "IsAllowed", _param0, _param1, _param2) - ret0, _ := ret[0].(bool) - return ret0 -} - -func (_mr *_MockAccessManagerRecorder) IsAllowed(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "IsAllowed", arg0, arg1, arg2) -} diff --git a/server/auth/rest_accessmanager.go b/server/auth/rest_accessmanager.go deleted file mode 100644 index 7defb99b..00000000 --- a/server/auth/rest_accessmanager.go +++ /dev/null @@ -1,57 +0,0 @@ -package auth - -import ( - "github.com/cosminrentea/gobbler/protocol" - - log "github.com/Sirupsen/logrus" - - "io/ioutil" - "net/http" - "net/url" -) - -// RestAccessManager is a url for which the access is allowed or not. -type RestAccessManager string - -// NewRestAccessManager returns a new RestAccessManager. -func NewRestAccessManager(url string) RestAccessManager { - return RestAccessManager(url) -} - -// IsAllowed is an implementation of the AccessManager interface. -// The boolean result is based on matching between the desired AccessType, the userId and the path. -func (ram RestAccessManager) IsAllowed(accessType AccessType, userId string, path protocol.Path) bool { - - u, _ := url.Parse(string(ram)) - q := u.Query() - if accessType == READ { - q.Set("type", "read") - } else { - q.Set("type", "write") - } - - q.Set("userId", userId) - q.Set("path", string(path)) - - resp, err := http.DefaultClient.Get(u.String()) - - if err != nil { - logger.WithError(err).WithField("module", "RestAccessManager").Warn("Write message failed") - return false - } - defer resp.Body.Close() - responseBody, err := ioutil.ReadAll(resp.Body) - - if err != nil || resp.StatusCode != 200 { - logger.WithError(err).WithField("httpCode", resp.StatusCode).Info("Error getting permission") - logger.WithField("responseBody", responseBody).Debug("HTTP Response Body") - return false - } - logger.WithFields(log.Fields{ - "access_type": accessType, - "userId": userId, - "path": path, - "responseBody": string(responseBody), - }).Debug("Access allowed") - return "true" == string(responseBody) -} diff --git a/server/benchmarking_fetch_test.go b/server/benchmarking_fetch_test.go index 33768d32..016d5741 100644 --- a/server/benchmarking_fetch_test.go +++ b/server/benchmarking_fetch_test.go @@ -54,7 +54,7 @@ func Benchmark_E2E_Fetch_HelloWorld_Messages(b *testing.B) { for i := 1; i <= b.N; i++ { select { case msg := <-c.Messages(): - a.Equal(fmt.Sprintf("Hello %v", i), msg.BodyAsString()) + a.Equal(fmt.Sprintf("Hello %v", i), string(msg.Body)) case e := <-c.Errors(): a.Fail(string(e.Bytes())) return diff --git a/server/benchmarking_test.go b/server/benchmarking_test.go index f3a58850..76a1140f 100644 --- a/server/benchmarking_test.go +++ b/server/benchmarking_test.go @@ -159,7 +159,7 @@ func (tg *testgroup) Start() { select { case msg := <-tg.consumer.Messages(): assert.Equal(tg.t, tg.topic, string(msg.Path)) - if !assert.Equal(tg.t, body, msg.BodyAsString()) { + if !assert.Equal(tg.t, body, string(msg.Body)) { tg.t.FailNow() tg.done <- false } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3df76d85..fcca9a83 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -169,7 +169,7 @@ func (cluster *Cluster) BroadcastMessage(pMessage *protocol.Message) error { cMessage := &message{ NodeID: cluster.Config.ID, Type: mtGubleMessage, - Body: pMessage.Bytes(), + Body: pMessage.Encode(), } return cluster.broadcastClusterMessage(cMessage) } diff --git a/server/cluster_integration_test.go b/server/cluster_integration_test.go index 8938348a..6be0b97c 100644 --- a/server/cluster_integration_test.go +++ b/server/cluster_integration_test.go @@ -1,12 +1,13 @@ package server import ( + "testing" + "time" + log "github.com/Sirupsen/logrus" "github.com/cosminrentea/gobbler/protocol" "github.com/cosminrentea/gobbler/testutil" "github.com/stretchr/testify/assert" - "testing" - "time" ) func Test_Cluster_Subscribe_To_Random_Node(t *testing.T) { @@ -109,12 +110,12 @@ WAIT: "path": incomingMessage.Path, "incomingMsgUserId": incomingMessage.UserID, "headerJson": incomingMessage.HeaderJSON, - "body": incomingMessage.BodyAsString(), + "body": string(incomingMessage.Body), "numReceived": numReceived, }).Info("Client2 received a message") a.Equal(protocol.Path("/testTopic/m"), incomingMessage.Path) - a.Equal("body", incomingMessage.BodyAsString()) + a.Equal("body", string(incomingMessage.Body)) a.True(incomingMessage.ID > 0) idReceived[incomingMessage.ID] = true diff --git a/server/connector/mocks_router_gen_test.go b/server/connector/mocks_router_gen_test.go index f0d00dd0..36e82db5 100644 --- a/server/connector/mocks_router_gen_test.go +++ b/server/connector/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package connector import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/fcm/fcm.go b/server/fcm/fcm.go index 26b9e82c..d35b88fc 100644 --- a/server/fcm/fcm.go +++ b/server/fcm/fcm.go @@ -87,6 +87,7 @@ func (f *fcm) HandleResponse(request connector.Request, responseIface interface{ if err != nil && !isValidResponseError(err) { logger.WithField("error", err.Error()).Error("Error sending message to FCM") mTotalSendErrors.Add(1) + pSendErrors.Inc() if *f.IntervalMetrics && metadata != nil { addToLatenciesAndCountsMaps(currentTotalErrorsLatenciesKey, currentTotalErrorsKey, metadata.Latency) } @@ -98,6 +99,7 @@ func (f *fcm) HandleResponse(request connector.Request, responseIface interface{ response, ok := responseIface.(*gcm.Response) if !ok { mTotalResponseErrors.Add(1) + pResponseErrors.Inc() return fmt.Errorf("Invalid FCM Response") } @@ -110,6 +112,7 @@ func (f *fcm) HandleResponse(request connector.Request, responseIface interface{ } if response.Ok() { mTotalSentMessages.Add(1) + pSent.Inc() if *f.IntervalMetrics && metadata != nil { addToLatenciesAndCountsMaps(currentTotalMessagesLatenciesKey, currentTotalMessagesKey, metadata.Latency) } @@ -123,6 +126,7 @@ func (f *fcm) HandleResponse(request connector.Request, responseIface interface{ logger.Debug("Removing not registered FCM subscription") f.Manager().Remove(subscriber) mTotalResponseNotRegisteredErrors.Add(1) + pResponseNotRegisteredErrors.Inc() return response.Error case "InvalidRegistration": logger.WithField("jsonError", errText).Error("InvalidRegistration of FCM subscription") @@ -132,10 +136,12 @@ func (f *fcm) HandleResponse(request connector.Request, responseIface interface{ if response.CanonicalIDs != 0 { mTotalReplacedCanonicalErrors.Add(1) + pResponseReplacedCanonicalErrors.Inc() // we only send to one receiver, so we know that we can replace the old id with the first registration id (=canonical id) return f.replaceCanonical(request.Subscriber(), response.Results[0].RegistrationID) } mTotalResponseOtherErrors.Add(1) + pResponseOtherErrors.Inc() return nil } diff --git a/server/fcm/fcm_prometheus.go b/server/fcm/fcm_prometheus.go new file mode 100644 index 00000000..d88e7fdb --- /dev/null +++ b/server/fcm/fcm_prometheus.go @@ -0,0 +1,54 @@ +package fcm + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + pSent = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "fcm_sent", + Help: "Number of messages sent to FCM", + }) + + pSendErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "fcm_send_errors", + Help: "Number of FCM errors when trying to send", + }) + + pResponseErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "fcm_response_errors", + Help: "Number of FCM errors received as responses", + }) + + pResponseInternalErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "fcm_response_internal_errors", + Help: "Number of internal errors related to FCM responses", + }) + + pResponseNotRegisteredErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "fcm_response_not_registered_errors", + Help: "Number of errors related to not registered states in FCM connector", + }) + + pResponseReplacedCanonicalErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "fcm_response_replaced_canonical_errors", + Help: "Number of errors related to canonical IDs in FCM connector", + }) + + pResponseOtherErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "fcm_response_other_errors", + Help: "Number of other errors related to responses in the FCM connector", + }) +) + +func init() { + prometheus.MustRegister( + pSent, + pSendErrors, + pResponseErrors, + pResponseInternalErrors, + pResponseNotRegisteredErrors, + pResponseReplacedCanonicalErrors, + pResponseOtherErrors, + ) +} diff --git a/server/fcm/mocks_router_gen_test.go b/server/fcm/mocks_router_gen_test.go index f9ad3755..078e3ea5 100644 --- a/server/fcm/mocks_router_gen_test.go +++ b/server/fcm/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package fcm import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/gobbler.go b/server/gobbler.go index 729f803b..0bf9c71c 100644 --- a/server/gobbler.go +++ b/server/gobbler.go @@ -6,7 +6,6 @@ import ( "github.com/cosminrentea/gobbler/logformatter" "github.com/cosminrentea/gobbler/protocol" "github.com/cosminrentea/gobbler/server/apns" - "github.com/cosminrentea/gobbler/server/auth" "github.com/cosminrentea/gobbler/server/cluster" "github.com/cosminrentea/gobbler/server/fcm" "github.com/cosminrentea/gobbler/server/kvstore" @@ -59,12 +58,6 @@ var ValidateStoragePath = func() error { return nil } -// CreateAccessManager is a func which returns a auth.AccessManager implementation -// (currently: AllowAllAccessManager). -var CreateAccessManager = func() auth.AccessManager { - return auth.NewAllowAllAccessManager(true) -} - // CreateKVStore is a func which returns a kvstore.KVStore implementation // (currently, based on guble configuration). var CreateKVStore = func() kvstore.KVStore { @@ -255,7 +248,6 @@ func Main() { func StartService() *service.Service { //TODO StartService could return an error in case it fails to start - accessManager := CreateAccessManager() messageStore := CreateMessageStore() kvStore := CreateKVStore() @@ -277,7 +269,7 @@ func StartService() *service.Service { logger.Info("Starting in standalone-mode") } - r := router.New(accessManager, messageStore, kvStore, cl) + r := router.New(messageStore, kvStore, cl) websrv := webserver.New(*Config.HttpListen) srv := service.New(r, websrv). diff --git a/server/gobbler_test.go b/server/gobbler_test.go index a86ca856..e988cbe9 100644 --- a/server/gobbler_test.go +++ b/server/gobbler_test.go @@ -138,10 +138,8 @@ func TestStartServiceModules(t *testing.T) { func initRouterMock() *MockRouter { routerMock := NewMockRouter(testutil.MockCtrl) routerMock.EXPECT().Cluster().Return(nil).AnyTimes() - amMock := NewMockAccessManager(testutil.MockCtrl) msMock := NewMockMessageStore(testutil.MockCtrl) - routerMock.EXPECT().AccessManager().Return(amMock, nil).AnyTimes() routerMock.EXPECT().MessageStore().Return(msMock, nil).AnyTimes() return routerMock diff --git a/server/integration_test.go b/server/integration_test.go index 280d6c8c..8dce10a5 100644 --- a/server/integration_test.go +++ b/server/integration_test.go @@ -84,7 +84,6 @@ type Subscriber struct { } func TestSubscribersIntegration(t *testing.T) { - testutil.SkipIfShort(t) testutil.SkipIfDisabled(t) defer testutil.ResetDefaultRegistryHealthCheck() diff --git a/server/kvstore/postgres_test.go b/server/kvstore/postgres_test.go index 073c279d..a2ba9f4e 100644 --- a/server/kvstore/postgres_test.go +++ b/server/kvstore/postgres_test.go @@ -1,8 +1,11 @@ package kvstore import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/cosminrentea/gobbler/testutil" + + "github.com/stretchr/testify/assert" ) func BenchmarkPostgresKVStore_PutGet(b *testing.B) { @@ -12,24 +15,28 @@ func BenchmarkPostgresKVStore_PutGet(b *testing.B) { } func TestPostgresKVStore_PutGetDelete(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(aPostgresConfig()) kvs.Open() CommonTestPutGetDelete(t, kvs, kvs) } func TestPostgresKVStore_Iterate(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(aPostgresConfig()) kvs.Open() CommonTestIterate(t, kvs, kvs) } func TestPostgresKVStore_IterateKeys(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(aPostgresConfig()) kvs.Open() CommonTestIterateKeys(t, kvs, kvs) } func TestPostgresKVStore_Check(t *testing.T) { + testutil.SkipIfShort(t) a := assert.New(t) kvs := NewPostgresKVStore(aPostgresConfig()) @@ -45,6 +52,7 @@ func TestPostgresKVStore_Check(t *testing.T) { } func TestPostgresKVStore_Open(t *testing.T) { + testutil.SkipIfShort(t) kvs := NewPostgresKVStore(invalidPostgresConfig()) err := kvs.Open() assert.NotNil(t, err) diff --git a/server/mocks_auth_gen_test.go b/server/mocks_auth_gen_test.go deleted file mode 100644 index 3f045c6d..00000000 --- a/server/mocks_auth_gen_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Automatically generated by MockGen. DO NOT EDIT! -// Source: github.com/cosminrentea/gobbler/server/auth (interfaces: AccessManager) - -package server - -import ( - protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" - gomock "github.com/golang/mock/gomock" -) - -// Mock of AccessManager interface -type MockAccessManager struct { - ctrl *gomock.Controller - recorder *_MockAccessManagerRecorder -} - -// Recorder for MockAccessManager (not exported) -type _MockAccessManagerRecorder struct { - mock *MockAccessManager -} - -func NewMockAccessManager(ctrl *gomock.Controller) *MockAccessManager { - mock := &MockAccessManager{ctrl: ctrl} - mock.recorder = &_MockAccessManagerRecorder{mock} - return mock -} - -func (_m *MockAccessManager) EXPECT() *_MockAccessManagerRecorder { - return _m.recorder -} - -func (_m *MockAccessManager) IsAllowed(_param0 auth.AccessType, _param1 string, _param2 protocol.Path) bool { - ret := _m.ctrl.Call(_m, "IsAllowed", _param0, _param1, _param2) - ret0, _ := ret[0].(bool) - return ret0 -} - -func (_mr *_MockAccessManagerRecorder) IsAllowed(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "IsAllowed", arg0, arg1, arg2) -} diff --git a/server/mocks_router_gen_test.go b/server/mocks_router_gen_test.go index f3bc02e8..1453c854 100644 --- a/server/mocks_router_gen_test.go +++ b/server/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package server import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/redundancy_test.go b/server/redundancy_test.go index 31ffcedb..6b89f532 100644 --- a/server/redundancy_test.go +++ b/server/redundancy_test.go @@ -10,6 +10,7 @@ import ( ) func Test_Subscribe_on_random_node(t *testing.T) { + testutil.SkipIfDisabled(t) testutil.SkipIfShort(t) a := assert.New(t) diff --git a/server/rest/mocks_router_gen_test.go b/server/rest/mocks_router_gen_test.go index 3d8c8e42..ff913206 100644 --- a/server/rest/mocks_router_gen_test.go +++ b/server/rest/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package rest import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/router/errors.go b/server/router/errors.go index 8c96edcd..3dffd11d 100644 --- a/server/router/errors.go +++ b/server/router/errors.go @@ -1,9 +1,6 @@ package router import ( - "github.com/cosminrentea/gobbler/protocol" - "github.com/cosminrentea/gobbler/server/auth" - "errors" "fmt" ) @@ -24,23 +21,6 @@ var ( ErrQueueFull = errors.New("Route queue is full. Route is closed.") ) -// PermissionDeniedError is returned when AccessManager denies a user request for a topic -type PermissionDeniedError struct { - - // userId of request - UserID string - - // accessType requested(READ/WRITE) - AccessType auth.AccessType - - // requested topic - Path protocol.Path -} - -func (e *PermissionDeniedError) Error() string { - return fmt.Sprintf("Access Denied for user=[%s] on path=[%s] for Operation=[%s]", e.UserID, e.Path, e.AccessType) -} - // ModuleStoppingError is returned when the module is stopping type ModuleStoppingError struct { Name string diff --git a/server/router/mocks_auth_gen_test.go b/server/router/mocks_auth_gen_test.go deleted file mode 100644 index 1d655a80..00000000 --- a/server/router/mocks_auth_gen_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Automatically generated by MockGen. DO NOT EDIT! -// Source: github.com/cosminrentea/gobbler/server/auth (interfaces: AccessManager) - -package router - -import ( - protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" - gomock "github.com/golang/mock/gomock" -) - -// Mock of AccessManager interface -type MockAccessManager struct { - ctrl *gomock.Controller - recorder *_MockAccessManagerRecorder -} - -// Recorder for MockAccessManager (not exported) -type _MockAccessManagerRecorder struct { - mock *MockAccessManager -} - -func NewMockAccessManager(ctrl *gomock.Controller) *MockAccessManager { - mock := &MockAccessManager{ctrl: ctrl} - mock.recorder = &_MockAccessManagerRecorder{mock} - return mock -} - -func (_m *MockAccessManager) EXPECT() *_MockAccessManagerRecorder { - return _m.recorder -} - -func (_m *MockAccessManager) IsAllowed(_param0 auth.AccessType, _param1 string, _param2 protocol.Path) bool { - ret := _m.ctrl.Call(_m, "IsAllowed", _param0, _param1, _param2) - ret0, _ := ret[0].(bool) - return ret0 -} - -func (_mr *_MockAccessManagerRecorder) IsAllowed(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "IsAllowed", arg0, arg1, arg2) -} diff --git a/server/router/mocks_router_gen_test.go b/server/router/mocks_router_gen_test.go index a7452118..59e6c568 100644 --- a/server/router/mocks_router_gen_test.go +++ b/server/router/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package router import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/router/route_test.go b/server/router/route_test.go index 7b1021cd..f1add356 100644 --- a/server/router/route_test.go +++ b/server/router/route_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/cosminrentea/gobbler/protocol" - "github.com/cosminrentea/gobbler/server/auth" "github.com/cosminrentea/gobbler/server/kvstore" "github.com/cosminrentea/gobbler/server/store" "github.com/cosminrentea/gobbler/testutil" @@ -19,7 +18,7 @@ import ( var ( dummyPath = protocol.Path("/dummy") dummyMessageWithID = &protocol.Message{ID: 1, Path: dummyPath, Body: []byte("dummy body")} - dummyMessageBytes = `/dummy,MESSAGE_ID,user01,phone01,{},1420110000,1 + dummyMessageBytes = `/dummy,MESSAGE_ID,user01,phone01,{},,1420110000,1 {"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"} Hello World` chanSize = 10 @@ -407,7 +406,7 @@ func TestRoute_Provide_MultipleFetch(t *testing.T) { memoryKV := kvstore.NewMemoryKVStore() msMock := NewMockMessageStore(ctrl) - router := New(auth.AllowAllAccessManager(true), msMock, memoryKV, nil) + router := New(msMock, memoryKV, nil) if startable, ok := router.(startable); ok { startable.Start() diff --git a/server/router/router.go b/server/router/router.go index 1dd2b0fb..05bfedb8 100644 --- a/server/router/router.go +++ b/server/router/router.go @@ -14,7 +14,6 @@ import ( "net/http" "github.com/cosminrentea/gobbler/protocol" - "github.com/cosminrentea/gobbler/server/auth" "github.com/cosminrentea/gobbler/server/cluster" "github.com/cosminrentea/gobbler/server/kvstore" "github.com/cosminrentea/gobbler/server/store" @@ -36,7 +35,6 @@ type Router interface { Fetch(*store.FetchRequest) error GetSubscribers(topic string) ([]byte, error) - AccessManager() (auth.AccessManager, error) MessageStore() (store.MessageStore, error) KVStore() (kvstore.KVStore, error) Cluster() *cluster.Cluster @@ -59,16 +57,15 @@ type router struct { stopping bool // Flag: the router is in stopping process and no incoming messages are accepted wg sync.WaitGroup // Add any operation that we need to wait upon here - accessManager auth.AccessManager - messageStore store.MessageStore - kvStore kvstore.KVStore - cluster *cluster.Cluster + messageStore store.MessageStore + kvStore kvstore.KVStore + cluster *cluster.Cluster sync.RWMutex } // New returns a pointer to Router -func New(accessManager auth.AccessManager, messageStore store.MessageStore, kvStore kvstore.KVStore, cluster *cluster.Cluster) Router { +func New(messageStore store.MessageStore, kvStore kvstore.KVStore, cluster *cluster.Cluster) Router { return &router{ routes: make(map[protocol.Path][]*Route), @@ -77,10 +74,9 @@ func New(accessManager auth.AccessManager, messageStore store.MessageStore, kvSt unsubscribeC: make(chan subRequest, unsubscribeChannelCapacity), stopC: make(chan bool, 1), - accessManager: accessManager, - messageStore: messageStore, - kvStore: kvStore, - cluster: cluster, + messageStore: messageStore, + kvStore: kvStore, + cluster: cluster, } } @@ -133,7 +129,7 @@ func (router *router) Stop() error { } func (router *router) Check() error { - if router.accessManager == nil || router.messageStore == nil || router.kvStore == nil { + if router.messageStore == nil || router.kvStore == nil { logger.WithError(ErrServiceNotProvided).Error("Some mandatory services are not provided") return ErrServiceNotProvided } @@ -167,16 +163,12 @@ func (router *router) HandleMessage(message *protocol.Message) error { return err } - if !router.accessManager.IsAllowed(auth.WRITE, message.UserID, message.Path) { - return &PermissionDeniedError{UserID: message.UserID, AccessType: auth.WRITE, Path: message.Path} - } - var nodeID uint8 if router.cluster != nil { nodeID = router.cluster.Config.ID } - mTotalMessagesIncomingBytes.Add(int64(len(message.Bytes()))) + mTotalMessagesIncomingBytes.Add(int64(len(message.Encode()))) size, err := router.messageStore.StoreMessage(message, nodeID) if err != nil { logger.WithField("error", err.Error()).Error("Error storing message") @@ -197,22 +189,12 @@ func (router *router) HandleMessage(message *protocol.Message) error { } func (router *router) Subscribe(r *Route) (*Route, error) { - logger.WithFields(log.Fields{ - "accessManager": router.accessManager, - "route": r, - }).Debug("Subscribe") + logger.WithField("route", r).Debug("Subscribe") if err := router.isStopping(); err != nil { return nil, err } - userID := r.Get("user_id") - routePath := r.Path - - accessAllowed := router.accessManager.IsAllowed(auth.READ, userID, routePath) - if !accessAllowed { - return r, &PermissionDeniedError{UserID: userID, AccessType: auth.READ, Path: routePath} - } req := subRequest{ route: r, doneC: make(chan bool), @@ -225,10 +207,7 @@ func (router *router) Subscribe(r *Route) (*Route, error) { // Subscribe adds a route to the subscribers. If there is already a route with same Application Id and Path, it will be replaced. func (router *router) Unsubscribe(r *Route) { - logger.WithFields(log.Fields{ - "accessManager": router.accessManager, - "route": r, - }).Debug("Unsubscribe") + logger.WithField("route", r).Debug("Unsubscribe") req := subRequest{ route: r, @@ -303,9 +282,9 @@ func (router *router) unsubscribe(r *Route) { } func (router *router) panicIfInternalDependenciesAreNil() { - if router.accessManager == nil || router.kvStore == nil || router.messageStore == nil { - panic(fmt.Sprintf("router: the internal dependencies marked with `true` are not set: AccessManager=%v, KVStore=%v, MessageStore=%v", - router.accessManager == nil, router.kvStore == nil, router.messageStore == nil)) + if router.kvStore == nil || router.messageStore == nil { + panic(fmt.Sprintf("router: the internal dependencies marked with `true` are not set: KVStore=%v, MessageStore=%v", + router.kvStore == nil, router.messageStore == nil)) } } @@ -419,14 +398,6 @@ func (router *router) Fetch(req *store.FetchRequest) error { return nil } -// AccessManager returns the `accessManager` provided for the router -func (router *router) AccessManager() (auth.AccessManager, error) { - if router.accessManager == nil { - return nil, ErrServiceNotProvided - } - return router.accessManager, nil -} - // MessageStore returns the `messageStore` provided for the router func (router *router) MessageStore() (store.MessageStore, error) { if router.messageStore == nil { diff --git a/server/router/router_test.go b/server/router/router_test.go index 606c26c6..78b5637a 100644 --- a/server/router/router_test.go +++ b/server/router/router_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/cosminrentea/gobbler/protocol" - "github.com/cosminrentea/gobbler/server/auth" "github.com/cosminrentea/gobbler/server/kvstore" "github.com/cosminrentea/gobbler/server/store" "github.com/cosminrentea/gobbler/server/store/dummystore" @@ -46,7 +45,7 @@ func TestRouter_AddAndRemoveRoutes(t *testing.T) { a := assert.New(t) // Given a Router - router, _, _, _ := aStartedRouter() + router, _, _ := aStartedRouter() // when i add two routes in the same path routeBlah1, _ := router.Subscribe(NewRoute( @@ -94,102 +93,11 @@ func TestRouter_AddAndRemoveRoutes(t *testing.T) { a.Nil(router.routes[protocol.Path("/foo")]) } -func TestRouter_SubscribeNotAllowed(t *testing.T) { - ctrl, finish := testutil.NewMockCtrl(t) - defer finish() - a := assert.New(t) - - am := NewMockAccessManager(ctrl) - msMock := NewMockMessageStore(ctrl) - kvsMock := NewMockKVStore(ctrl) - - am.EXPECT().IsAllowed(auth.READ, "user01", protocol.Path("/blah")).Return(false) - - router := New(am, msMock, kvsMock, nil).(*router) - router.Start() - - _, e := router.Subscribe(NewRoute( - RouteConfig{ - RouteParams: RouteParams{"application_id": "appid01", "user_id": "user01"}, - Path: protocol.Path("/blah"), - ChannelSize: chanSize, - }, - )) - - // default TestAccessManager denies all - a.NotNil(e) - - // now add permissions - am.EXPECT().IsAllowed(auth.READ, "user01", protocol.Path("/blah")).Return(true) - - // and user shall be allowed to subscribe - _, e = router.Subscribe(NewRoute( - RouteConfig{ - RouteParams: RouteParams{"application_id": "appid01", "user_id": "user01"}, - Path: protocol.Path("/blah"), - ChannelSize: chanSize, - }, - )) - - a.Nil(e) -} - -func TestRouter_HandleMessageNotAllowed(t *testing.T) { - ctrl, finish := testutil.NewMockCtrl(t) - defer finish() - a := assert.New(t) - - amMock := NewMockAccessManager(ctrl) - msMock := NewMockMessageStore(ctrl) - kvsMock := NewMockKVStore(ctrl) - - // Given a Router with route - router, r := aRouterRoute(chanSize) - router.accessManager = amMock - router.messageStore = msMock - router.kvStore = kvsMock - - amMock.EXPECT().IsAllowed(auth.WRITE, r.Get("user_id"), r.Path).Return(false) - - // when i send a message to the route - err := router.HandleMessage(&protocol.Message{ - Path: r.Path, - Body: aTestByteMessage, - UserID: r.Get("user_id"), - }) - - // an error shall be returned - a.Error(err) - - // and when permission is granted - id, ts := uint64(2), time.Now().Unix() - - amMock.EXPECT().IsAllowed(auth.WRITE, r.Get("user_id"), r.Path).Return(true) - msMock.EXPECT(). - StoreMessage(gomock.Any(), gomock.Any()). - Do(func(m *protocol.Message, nodeID uint8) (int, error) { - m.ID = id - m.Time = ts - m.NodeID = nodeID - return len(m.Bytes()), nil - }) - - // sending message - err = router.HandleMessage(&protocol.Message{ - Path: r.Path, - Body: aTestByteMessage, - UserID: r.Get("user_id"), - }) - - // shall give no error - a.NoError(err) -} - func TestRouter_ReplacingOfRoutesMatchingAppID(t *testing.T) { a := assert.New(t) // Given a Router with a route - router, _, _, _ := aStartedRouter() + router, _, _ := aStartedRouter() matcherFunc := func(route, other RouteConfig, keys ...string) bool { return route.Path == other.Path && route.Get("application_id") == other.Get("application_id") @@ -234,7 +142,7 @@ func TestRouter_SimpleMessageSending(t *testing.T) { m.ID = id m.Time = ts m.NodeID = nodeID - return len(m.Bytes()), nil + return len(m.Encode()), nil }) // when i send a message to the route @@ -250,7 +158,7 @@ func TestRouter_RoutingWithSubTopics(t *testing.T) { a := assert.New(t) // Given a Router with route - router, _, _, _ := aStartedRouter() + router, _, _ := aStartedRouter() msMock := NewMockMessageStore(ctrl) router.messageStore = msMock @@ -388,7 +296,7 @@ func TestRouter_CleanShutdown(t *testing.T) { }). AnyTimes() - router, _, _, _ := aStartedRouter() + router, _, _ := aStartedRouter() router.messageStore = msMock route, err := router.Subscribe(NewRoute( @@ -454,38 +362,28 @@ func TestRouter_Check(t *testing.T) { defer finish() a := assert.New(t) - amMock := NewMockAccessManager(ctrl) msMock := NewMockMessageStore(ctrl) kvsMock := NewMockKVStore(ctrl) msCheckerMock := newMSChecker() kvsCheckerMock := newKVSChecker() // Given a Multiplexer with route - router, _, _, _ := aStartedRouter() + router, _, _ := aStartedRouter() // Test 0: Router is healthy by default a.Nil(router.Check()) - // Test 1a: Given accessManager is nil, then router's Check returns error - router.accessManager = nil - router.messageStore = msMock - router.kvStore = kvsMock - a.NotNil(router.Check()) - // Test 1b: Given messageStore is nil, then router's Check returns error - router.accessManager = amMock router.messageStore = nil router.kvStore = kvsMock a.NotNil(router.Check()) // Test 1c: Given kvStore is nil, then router's Check return error - router.accessManager = amMock router.messageStore = msMock router.kvStore = nil a.NotNil(router.Check()) // Test 2: Given mocked store dependencies, both healthy - router.accessManager = amMock router.messageStore = msCheckerMock router.kvStore = kvsCheckerMock @@ -510,21 +408,20 @@ func TestRouter_Check(t *testing.T) { func TestPanicOnInternalDependencies(t *testing.T) { defer testutil.ExpectPanic(t) - router := New(nil, nil, nil, nil).(*router) + router := New(nil, nil, nil).(*router) router.panicIfInternalDependenciesAreNil() } -func aStartedRouter() (*router, auth.AccessManager, store.MessageStore, kvstore.KVStore) { - am := auth.NewAllowAllAccessManager(true) +func aStartedRouter() (*router, store.MessageStore, kvstore.KVStore) { kvs := kvstore.NewMemoryKVStore() ms := dummystore.New(kvs) - router := New(am, ms, kvs, nil).(*router) + router := New(ms, kvs, nil).(*router) router.Start() - return router, am, ms, kvs + return router, ms, kvs } func aRouterRoute(unused int) (*router, *Route) { - router, _, _, _ := aStartedRouter() + router, _, _ := aStartedRouter() route, _ := router.Subscribe(NewRoute( RouteConfig{ RouteParams: RouteParams{"application_id": "appid01", "user_id": "user01"}, diff --git a/server/service/mocks_router_gen_test.go b/server/service/mocks_router_gen_test.go index b823b621..eda9f818 100644 --- a/server/service/mocks_router_gen_test.go +++ b/server/service/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package service import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/sms/mocks_router_gen_test.go b/server/sms/mocks_router_gen_test.go index 41eb1c1b..2bf08add 100644 --- a/server/sms/mocks_router_gen_test.go +++ b/server/sms/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package sms import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/sms/nexmo_sms_sender.go b/server/sms/nexmo_sms_sender.go index bc5701f9..ae09e786 100644 --- a/server/sms/nexmo_sms_sender.go +++ b/server/sms/nexmo_sms_sender.go @@ -210,6 +210,7 @@ func (ns *NexmoSender) sendSms(sms *NexmoSms) (*NexmoMessageResponse, error) { logger.WithField("error", err.Error()).Error("Error doing the request to nexmo endpoint") ns.createHttpClient() mTotalSendErrors.Add(1) + pNexmoSendErrors.Inc() return nil, ErrHTTPClientError } defer resp.Body.Close() @@ -219,6 +220,7 @@ func (ns *NexmoSender) sendSms(sms *NexmoSms) (*NexmoMessageResponse, error) { if err != nil { logger.WithField("error", err.Error()).Error("Error reading the nexmo body response") mTotalResponseInternalErrors.Add(1) + pNexmoResponseInternalErrors.Inc() return nil, ErrSMSResponseDecodingFailed } @@ -226,6 +228,7 @@ func (ns *NexmoSender) sendSms(sms *NexmoSms) (*NexmoMessageResponse, error) { if err != nil { logger.WithField("error", err.Error()).Error("Error decoding the response from nexmo endpoint") mTotalResponseInternalErrors.Add(1) + pNexmoResponseInternalErrors.Inc() return nil, ErrSMSResponseDecodingFailed } logger.WithField("messageResponse", messageResponse).WithField("order_id", sms.ClientRef).Info("Actual nexmo response") diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 8a78af44..ffd80660 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -189,10 +189,12 @@ func (g *gateway) send(receivedMsg *protocol.Message) error { if err != nil { log.WithField("error", err.Error()).Error("Sending of message failed") mTotalResponseErrors.Add(1) + pNexmoResponseErrors.Inc() return err } - mTotalSentMessages.Add(1) g.SetLastSentID(receivedMsg.ID) + mTotalSentMessages.Add(1) + pSent.Inc() return nil } diff --git a/server/sms/sms_prometheus.go b/server/sms/sms_prometheus.go new file mode 100644 index 00000000..fef38d3d --- /dev/null +++ b/server/sms/sms_prometheus.go @@ -0,0 +1,34 @@ +package sms + +import "github.com/prometheus/client_golang/prometheus" + +var ( + pSent = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "sms_sent", + Help: "Number of sms sent to the SMS service", + }) + + pNexmoSendErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "sms_nexmo_send_errors", + Help: "Number of errors while trying to send sms to Nexmo", + }) + + pNexmoResponseErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "sms_nexmo_response_errors", + Help: "Number of errors received from Nexmo", + }) + + pNexmoResponseInternalErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "sms_nexmo_response_internal_errors", + Help: "Number of internal errors related to Nexmo responses", + }) +) + +func init() { + prometheus.MustRegister( + pSent, + pNexmoSendErrors, + pNexmoResponseErrors, + pNexmoResponseInternalErrors, + ) +} diff --git a/server/sms/utils_test.go b/server/sms/utils_test.go index 0043ce54..b8b1a543 100644 --- a/server/sms/utils_test.go +++ b/server/sms/utils_test.go @@ -3,16 +3,16 @@ package sms import ( "encoding/json" "fmt" + "io/ioutil" + "math/rand" + "testing" + "time" + "github.com/cosminrentea/gobbler/protocol" - "github.com/cosminrentea/gobbler/server/auth" "github.com/cosminrentea/gobbler/server/kvstore" "github.com/cosminrentea/gobbler/server/router" "github.com/cosminrentea/gobbler/server/store/dummystore" "github.com/stretchr/testify/assert" - "io/ioutil" - "math/rand" - "testing" - "time" ) var ( @@ -122,9 +122,8 @@ func createGateway(t *testing.T, kvStore kvstore.KVStore) *gateway { sender := createNexmoSender(t) config := createConfig() msgStore := dummystore.New(kvStore) - accessManager := auth.NewAllowAllAccessManager(true) - unstartedRouter := router.New(accessManager, msgStore, kvStore, nil) + unstartedRouter := router.New(msgStore, kvStore, nil) gw, err := New(unstartedRouter, sender, config) a.NoError(err) diff --git a/server/store/dummystore/dummy_message_store.go b/server/store/dummystore/dummy_message_store.go index 98d70aea..5bd9088d 100644 --- a/server/store/dummystore/dummy_message_store.go +++ b/server/store/dummystore/dummy_message_store.go @@ -68,7 +68,7 @@ func (dms *DummyMessageStore) StoreMessage(message *protocol.Message, nodeID uin message.ID = nextID message.Time = ts message.NodeID = nodeID - data := message.Bytes() + data := message.Encode() if err := dms.Store(partitionName, nextID, data); err != nil { return 0, err } diff --git a/server/store/filestore/message_store.go b/server/store/filestore/message_store.go index 52058874..2e67ab33 100644 --- a/server/store/filestore/message_store.go +++ b/server/store/filestore/message_store.go @@ -95,9 +95,9 @@ func (fms *FileMessageStore) StoreMessage(message *protocol.Message, nodeID uint }).Debug("Locally generated ID for message") } - data := message.Bytes() + data := message.Encode() - if err := fms.Store(partitionName, message.ID, message.Bytes()); err != nil { + if err := fms.Store(partitionName, message.ID, message.Encode()); err != nil { logger. WithError(err).WithField("partition", partitionName). Error("Error storing locally generated messagein partition") diff --git a/server/websocket/mocks_auth_gen_test.go b/server/websocket/mocks_auth_gen_test.go deleted file mode 100644 index 289ab3aa..00000000 --- a/server/websocket/mocks_auth_gen_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Automatically generated by MockGen. DO NOT EDIT! -// Source: github.com/cosminrentea/gobbler/server/auth (interfaces: AccessManager) - -package websocket - -import ( - protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" - gomock "github.com/golang/mock/gomock" -) - -// Mock of AccessManager interface -type MockAccessManager struct { - ctrl *gomock.Controller - recorder *_MockAccessManagerRecorder -} - -// Recorder for MockAccessManager (not exported) -type _MockAccessManagerRecorder struct { - mock *MockAccessManager -} - -func NewMockAccessManager(ctrl *gomock.Controller) *MockAccessManager { - mock := &MockAccessManager{ctrl: ctrl} - mock.recorder = &_MockAccessManagerRecorder{mock} - return mock -} - -func (_m *MockAccessManager) EXPECT() *_MockAccessManagerRecorder { - return _m.recorder -} - -func (_m *MockAccessManager) IsAllowed(_param0 auth.AccessType, _param1 string, _param2 protocol.Path) bool { - ret := _m.ctrl.Call(_m, "IsAllowed", _param0, _param1, _param2) - ret0, _ := ret[0].(bool) - return ret0 -} - -func (_mr *_MockAccessManagerRecorder) IsAllowed(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "IsAllowed", arg0, arg1, arg2) -} diff --git a/server/websocket/mocks_router_gen_test.go b/server/websocket/mocks_router_gen_test.go index e132bebe..d7d65383 100644 --- a/server/websocket/mocks_router_gen_test.go +++ b/server/websocket/mocks_router_gen_test.go @@ -5,7 +5,6 @@ package websocket import ( protocol "github.com/cosminrentea/gobbler/protocol" - auth "github.com/cosminrentea/gobbler/server/auth" cluster "github.com/cosminrentea/gobbler/server/cluster" kvstore "github.com/cosminrentea/gobbler/server/kvstore" router "github.com/cosminrentea/gobbler/server/router" @@ -34,17 +33,6 @@ func (_m *MockRouter) EXPECT() *_MockRouterRecorder { return _m.recorder } -func (_m *MockRouter) AccessManager() (auth.AccessManager, error) { - ret := _m.ctrl.Call(_m, "AccessManager") - ret0, _ := ret[0].(auth.AccessManager) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager") -} - func (_m *MockRouter) Cluster() *cluster.Cluster { ret := _m.ctrl.Call(_m, "Cluster") ret0, _ := ret[0].(*cluster.Cluster) diff --git a/server/websocket/receiver.go b/server/websocket/receiver.go index 573757c7..8a10016a 100644 --- a/server/websocket/receiver.go +++ b/server/websocket/receiver.go @@ -180,7 +180,7 @@ func (rec *Receiver) receiveFromSubscription() { if m.ID > rec.lastSentID { rec.lastSentID = m.ID - rec.sendC <- m.Bytes() + rec.sendC <- m.Encode() } else { logger.WithFields(log.Fields{ "msgId": m.ID, diff --git a/server/websocket/receiver_test.go b/server/websocket/receiver_test.go index 9ab032f7..559e3172 100644 --- a/server/websocket/receiver_test.go +++ b/server/websocket/receiver_test.go @@ -133,8 +133,8 @@ func Test_Receiver_Fetch_Subscribe_Fetch_Subscribe(t *testing.T) { "fetch_first2-a", "#"+protocol.SUCCESS_FETCH_END+" /foo", "#"+protocol.SUCCESS_SUBSCRIBED_TO+" /foo", - ",4,,,,1405544146,0\n\nrouter-a", - ",5,,,,1405544146,0\n\nrouter-b", + ",4,,,,,1405544146,0\n\nrouter-a", + ",5,,,,,1405544146,0\n\nrouter-b", "#"+protocol.SUCCESS_FETCH_START+" /foo 1", "fetch_after-a", "#"+protocol.SUCCESS_FETCH_END+" /foo", diff --git a/server/websocket/websocket_connector.go b/server/websocket/websocket_connector.go index 3ea45454..2f62d9b9 100644 --- a/server/websocket/websocket_connector.go +++ b/server/websocket/websocket_connector.go @@ -2,7 +2,6 @@ package websocket import ( "github.com/cosminrentea/gobbler/protocol" - "github.com/cosminrentea/gobbler/server/auth" "github.com/cosminrentea/gobbler/server/router" log "github.com/Sirupsen/logrus" @@ -27,21 +26,15 @@ var webSocketUpgrader = websocket.Upgrader{ // WSHandler is a struct used for handling websocket connections on a certain prefix. type WSHandler struct { - router router.Router - prefix string - accessManager auth.AccessManager + router router.Router + prefix string } // NewWSHandler returns a new WSHandler. func NewWSHandler(router router.Router, prefix string) (*WSHandler, error) { - accessManager, err := router.AccessManager() - if err != nil { - return nil, err - } return &WSHandler{ - router: router, - prefix: prefix, - accessManager: accessManager, + router: router, + prefix: prefix, }, nil } @@ -127,9 +120,6 @@ func (ws *WebSocket) Start() error { func (ws *WebSocket) sendLoop() { for raw := range ws.sendChannel { - if !ws.checkAccess(raw) { - continue - } if err := ws.Send(raw); err != nil { logger.WithFields(log.Fields{ "userId": ws.userID, @@ -143,21 +133,6 @@ func (ws *WebSocket) sendLoop() { } } -func (ws *WebSocket) checkAccess(raw []byte) bool { - if len(raw) > 0 && raw[0] == byte('/') { - path := getPathFromRawMessage(raw) - - logger.WithFields(log.Fields{ - "userID": ws.userID, - "path": path, - }).Debug("Received msg") - - return len(path) == 0 || ws.accessManager.IsAllowed(auth.READ, ws.userID, path) - - } - return true -} - func getPathFromRawMessage(raw []byte) protocol.Path { i := strings.Index(string(raw), ",") return protocol.Path(raw[:i]) diff --git a/server/websocket/websocket_connector_test.go b/server/websocket/websocket_connector_test.go index 59c97c7f..6ff6d72c 100644 --- a/server/websocket/websocket_connector_test.go +++ b/server/websocket/websocket_connector_test.go @@ -2,7 +2,6 @@ package websocket import ( "github.com/cosminrentea/gobbler/protocol" - "github.com/cosminrentea/gobbler/server/auth" "github.com/cosminrentea/gobbler/server/router" "github.com/cosminrentea/gobbler/server/store" "github.com/cosminrentea/gobbler/testutil" @@ -54,7 +53,7 @@ func Test_WebSocket_SubscribeAndUnsubscribe(t *testing.T) { Send([]byte("#" + protocol.SUCCESS_CANCELED + " /foo")). Do(doneGroup) - websocket := runNewWebSocket(wsconn, routerMock, messageStore, nil) + websocket := runNewWebSocket(wsconn, routerMock, messageStore) wg.Wait() a.Equal(1, len(websocket.receivers)) @@ -71,7 +70,7 @@ func Test_SendMessage(t *testing.T) { routerMock.EXPECT().HandleMessage(messageMatcher{path: "/path", message: "Hello, this is a test", header: `{"key": "value"}`}) wsconn.EXPECT().Send([]byte("#send")) - runNewWebSocket(wsconn, routerMock, messageStore, nil) + runNewWebSocket(wsconn, routerMock, messageStore) } func Test_AnIncomingMessageIsDelivered(t *testing.T) { @@ -80,44 +79,11 @@ func Test_AnIncomingMessageIsDelivered(t *testing.T) { wsconn, routerMock, messageStore := createDefaultMocks([]string{}) - wsconn.EXPECT().Send(aTestMessage.Bytes()) + wsconn.EXPECT().Send(aTestMessage.Encode()) - handler := runNewWebSocket(wsconn, routerMock, messageStore, nil) + handler := runNewWebSocket(wsconn, routerMock, messageStore) - handler.sendChannel <- aTestMessage.Bytes() - time.Sleep(time.Millisecond * 2) -} - -func Test_AnIncomingMessageIsNotAllowed(t *testing.T) { - ctrl, finish := testutil.NewMockCtrl(t) - defer finish() - - wsconn, routerMock, _ := createDefaultMocks([]string{}) - - tam := NewMockAccessManager(ctrl) - tam.EXPECT().IsAllowed(auth.READ, "testuser", protocol.Path("/foo")).Return(false) - handler := NewWebSocket( - testWSHandler(routerMock, tam), - wsconn, - "testuser", - ) - go func() { - handler.Start() - }() - time.Sleep(time.Millisecond * 2) - - handler.sendChannel <- aTestMessage.Bytes() - time.Sleep(time.Millisecond * 2) - //nothing shall have been sent - - //now allow - tam.EXPECT().IsAllowed(auth.READ, "testuser", protocol.Path("/foo")).Return(true) - - wsconn.EXPECT().Send(aTestMessage.Bytes()) - - time.Sleep(time.Millisecond * 2) - - handler.sendChannel <- aTestMessage.Bytes() + handler.sendChannel <- aTestMessage.Encode() time.Sleep(time.Millisecond * 2) } @@ -147,7 +113,7 @@ func Test_BadCommands(t *testing.T) { return nil }).AnyTimes() - runNewWebSocket(wsconn, routerMock, messageStore, nil) + runNewWebSocket(wsconn, routerMock, messageStore) wg.Wait() assert.Equal(t, len(badRequests), counter, "expected number of bad requests does not match") @@ -160,27 +126,21 @@ func TestExtractUserId(t *testing.T) { } func testWSHandler( - routerMock *MockRouter, - accessManager auth.AccessManager) *WSHandler { + routerMock *MockRouter) *WSHandler { return &WSHandler{ - router: routerMock, - prefix: "/prefix", - accessManager: accessManager, + router: routerMock, + prefix: "/prefix", } } func runNewWebSocket( wsconn *MockWSConnection, routerMock *MockRouter, - messageStore store.MessageStore, - accessManager auth.AccessManager) *WebSocket { + messageStore store.MessageStore) *WebSocket { - if accessManager == nil { - accessManager = auth.NewAllowAllAccessManager(true) - } websocket := NewWebSocket( - testWSHandler(routerMock, accessManager), + testWSHandler(routerMock), wsconn, "testuser", )