Skip to content

Commit

Permalink
fix(messenger): also subscribe to Contact_Established contact groups …
Browse files Browse the repository at this point in the history
…on service init

Signed-off-by: Norman Meier <norman@berty.tech>
  • Loading branch information
n0izn0iz committed Aug 26, 2020
1 parent c8a9b28 commit 92de769
Showing 1 changed file with 40 additions and 28 deletions.
68 changes: 40 additions & 28 deletions go/pkg/bertymessenger/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,15 @@ func New(client bertyprotocol.ProtocolServiceClient, opts *Opts) (Service, error
return nil, err
}

// subscribe to groups
// subscribe to multimember groups
{
var convs []Conversation
err := svc.db.Find(&convs).Error
if err != nil {
return nil, err
}
for _, cv := range convs {
gpk := cv.GetPublicKey()
gpkb, err := stringToBytes(gpk)
gpkb, err := stringToBytes(cv.GetPublicKey())
if err != nil {
return nil, err
}
Expand All @@ -97,52 +96,38 @@ func New(client bertyprotocol.ProtocolServiceClient, opts *Opts) (Service, error
return nil, err
}

ms, err := svc.protocolClient.GroupMessageSubscribe(svc.ctx, &bertytypes.GroupMessageSubscribe_Request{GroupPK: gpkb})
if err != nil {
if err := svc.subscribeToMessages(ctx, gpkb); err != nil {
return nil, err
}
go func() {
for {
gme, err := ms.Recv()
if err != nil {
svc.logStreamingError("group message", err)
return
}

var am AppMessage
if err := proto.Unmarshal(gme.GetMessage(), &am); err != nil {
svc.logger.Warn("failed to unmarshal AppMessage", zap.Error(err))
return
}
err = handleAppMessage(&svc, gpk, gme, &am)
if err != nil {
svc.logger.Error("failed to handle app message", zap.Error(errcode.ErrInternal.Wrap(err)))
}
}
}()
}
}

// subscribe to group metadata for contacts in outgoing request sent state
// subscribe to contact groups
{
var contacts []Contact
err := svc.db.Find(&contacts).Error
if err != nil {
return nil, err
}
for _, c := range contacts {
if c.State != Contact_OutgoingRequestSent {
if c.State != Contact_OutgoingRequestSent && c.State != Contact_Established {
continue
}
gpk := c.GetConversationPublicKey()
gpkb, err := stringToBytes(gpk)

gpkb, err := stringToBytes(c.GetConversationPublicKey())
if err != nil {
return nil, err
}

if err := svc.subscribeToMetadata(ctx, gpkb); err != nil {
return nil, err
}

if c.State == Contact_Established {
if err := svc.subscribeToMessages(ctx, gpkb); err != nil {
return nil, err
}
}
}
}

Expand Down Expand Up @@ -170,6 +155,33 @@ func (svc *service) subscribeToMetadata(ctx context.Context, gpkb []byte) error
return nil
}

func (svc *service) subscribeToMessages(ctx context.Context, gpkb []byte) error {
ms, err := svc.protocolClient.GroupMessageSubscribe(svc.ctx, &bertytypes.GroupMessageSubscribe_Request{GroupPK: gpkb})
if err != nil {
return err
}
go func() {
for {
gme, err := ms.Recv()
if err != nil {
svc.logStreamingError("group message", err)
return
}

var am AppMessage
if err := proto.Unmarshal(gme.GetMessage(), &am); err != nil {
svc.logger.Warn("failed to unmarshal AppMessage", zap.Error(err))
return
}
err = handleAppMessage(svc, bytesToString(gpkb), gme, &am)
if err != nil {
svc.logger.Error("failed to handle app message", zap.Error(errcode.ErrInternal.Wrap(err)))
}
}
}()
return nil
}

func (svc *service) Close() {
svc.logger.Info("closing service")
svc.cancelFn()
Expand Down

0 comments on commit 92de769

Please sign in to comment.