Skip to content

Commit

Permalink
Merge 29ee978 into e8d5916
Browse files Browse the repository at this point in the history
  • Loading branch information
marian-craciunescu authored Jun 21, 2017
2 parents e8d5916 + 29ee978 commit 1482dd2
Show file tree
Hide file tree
Showing 13 changed files with 616 additions and 108 deletions.
42 changes: 37 additions & 5 deletions server/apns/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package apns

import (
"fmt"
"time"

log "github.com/Sirupsen/logrus"
"github.com/cosminrentea/expvarmetrics"
"github.com/cosminrentea/gobbler/server/connector"
"github.com/cosminrentea/gobbler/server/kafka"
"github.com/cosminrentea/gobbler/server/router"
"github.com/sideshow/apns2"
"time"
)

const (
Expand All @@ -33,10 +34,11 @@ type Config struct {
type apns struct {
Config
connector.Connector
apnsKafkaReportingTopic string
}

// New creates a new connector.ResponsiveConnector without starting it
func New(router router.Router, sender connector.Sender, config Config, kafkaProducer kafka.Producer, kafkaReportingTopic string) (connector.ResponsiveConnector, error) {
func New(router router.Router, sender connector.Sender, config Config, kafkaProducer kafka.Producer, subUnsubKafkaReportingTopic, apnsKafkaReportingTopic string) (connector.ResponsiveConnector, error) {
baseConn, err := connector.NewConnector(
router,
sender,
Expand All @@ -48,15 +50,16 @@ func New(router router.Router, sender connector.Sender, config Config, kafkaProd
Workers: *config.Workers,
},
kafkaProducer,
kafkaReportingTopic,
subUnsubKafkaReportingTopic,
)
if err != nil {
logger.WithError(err).Error("Base connector error")
return nil, err
}
a := &apns{
Config: config,
Connector: baseConn,
Config: config,
Connector: baseConn,
apnsKafkaReportingTopic: apnsKafkaReportingTopic,
}
a.SetResponseHandler(a)
return a, nil
Expand Down Expand Up @@ -97,7 +100,18 @@ func (a *apns) startIntervalMetric(m metrics.Map, td time.Duration) {
}

func (a *apns) HandleResponse(request connector.Request, responseIface interface{}, metadata *connector.Metadata, errSend error) error {

l := logger.WithField("correlation_id", request.Message().CorrelationID())

event := ApnsEvent{
Type: "pn_reporting_apns",
Payload: ApnsEventPayload{},
}
errFill := event.fillApnsEvent(request)
if errFill != nil {
logger.WithError(errFill).Error("Error filling event")
}

l.Info("Handle APNS response")
if errSend != nil {
l.WithFields(log.Fields{
Expand Down Expand Up @@ -126,17 +140,35 @@ func (a *apns) HandleResponse(request connector.Request, responseIface interface
pResponseInternalErrors.Inc()
return err
}

event.Payload.ApnsID = r.ApnsID
if r.Sent() {
l.WithField("id", r.ApnsID).Info("APNS notification was successfully sent")
mTotalSentMessages.Add(1)
pSentMessages.Inc()
if *a.IntervalMetrics && metadata != nil {
addToLatenciesAndCountsMaps(currentTotalMessagesLatenciesKey, currentTotalMessagesKey, metadata.Latency)
}

event.Payload.Status = "Success"
event.Payload.ErrorText = ""
err := event.report(a.KafkaProducer(), a.apnsKafkaReportingTopic)
if err !=nil && err != errApnsKafkaReportingConfiguration {
logger.WithError(err).Error("Reporting APNS to kafka failed")
}

return nil
}
l.Error("APNS notification was not sent")
l.WithField("id", r.ApnsID).WithField("reason", r.Reason).Info("APNS notification was not sent - details")

event.Payload.Status = "Fail"
event.Payload.ErrorText = r.Reason
err := event.report(a.KafkaProducer(), a.apnsKafkaReportingTopic)
if err !=nil && err != errApnsKafkaReportingConfiguration {
logger.WithError(err).Error("Reporting APNS to kafka failed")
}

switch r.Reason {
case
apns2.ReasonMissingDeviceToken,
Expand Down
113 changes: 113 additions & 0 deletions server/apns/apns_reporting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package apns

import (
"encoding/json"
"errors"
"time"

"fmt"

"github.com/cosminrentea/go-uuid"
"github.com/cosminrentea/gobbler/server/connector"
"github.com/cosminrentea/gobbler/server/kafka"
)

type ApnsEventPayload struct {
Topic string `json:"topic"`
Status string `json:"status"`
ErrorText string `json:"error_text"`
ApnsID string `json:"apns_id"`
CorrelationID string `json:"correlation_id"`
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
NotificationBody string `json:"notification_body"`
NotificationTitle string `json:"notification_title"`
DeepLink string `json:"deep_link"`
}

type ApnsEvent struct {
Id string `json:"id"`
Time string `json:"time"`
Type string `json:"type"`
Payload ApnsEventPayload `json:"payload"`
}

var (
errApnsKafkaReportingConfiguration = errors.New("Kafka Reporting for APNS is not correctly configured")
errAlertDecodingFailed = errors.New("Decoding of aps2.alert field failed")
)

func (ev *ApnsEvent) fillApnsEvent(request connector.Request) error {

ev.Payload.CorrelationID = request.Message().CorrelationID()

deviceID := request.Subscriber().Route().Get(deviceIDKey)
ev.Payload.DeviceID = deviceID

userID := request.Subscriber().Route().Get(userIDKey)
ev.Payload.UserID = userID

var payload Payload

err := json.Unmarshal(request.Message().Body, &payload)
if err != nil {
logger.WithError(err).Error("Error reading apns notification built.")
return err
}

ev.Payload.DeepLink = payload.Deeplink

alert := payload.Aps.Alert
alertBody, ok := alert.(map[string]interface{})
if !ok {
return errAlertDecodingFailed
}
ev.Payload.NotificationBody = fmt.Sprintf("%s", alertBody["body"])
ev.Payload.NotificationTitle = fmt.Sprintf("%s", alertBody["title"])
ev.Payload.Topic = payload.Topic

return nil
}

func (event *ApnsEvent) report(kafkaProducer kafka.Producer, kafkaReportingTopic string) error {
if kafkaProducer == nil || kafkaReportingTopic == "" {
return errApnsKafkaReportingConfiguration
}
uuid, err := go_uuid.New()
if err != nil {
logger.WithError(err).Error("Could not get new UUID")
return err
}
responseTime := time.Now().UTC().Format(time.RFC3339)
event.Id = uuid
event.Time = responseTime

bytesReportEvent, err := json.Marshal(event)
if err != nil {
logger.WithError(err).Error("Error while marshaling Kafka reporting event to JSON format")
return err
}
logger.WithField("event", *event).Debug("Reporting sent APNS event to Kafka topic")
kafkaProducer.Report(kafkaReportingTopic, bytesReportEvent, uuid)
return nil
}

/*
This is a copy of the apns2 format in order to decapsulate the data from the gobbler message.
*/
type Payload struct {
Deeplink string `json:"deeplink"`
Topic string `json:"topic"`
Aps Aps `json:"aps"`
}

type Aps struct {
Alert interface{} `json:"alert,omitempty"`
Badge interface{} `json:"badge,omitempty"`
Category string `json:"category,omitempty"`
ContentAvailable int `json:"content-available,omitempty"`
MutableContent int `json:"mutable-content,omitempty"`
Sound string `json:"sound,omitempty"`
ThreadID string `json:"thread-id,omitempty"`
URLArgs []string `json:"url-args,omitempty"`
}
13 changes: 12 additions & 1 deletion server/apns/apns_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,28 @@ func TestSender_Send(t *testing.T) {
defer finish()
a := assert.New(t)

defer testutil.EnableDebugForMethod() ()
// given
routeParams := make(map[string]string)
routeParams["device_id"] = "1234"

routeConfig := router.RouteConfig{
Path: protocol.Path("path"),
RouteParams: routeParams,
}
route := router.NewRoute(routeConfig)

msg := &protocol.Message{
Body: []byte("{}"),
HeaderJSON: `{"Correlation-Id": "7sdks723ksgqn"}`,
Body: []byte(`{
"aps":{
"alert":{"body":"Die größte Sonderangebot!","title":"Valid Title"},
"badge":0,
"content-available":1
},
"topic":"marketing_notifications",
"deeplink":"rewe://angebote"
}`),
}

mSubscriber := NewMockSubscriber(testutil.MockCtrl)
Expand Down
Loading

0 comments on commit 1482dd2

Please sign in to comment.