Skip to content

Commit

Permalink
feat(push): handle envelopes from node
Browse files Browse the repository at this point in the history
fix #912

Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Jan 18, 2019
1 parent ff0d0ad commit 33cb5c1
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 130 deletions.
6 changes: 4 additions & 2 deletions client/react-native/gomobile/core/notification.go
@@ -1,11 +1,11 @@
package core

import (
"berty.tech/core/push"
"fmt"
"sync"

"berty.tech/core/pkg/notification"
"berty.tech/core/push"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -65,7 +65,9 @@ func (n *MobileNotification) ReceiveToken(token *notification.Token) {
)
n.tokenSubscribersMutex.Lock()
for i := range n.subscribers {
n.tokenSubscribers[i] <- token
// make soft copy
t := *token
n.tokenSubscribers[i] <- &t
}
n.tokenSubscribersMutex.Unlock()

Expand Down
5 changes: 3 additions & 2 deletions core/manager/account/account.go
@@ -1,7 +1,6 @@
package account

import (
"berty.tech/core/push"
"context"
"fmt"
"io"
Expand All @@ -27,6 +26,7 @@ import (
"berty.tech/core/pkg/notification"
"berty.tech/core/pkg/tracing"
"berty.tech/core/pkg/zapring"
"berty.tech/core/push"
"berty.tech/core/sql"
"berty.tech/core/sql/sqlcipher"
"github.com/jinzhu/gorm"
Expand Down Expand Up @@ -490,13 +490,14 @@ func (a *Account) initNode(ctx context.Context) error {
node.WithDevice(&entity.Device{Name: a.Name}),
node.WithNetworkDriver(a.network),
node.WithNetworkMetrics(a.metrics),
node.WithNotificationDriver(a.notification),
node.WithInitConfig(),
node.WithSoftwareCrypto(), // FIXME: use hardware impl if available
node.WithConfig(),
node.WithRing(a.ring),
node.WithNotificationDriver(a.notification),
node.WithPushManager(a.pushManager),
node.WithPushTokenSubscriber(),
node.WithPushNotificationSubscriber(),
)
if err != nil {
return errorcodes.ErrAccManagerInitNode.Wrap(err)
Expand Down
122 changes: 93 additions & 29 deletions core/node/push.go
@@ -1,11 +1,17 @@
package node

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"

"berty.tech/core/api/p2p"
"berty.tech/core/entity"
"berty.tech/core/pkg/deviceinfo"
"berty.tech/core/pkg/errorcodes"
"berty.tech/core/push"
"bytes"
"context"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand All @@ -25,42 +31,100 @@ func WithPushTokenSubscriber() NewNodeOption {
go func() {
tokenSubscription := n.notificationDriver.SubscribeToken()

for token := range tokenSubscription {
currentToken := &entity.DevicePushConfig{}
for {
select {
case token := <-tokenSubscription:
{
currentToken := &entity.DevicePushConfig{}

if err = n.sql(ctx).First(&currentToken, &entity.DevicePushConfig{PushType: token.Type}).Error; err != nil {
logger().Error("unable to get push token", zap.Error(err))
}
if err = n.sql(ctx).First(&currentToken, &entity.DevicePushConfig{PushType: token.Type}).Error; err != nil {
logger().Info("unable to get push token", zap.Error(err))
}

pushID := &push.PushNativeIdentifier{
PackageID: packageID,
DeviceToken: token.Hash(),
}
pushID := &push.PushNativeIdentifier{
PackageID: packageID,
DeviceToken: token.Hash(),
}

pushIDBytes, err := pushID.Marshal()
if err != nil {
logger().Error("unable to serialize push id", zap.Error(err))
continue
}
pushIDBytes, err := pushID.Marshal()
if err != nil {
logger().Error("unable to serialize push id", zap.Error(err))
continue
}

if len(token.Value) > 0 && bytes.Compare(currentToken.PushID, pushIDBytes) == 0 {
continue
}
if len(token.Value) > 0 && bytes.Compare(currentToken.PushID, pushIDBytes) == 0 {
continue
}

if len(currentToken.PushID) > 0 {
_, err = n.DevicePushConfigRemove(ctx, currentToken)
if len(currentToken.PushID) > 0 {
_, err = n.DevicePushConfigRemove(ctx, currentToken)

logger().Error("unable to delete existing push token", zap.Error(err))
logger().Error("unable to delete existing push token", zap.Error(err))
}

if len(token.Value) > 0 {
_, err = n.DevicePushConfigCreate(ctx, &entity.DevicePushConfig{
RelayID: []byte{},
PushID: pushIDBytes,
PushType: token.Type,
})

logger().Error("unable to create push token", zap.Error(err))
}
}
case <-n.shutdown:
logger().Debug("node push token subscriber shutdown")
n.notificationDriver.UnsubscribeToken(tokenSubscription)
}
}
}()
}
}

func WithPushNotificationSubscriber() NewNodeOption {
return func(n *Node) {
ctx := context.Background()
go func() {
notificationSubscription := n.notificationDriver.Subscribe()

for {
select {
case notification := <-notificationSubscription:
{
logger().Debug("node receive push notification")

payload := push.Payload{}
if err := json.Unmarshal([]byte(notification.Body), &payload); err != nil {
logger().Error(errorcodes.ErrNodePushNotifSub.Wrap(err).Error())
continue
}

b64Envelope := payload.BertyEnvelope
if b64Envelope == "" {
logger().Error(errorcodes.ErrNodePushNotifSub.Wrap(errors.New("berty-envelope is missing")).Error())
continue
}

bytesEnvelope, err := base64.StdEncoding.DecodeString(string(b64Envelope))
if err != nil {
logger().Error(errorcodes.ErrNodePushNotifSub.Wrap(err).Error())
continue
}

if len(token.Value) > 0 {
_, err = n.DevicePushConfigCreate(ctx, &entity.DevicePushConfig{
RelayID: []byte{},
PushID: pushIDBytes,
PushType: token.Type,
})
envelope := &p2p.Envelope{}
if err := envelope.Unmarshal(bytesEnvelope); err != nil {
logger().Error(errorcodes.ErrNodePushNotifSub.Wrap(err).Error())
continue
}

logger().Error("unable to create push token", zap.Error(err))
if err := n.handleEnvelope(ctx, envelope); err != nil {
logger().Error(errorcodes.ErrNodePushNotifSub.Wrap(err).Error())
continue
}
}
case <-n.shutdown:
logger().Debug("node push notification subscriber shutdown")
n.notificationDriver.Unsubscribe(notificationSubscription)
}
}
}()
Expand Down

0 comments on commit 33cb5c1

Please sign in to comment.