Skip to content

Commit

Permalink
feat: optimize store message cache
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
  • Loading branch information
gfanton committed Mar 30, 2022
1 parent f0124c8 commit c473abc
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 272 deletions.
17 changes: 7 additions & 10 deletions go/cmd/berty/mini/view_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,19 +353,16 @@ func (v *groupView) loop(ctx context.Context) {
for {
evt, err = cl.Recv()
if err != nil {
if err == io.EOF {
return
if err != io.EOF {
v.syncMessages <- &historyMessage{
messageType: messageTypeError,
payload: []byte(err.Error()),
}
}

// @TODO: Log this

v.syncMessages <- &historyMessage{
messageType: messageTypeError,
payload: []byte(err.Error()),
}
continue
return
}

// @TODO: Log this
metadataEventHandler(ctx, v, evt, false, v.logger)
}
}()
Expand Down
11 changes: 10 additions & 1 deletion go/internal/cryptoutil/keystore_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ func (m *MessageKeystore) GetDeviceChainKey(ctx context.Context, groupPK, pk cry
return ds, nil
}

func (m *MessageKeystore) HasSecretForRawDevicePK(ctx context.Context, groupPK, devicePK []byte) (has bool) {
if m == nil {
return false
}

key := idForCurrentCK(groupPK, devicePK)
has, _ = m.store.Has(ctx, key)
return
}

func (m *MessageKeystore) delPrecomputedKey(ctx context.Context, groupPK, device crypto.PubKey, counter uint64) error {
if m == nil {
return errcode.ErrInvalidInput
Expand Down Expand Up @@ -435,7 +445,6 @@ func (m *MessageKeystore) OpenEnvelope(
msg, attachmentsCIDs, err := m.OpenEnvelopePayload(ctx, env, headers, g, ownPK, id)
if err != nil {
return nil, nil, nil, errcode.TODO.Wrap(err)

}

return headers, msg, attachmentsCIDs, nil
Expand Down
1 change: 1 addition & 0 deletions go/internal/handshake/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (hc *handshakeContext) receiveResponderAccept() error {
if err != nil {
return errcode.ErrHandshakeResponderAcceptBoxKeyGen.Wrap(err)
}

respBytes, _ := box.OpenAfterPrecomputation(
nil,
boxEnvelope.Box,
Expand Down
1 change: 0 additions & 1 deletion go/internal/ipfsutil/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pkg/errors"

"berty.tech/berty/v2/go/pkg/errcode"
// nolint:staticcheck
encrepo "berty.tech/go-ipfs-repo-encrypted"
)

Expand Down
87 changes: 11 additions & 76 deletions go/pkg/bertyprotocol/api_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ import (
"fmt"

"github.com/libp2p/go-eventbus"
"go.uber.org/zap"

"berty.tech/berty/v2/go/pkg/errcode"
"berty.tech/berty/v2/go/pkg/protocoltypes"

"berty.tech/go-orbit-db/stores"
)

func checkParametersConsistency(sinceID, untilID []byte, sinceNow, untilNow, reverseOrder bool) error {
Expand Down Expand Up @@ -103,8 +100,6 @@ func (s *service) GroupMetadataList(req *protocoltypes.GroupMetadataList_Request
}

// Subscribe to new metadata events and stream them if requested
listPreviouseMetadataDone := false
bufferMetadata := []*protocoltypes.GroupMetadataEvent{}
for {
var event interface{}
select {
Expand All @@ -114,43 +109,16 @@ func (s *service) GroupMetadataList(req *protocoltypes.GroupMetadataList_Request
case event = <-newEvents:
}

var metadatas []*protocoltypes.GroupMetadataEvent
switch evt := event.(type) {
case stores.EventReplicated:
entries := evt.Entries
metadatas = []*protocoltypes.GroupMetadataEvent{}
cg.logger.Info("receving replicated metadata events", zap.Int("metadatas", len(entries)))
for _, entry := range entries {
msg, _, err := cg.MetadataStore().openMetadataEntry(entry)
if err != nil {
s.logger.Error("unable to open metadata", zap.Error(err))
continue
}

metadatas = append(metadatas, msg)
}

if !listPreviouseMetadataDone {
bufferMetadata = append(bufferMetadata, metadatas...)
continue
}

case protocoltypes.GroupMetadataEvent:
if evt.EventContext == nil {
listPreviouseMetadataDone = true
metadatas = bufferMetadata
} else {
metadatas = []*protocoltypes.GroupMetadataEvent{&evt}
}
msg := event.(protocoltypes.GroupMetadataEvent)
if msg.EventContext == nil {
continue
}

for _, msg := range metadatas {
if err := sub.Send(msg); err != nil {
return err
}
if err := sub.Send(&msg); err != nil {
return err
}

cg.logger.Info("service - metadata store - sent event from log subscription", zap.Int("metadatas", len(metadatas)))
cg.logger.Info("service - metadata store - sent 1 event from log subscription")
}
}

Expand All @@ -174,7 +142,6 @@ func (s *service) GroupMessageList(req *protocoltypes.GroupMessageList_Request,
var newEvents <-chan interface{}
if req.UntilID == nil && !req.UntilNow {
sub, err := cg.MessageStore().EventBus().Subscribe([]interface{}{
new(stores.EventReplicated),
new(protocoltypes.GroupMessageEvent),
}, eventbus.BufSize(32))
if err != nil {
Expand Down Expand Up @@ -223,7 +190,6 @@ func (s *service) GroupMessageList(req *protocoltypes.GroupMessageList_Request,

// Subscribe to new message events and stream them if requested
// listPreviouseMessageDone := false
bufferMessage := []*protocoltypes.GroupMessageEvent{}
for {
var event interface{}
select {
Expand All @@ -233,46 +199,15 @@ func (s *service) GroupMessageList(req *protocoltypes.GroupMessageList_Request,
case event = <-newEvents:
}

var messages []*protocoltypes.GroupMessageEvent
switch evt := event.(type) {
case stores.EventReplicated:
entries := evt.Entries
messages = []*protocoltypes.GroupMessageEvent{}
for _, entry := range entries {
if err := cg.MessageStore().addToMessageQueue(ctx, entry); err != nil {
s.logger.Error("unable to add message to queue", zap.Error(err))
}
// msg, err := cg.MessageStore().openMessage(ctx, entry, false)
// if err != nil {
// s.logger.Error("unable to open message", zap.Error(err))
// continue
// } else {
// messages = append(messages, msg)
// }
}

msg := event.(protocoltypes.GroupMessageEvent)
if msg.EventContext == nil {
continue

// if !listPreviouseMessageDone {
// bufferMessage = append(bufferMessage, messages...)
// continue
// }

case protocoltypes.GroupMessageEvent:
if evt.EventContext == nil {
// listPreviouseMessageDone = true
messages = bufferMessage
} else {
messages = []*protocoltypes.GroupMessageEvent{&evt}
}
}

for _, msg := range messages {
if err := sub.Send(msg); err != nil {
return err
}
if err := sub.Send(&msg); err != nil {
return err
}

cg.logger.Info("service - message store - sent event from log subscription", zap.Int("messages", len(messages)))
cg.logger.Info("service - message store - sent 1 event from log subscription")
}
}
2 changes: 1 addition & 1 deletion go/pkg/bertyprotocol/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func FillMessageKeysHolderUsingNewData(ctx context.Context, gc *GroupContext) <-

// A new chainKey is registered, check if cached messages can be opened with it
if rawPK, err := pk.Raw(); err == nil {
gc.MessageStore().ProcessMessageQueueForDevicePK(ctx, rawPK)
gc.MessageStore().ProcessMessageQueueForDevicePK(rawPK)
}

ch <- pk
Expand Down

0 comments on commit c473abc

Please sign in to comment.