Skip to content

Commit

Permalink
Merge branch 'master' into feature/pn-310-sms-ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
cosminrentea committed Mar 22, 2017
2 parents ab14544 + 9ec3aae commit bfc87cf
Show file tree
Hide file tree
Showing 20 changed files with 230 additions and 115 deletions.
15 changes: 15 additions & 0 deletions protocol/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,18 @@ func ParseMessage(message []byte) (*Message, error) {

return m, nil
}

func (m *Message) CorrelationID() string {
values := make(map[string]string)
err := json.Unmarshal([]byte(m.HeaderJSON), &values)
if err != nil {
log.WithField("error", err.Error()).Error("Correlation id decoding failed.")
return ""
}
correlationID, ok := values["Correlation-Id"]
if !ok {
log.Error("Correlation id not set")
return ""
}
return correlationID
}
102 changes: 73 additions & 29 deletions protocol/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ Hello World`
)

func TestParsingANormalMessage(t *testing.T) {
assert := assert.New(t)
a := assert.New(t)

msgI, err := Decode([]byte(aNormalMessage))
assert.NoError(err)
assert.IsType(&Message{}, msgI)
a.NoError(err)
a.IsType(&Message{}, msgI)
msg := msgI.(*Message)

assert.Equal(uint64(42), msg.ID)
assert.Equal(Path("/foo/bar"), msg.Path)
assert.Equal("user01", msg.UserID)
assert.Equal("phone01", msg.ApplicationID)
assert.Equal(map[string]string{"user": "user01"}, msg.Filters)
assert.Equal(unixTime.Unix(), msg.Time)
assert.Equal(uint8(1), msg.NodeID)
assert.Equal(`{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}`, msg.HeaderJSON)
assert.Equal("Hello World", string(msg.Body))
a.Equal(uint64(42), msg.ID)
a.Equal(Path("/foo/bar"), msg.Path)
a.Equal("user01", msg.UserID)
a.Equal("phone01", msg.ApplicationID)
a.Equal(map[string]string{"user": "user01"}, msg.Filters)
a.Equal(unixTime.Unix(), msg.Time)
a.Equal(uint8(1), msg.NodeID)
a.Equal(`{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}`, msg.HeaderJSON)
a.Equal("Hello World", string(msg.Body))
}

func TestSerializeANormalMessage(t *testing.T) {
Expand All @@ -68,6 +68,50 @@ func TestSerializeANormalMessage(t *testing.T) {
assert.Equal(t, strings.SplitN(aNormalMessageNoExpires, "\n", 2)[0], msg.Metadata())
}

func TestCorrelationID(t *testing.T) {
a := assert.New(t)
msg := &Message{
ID: uint64(42),
Path: Path("/foo/bar"),
UserID: "user01",
ApplicationID: "phone01",
Filters: map[string]string{"user": "user01"},
Time: unixTime.Unix(),
NodeID: 1,
HeaderJSON: `{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}`,
Body: []byte("Hello World"),
}
a.Equal("7sdks723ksgqn", msg.CorrelationID())
}

func TestCorrelationIDError(t *testing.T) {
a := assert.New(t)
msg := &Message{
ID: uint64(42),
Path: Path("/foo/bar"),
UserID: "user01",
ApplicationID: "phone01",
Filters: map[string]string{"user": "user01"},
Time: unixTime.Unix(),
NodeID: 1,
Body: []byte("Hello World"),
}
a.Equal("", msg.CorrelationID())

msg = &Message{
ID: uint64(42),
Path: Path("/foo/bar"),
UserID: "user01",
ApplicationID: "phone01",
Filters: map[string]string{"user": "user01"},
Time: unixTime.Unix(),
NodeID: 1,
HeaderJSON: `{"Content-Type": "text/plain"}`,
Body: []byte("Hello World"),
}
a.Equal("", msg.CorrelationID())
}

func TestSerializeANormalMessageWithExpires(t *testing.T) {
// given: a message
msg := &Message{
Expand Down Expand Up @@ -116,46 +160,46 @@ func TestSerializeAMinimalMessageWithBody(t *testing.T) {
}

func TestParsingAMinimalMessage(t *testing.T) {
assert := assert.New(t)
a := assert.New(t)

msgI, err := Decode([]byte(aMinimalMessage))
assert.NoError(err)
assert.IsType(&Message{}, msgI)
a.NoError(err)
a.IsType(&Message{}, msgI)
msg := msgI.(*Message)

assert.Equal(uint64(42), msg.ID)
assert.Equal(Path("/"), msg.Path)
assert.Equal("", msg.UserID)
assert.Equal("", msg.ApplicationID)
assert.Nil(msg.Filters)
assert.Equal(unixTime.Unix(), msg.Time)
assert.Equal("", msg.HeaderJSON)
a.Equal(uint64(42), msg.ID)
a.Equal(Path("/"), msg.Path)
a.Equal("", msg.UserID)
a.Equal("", msg.ApplicationID)
a.Nil(msg.Filters)
a.Equal(unixTime.Unix(), msg.Time)
a.Equal("", msg.HeaderJSON)

assert.Equal("", string(msg.Body))
a.Equal("", string(msg.Body))
}

func TestErrorsOnParsingMessages(t *testing.T) {
assert := assert.New(t)
a := assert.New(t)

var err error
_, err = Decode([]byte(""))
assert.Error(err)
a.Error(err)

// missing meta field
_, err = Decode([]byte("42,/foo/bar,user01,phone1,id123\n{}\nBla"))
assert.Error(err)
a.Error(err)

// id not an integer
_, err = Decode([]byte("xy42,/foo/bar,user01,phone1,id123,1420110000\n"))
assert.Error(err)
a.Error(err)

// path is empty
_, err = Decode([]byte("42,,user01,phone1,id123,1420110000\n"))
assert.Error(err)
a.Error(err)

// Error Message without Name
_, err = Decode([]byte("!"))
assert.Error(err)
a.Error(err)
}

func Test_Message_getPartitionFromTopic(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions restclient/guble_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"net/url"

log "github.com/Sirupsen/logrus"
"github.com/cosminrentea/gobbler/server/rest"
)

const (
correlationIDLiteral = "correlationID"
)

type gubleSender struct {
Expand Down Expand Up @@ -92,6 +97,7 @@ func (gs gubleSender) Send(topic string, body []byte, userID string, params map[
if err != nil {
return err
}
request.Header.Add(rest.XHeaderPrefix+"correlation-id", params[correlationIDLiteral])
response, err := gs.httpClient.Do(request)
if err != nil {
return err
Expand Down
26 changes: 13 additions & 13 deletions server/apns/apns.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package apns

import (
"errors"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/cosminrentea/gobbler/server/connector"
"github.com/cosminrentea/gobbler/server/metrics"
"github.com/cosminrentea/gobbler/server/router"
Expand All @@ -15,10 +15,6 @@ const (
schema = "apns_registration"
)

var (
errSenderNotRecreated = errors.New("APNS Sender could not be recreated.")
)

// Config is used for configuring the APNS module.
type Config struct {
Enabled *bool
Expand Down Expand Up @@ -94,9 +90,13 @@ func (a *apns) startIntervalMetric(m metrics.Map, td time.Duration) {
}

func (a *apns) HandleResponse(request connector.Request, responseIface interface{}, metadata *connector.Metadata, errSend error) error {
logger.Info("Handle APNS response")
l := logger.WithField("correlation_id", request.Message().CorrelationID())
l.Info("Handle APNS response")
if errSend != nil {
logger.WithField("error", errSend.Error()).WithField("error_type", errSend).Error("error when trying to send APNS notification")
l.WithFields(log.Fields{
"error": errSend.Error(),
"error_type": errSend,
}).Error("error when trying to send APNS notification")
mTotalSendErrors.Add(1)
pSendErrors.Inc()
if *a.IntervalMetrics && metadata != nil {
Expand All @@ -114,22 +114,22 @@ func (a *apns) HandleResponse(request connector.Request, responseIface interface
subscriber := request.Subscriber()
subscriber.SetLastID(messageID)
if err := a.Manager().Update(subscriber); err != nil {
logger.WithField("error", err.Error()).Error("Manager could not update subscription")
l.WithField("error", err.Error()).Error("Manager could not update subscription")
mTotalResponseInternalErrors.Add(1)
pResponseInternalErrors.Inc()
return err
}
if r.Sent() {
logger.WithField("id", r.ApnsID).Info("APNS notification was successfully sent")
l.WithField("id", r.ApnsID).Info("APNS notification was successfully sent")
mTotalSentMessages.Add(1)
pSentMessages.Inc()
if *a.IntervalMetrics && metadata != nil {
addToLatenciesAndCountsMaps(currentTotalMessagesLatenciesKey, currentTotalMessagesKey, metadata.Latency)
}
return nil
}
logger.Error("APNS notification was not sent")
logger.WithField("id", r.ApnsID).WithField("reason", r.Reason).Info("APNS notification was not sent - details")
l.Error("APNS notification was not sent")
l.WithField("id", r.ApnsID).WithField("reason", r.Reason).Info("APNS notification was not sent - details")
switch r.Reason {
case
apns2.ReasonMissingDeviceToken,
Expand All @@ -142,10 +142,10 @@ func (a *apns) HandleResponse(request connector.Request, responseIface interface
pResponseRegistrationErrors.Inc()
err := a.Manager().Remove(subscriber)
if err != nil {
logger.WithField("id", r.ApnsID).Error("could not remove subscriber")
l.WithField("id", r.ApnsID).Error("could not remove subscriber")
}
default:
logger.Error("handling other APNS errors")
l.Error("handling other APNS errors")
mTotalResponseOtherErrors.Add(1)
pResponseOtherErrors.Inc()
}
Expand Down
7 changes: 4 additions & 3 deletions server/apns/apns_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func NewSenderUsingPusher(pusher Pusher, appTopic string) (connector.Sender, err
}

func (s sender) Send(request connector.Request) (interface{}, error) {
l := logger.WithField("correlation_id", request.Message().CorrelationID())
deviceToken := request.Subscriber().Route().Get(deviceIDKey)
logger.WithField("deviceToken", deviceToken).Info("Trying to push a message to APNS")
l.WithField("deviceToken", deviceToken).Info("Trying to push a message to APNS")
push := func() (interface{}, error) {
return s.client.Push(&apns2.Notification{
Priority: apns2.PriorityHigh,
Expand All @@ -67,15 +68,15 @@ func (s sender) Send(request connector.Request) (interface{}, error) {
result, err := withRetry.execute(push)
if err != nil && err == ErrRetryFailed {
if closable, ok := s.client.(closable); ok {
logger.Warn("Close TLS and retry again")
l.Warn("Close TLS and retry again")
mTotalSendRetryCloseTLS.Add(1)
pSendRetryCloseTLS.Inc()
closable.CloseTLS()
return push()
} else {
mTotalSendRetryUnrecoverable.Add(1)
pSendRetryUnrecoverable.Inc()
logger.Error("Cannot Close TLS. Unrecoverable state")
l.Error("Cannot Close TLS. Unrecoverable state")
}
}
return result, err
Expand Down
13 changes: 11 additions & 2 deletions server/apns/apns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/sideshow/apns2"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

var ErrSendRandomError = errors.New("A Sender error")
Expand Down Expand Up @@ -46,7 +47,13 @@ func TestConn_HandleResponseOnSendError(t *testing.T) {
//given
c, _ := newAPNSConnector(t)
mRequest := NewMockRequest(testutil.MockCtrl)
message := &protocol.Message{
HeaderJSON: `{"Correlation-Id": "7sdks723ksgqn"}`,
ID: 42,
}
mRequest.EXPECT().Message().Return(message)

time.Sleep(100 * time.Millisecond)
//when
err := c.HandleResponse(mRequest, nil, nil, ErrSendRandomError)

Expand All @@ -71,7 +78,8 @@ func TestConn_HandleResponse(t *testing.T) {
c.Manager().Add(mSubscriber)

message := &protocol.Message{
ID: 42,
ID: 42,
HeaderJSON: `{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}`,
}
mRequest := NewMockRequest(testutil.MockCtrl)
mRequest.EXPECT().Message().Return(message).AnyTimes()
Expand Down Expand Up @@ -105,7 +113,8 @@ func TestNew_HandleResponseHandleSubscriber(t *testing.T) {
}
for _, reason := range removeForReasons {
message := &protocol.Message{
ID: 42,
ID: 42,
HeaderJSON: `{"Correlation-Id": "7sdks723ksgqn"}`,
}
mSubscriber := NewMockSubscriber(testutil.MockCtrl)
mSubscriber.EXPECT().SetLastID(gomock.Any())
Expand Down
17 changes: 10 additions & 7 deletions server/fcm/fcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (f *fcm) startIntervalMetric(m metrics.Map, td time.Duration) {
}

func (f *fcm) HandleResponse(request connector.Request, responseIface interface{}, metadata *connector.Metadata, err error) error {
l := logger.WithField("correlation_id", request.Message().CorrelationID())

if err != nil && !isValidResponseError(err) {
logger.WithField("error", err.Error()).Error("Error sending message to FCM")
l.WithField("error", err.Error()).Error("Error sending message to FCM")
mTotalSendErrors.Add(1)
pSendErrors.Inc()
if *f.IntervalMetrics && metadata != nil {
Expand All @@ -103,10 +105,11 @@ func (f *fcm) HandleResponse(request connector.Request, responseIface interface{
return fmt.Errorf("Invalid FCM Response")
}

logger.WithField("messageID", message.ID).Debug("Delivered message to FCM")
l.WithField("messageID", message.ID).Debug("Delivered message to FCM")

subscriber.SetLastID(message.ID)
if err := f.Manager().Update(request.Subscriber()); err != nil {
logger.WithField("error", err.Error()).Error("Manager could not update subscription")
l.WithField("error", err.Error()).Error("Manager could not update subscription")
mTotalResponseInternalErrors.Add(1)
return err
}
Expand All @@ -119,19 +122,19 @@ func (f *fcm) HandleResponse(request connector.Request, responseIface interface{
return nil
}

logger.WithField("success", response.Success).Debug("Handling FCM Error")
l.WithField("success", response.Success).Debug("Handling FCM Error")

switch errText := response.Error.Error(); errText {
case "NotRegistered":
logger.Debug("Removing not registered FCM subscription")
l.Debug("Removing not registered FCM subscription")
f.Manager().Remove(subscriber)
mTotalResponseNotRegisteredErrors.Add(1)
pResponseNotRegisteredErrors.Inc()
return response.Error
case "InvalidRegistration":
logger.WithField("jsonError", errText).Error("InvalidRegistration of FCM subscription")
l.WithField("jsonError", errText).Error("InvalidRegistration of FCM subscription")
default:
logger.WithField("jsonError", errText).Error("Unexpected error while sending to FCM")
l.WithField("jsonError", errText).Error("Unexpected error while sending to FCM")
}

if response.CanonicalIDs != 0 {
Expand Down
Loading

0 comments on commit bfc87cf

Please sign in to comment.