Skip to content

Commit

Permalink
fix: re-enqueue message when we fail to decrypt it
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
  • Loading branch information
gfanton committed Mar 31, 2022
1 parent 24e19e1 commit 9bc63f3
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go/pkg/bertyprotocol/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func FillMessageKeysHolderUsingNewData(ctx context.Context, gc *GroupContext) <-

// A new chainKey is registered, check if cached messages can be opened with it
if rawPK, err := pk.Raw(); err == nil {
gc.MessageStore().ProcessMessageQueueForDevicePK(rawPK)
gc.MessageStore().ProcessMessageQueueForDevicePK(ctx, rawPK)
}

ch <- pk
Expand Down
23 changes: 17 additions & 6 deletions go/pkg/bertyprotocol/store_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,16 @@ func (m *MessageStore) CacheSizeForDevicePK(devicePK []byte) (size int, ok bool)
return
}

func (m *MessageStore) ProcessMessageQueueForDevicePK(devicePK []byte) {
func (m *MessageStore) ProcessMessageQueueForDevicePK(ctx context.Context, devicePK []byte) {
m.muDeviceCaches.Lock()
device, ok := m.deviceCaches[string(devicePK)]
if ok {
device.hasSecret = true
if next := device.queue.Next(); next != nil {
m.cmessage <- next
if device.hasSecret = m.mks.HasSecretForRawDevicePK(ctx, m.g.PublicKey, devicePK); device.hasSecret {
if next := device.queue.Next(); next != nil {
m.cmessage <- next
}
} else {
m.logger.Error("unable to process message, no secret found for device pk", logutil.PrivateBinary("devicepk", devicePK))
}
}
m.muDeviceCaches.Unlock()
Expand Down Expand Up @@ -209,8 +212,16 @@ func (m *MessageStore) processMessageLoop(ctx context.Context) {
}

if err := m.processMessage(ctx, ownPK, message); err != nil {
m.logger.Error("unable to process message", zap.Error(err))
return
if errcode.Is(err, errcode.ErrCryptoDecryptPayload) {
// @FIXME(gfanton): this should not happen
m.logger.Warn("unable to open envelope, adding envelope to cache for later process", zap.Error(err))

// if failed to decrypt add to queue, for later process
device.queue.Add(message)
_ = m.emitters.groupCacheMessage.Emit(*message)
} else {
m.logger.Error("unable to prcess message", zap.Error(err))
}
} else if next := device.queue.Next(); next != nil {
m.cmessage <- next
}
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/bertyprotocol/store_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func Test_Add_Messages_To_Cache(t *testing.T) {
new(protocoltypes.GroupMessageEvent), eventbus.BufSize(entriesCount))
require.NoError(t, err)

peers[1].GC.MessageStore().ProcessMessageQueueForDevicePK(dPK0Raw)
peers[1].GC.MessageStore().ProcessMessageQueueForDevicePK(ctx, dPK0Raw)

// check that all events has been received on peer 2
for i := 0; i < entriesCount; i++ {
Expand Down
2 changes: 2 additions & 0 deletions go/pkg/bertyreplication/services_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ func TestReplicationService_ReplicateGroupStats_ReplicateGlobalStats(t *testing.
}

func TestReplicationService_Flow(t *testing.T) {
testutil.FilterSpeed(t, testutil.Slow)

logger, cleanup := testutil.Logger(t)
defer cleanup()
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 9bc63f3

Please sign in to comment.