Skip to content

Commit

Permalink
fix(messenger): don't dispatch useless member updates
Browse files Browse the repository at this point in the history
Signed-off-by: Norman Meier <norman@berty.tech>
  • Loading branch information
n0izn0iz committed Dec 31, 2021
1 parent c1034dd commit 7ef1178
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 92 deletions.
20 changes: 13 additions & 7 deletions go/internal/messengerdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ func (d *DBWrapper) AddMember(memberPK, groupPK, displayName, avatarCID string,
}
member.DisplayName = displayName

return tx.db.Create(&member).Error
return tx.db.Create(member).Error
}); errors.Is(err, errcode.ErrDBEntryAlreadyExists) {
return member, err
} else if err != nil {
Expand Down Expand Up @@ -1140,6 +1140,9 @@ func (d *DBWrapper) UpsertMember(memberPK, groupPK string, m messengertypes.Memb
if err != nil {
return nil, false, errcode.ErrDBRead.Wrap(err)
}
if um.Equals(em) {
um = nil
}

commonDetails := []tyber.StepMutator{tyber.WithJSONDetail("FinalMember", um)}
if isNew {
Expand Down Expand Up @@ -1741,16 +1744,19 @@ func (d *DBWrapper) CreateOrUpdateReaction(reaction *messengertypes.Reaction) (b
return updated, nil
}

func (d *DBWrapper) MarkMemberAsConversationCreator(memberPK, conversationPK string) error {
func (d *DBWrapper) MarkMemberAsConversationCreator(memberPK, conversationPK string) (*messengertypes.Member, error) {
member, err := d.GetMemberByPK(memberPK, conversationPK)
if err != nil {
return errcode.ErrDBRead.Wrap(err)
return nil, errcode.ErrDBRead.Wrap(err)
}

member.IsCreator = true
if err := d.db.Save(member).Error; err != nil {
return errcode.ErrDBWrite.Wrap(err)
if !member.IsCreator {
member.IsCreator = true
if err := d.db.Save(member).Error; err != nil {
return nil, errcode.ErrDBWrite.Wrap(err)
}
return member, nil
}

return nil
return nil, nil
}
179 changes: 94 additions & 85 deletions go/internal/messengerpayloads/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (h *EventHandler) contactRequestAccepted(contact *mt.Contact, memberPK []by
func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes.GroupMetadataEvent) error {
var ev protocoltypes.MultiMemberInitialMember
if err := proto.Unmarshal(gme.GetEvent(), &ev); err != nil {
return err
return errcode.ErrInvalidInput.Wrap(err)
}

mpkb := ev.GetMemberPK()
Expand All @@ -642,6 +642,9 @@ func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes
if err := h.db.TX(h.ctx, func(tx *messengerdb.DBWrapper) error {
// create or update member

var update *mt.Member
isNew := false

member, err := tx.GetMemberByPK(mpk, gpk)
if err != gorm.ErrRecordNotFound && err != nil {
return errcode.ErrDBRead.Wrap(err)
Expand All @@ -655,31 +658,27 @@ func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes

isMe := bytes.Equal(ownMemberPK, mpkb)

if _, err := tx.AddMember(mpk, gpk, "", "", isMe, true); err != nil {
if update, err = tx.AddMember(mpk, gpk, "", "", isMe, true); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}
} else if err := tx.MarkMemberAsConversationCreator(member.PublicKey, gpk); err != nil {

isNew = true
} else if update, err = tx.MarkMemberAsConversationCreator(member.PublicKey, gpk); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

return nil
}); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

// dispatch update
{
member, err := h.db.GetMemberByPK(mpk, gpk)
if err != nil {
return errcode.ErrDBRead.Wrap(err)
}
if update != nil {
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: update}, isNew)
if err != nil {
return err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: member}, true)
if err != nil {
return err
h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
}

h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", true))
return nil
}); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

return nil
Expand All @@ -692,7 +691,7 @@ func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes
func (h *EventHandler) groupMemberDeviceAdded(gme *protocoltypes.GroupMetadataEvent) error {
var ev protocoltypes.GroupAddMemberDevice
if err := proto.Unmarshal(gme.GetEvent(), &ev); err != nil {
return err
return errcode.ErrInvalidInput.Wrap(err)
}

mpkb := ev.GetMemberPK()
Expand All @@ -715,86 +714,94 @@ func (h *EventHandler) groupMemberDeviceAdded(gme *protocoltypes.GroupMetadataEv

isMe := bytes.Equal(ownMemberPK, mpkb)

// Register device if not already known
if _, err := h.db.GetDeviceByPK(dpk); errors.Is(err, errcode.ErrNotFound) || errors.Is(err, gorm.ErrRecordNotFound) {
device, err := h.db.AddDevice(dpk, mpk)
if err != nil {
return err
}
if err := h.db.TX(h.ctx, func(d *messengerdb.DBWrapper) error {
// Register device if not already known
if _, err := d.GetDeviceByPK(dpk); errors.Is(err, errcode.ErrNotFound) || errors.Is(err, gorm.ErrRecordNotFound) {
device, err := d.AddDevice(dpk, mpk)
if err != nil {
return err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeDeviceUpdated, &mt.StreamEvent_DeviceUpdated{Device: device}, true)
if err != nil {
h.logger.Error("error dispatching device updated", zap.Error(err))
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeDeviceUpdated, &mt.StreamEvent_DeviceUpdated{Device: device}, true)
if err != nil {
h.logger.Error("error dispatching device updated", zap.Error(err))
}
}
}

// Check whether a contact request has been accepted (a device from the contact has been added to the group)
if contact, err := h.db.GetContactByPK(mpk); err == nil && contact.GetState() == mt.Contact_OutgoingRequestSent {
if err := h.contactRequestAccepted(contact, mpkb); err != nil {
return err
// Check whether a contact request has been accepted (a device from the contact has been added to the group)
if contact, err := d.GetContactByPK(mpk); err == nil && contact.GetState() == mt.Contact_OutgoingRequestSent {
if err := h.contactRequestAccepted(contact, mpkb); err != nil {
return err
}
}
}

// check backlogs
userInfo := (*mt.AppMessage_SetUserInfo)(nil)
{
backlog, err := h.db.AttributeBacklogInteractions(dpk, gpk, mpk)
if err != nil {
return err
}
// check backlogs
userInfo := (*mt.AppMessage_SetUserInfo)(nil)
{
backlog, err := d.AttributeBacklogInteractions(dpk, gpk, mpk)
if err != nil {
return err
}

for _, elem := range backlog {
h.logger.Info("found elem in backlog", zap.String("type", elem.GetType().String()), logutil.PrivateString("device-pk", elem.GetDevicePublicKey()), logutil.PrivateString("conv", elem.GetConversationPublicKey()))
for _, elem := range backlog {
h.logger.Info("found elem in backlog", zap.String("type", elem.GetType().String()), logutil.PrivateString("device-pk", elem.GetDevicePublicKey()), logutil.PrivateString("conv", elem.GetConversationPublicKey()))

elem.MemberPublicKey = mpk
elem.MemberPublicKey = mpk

switch elem.GetType() {
case mt.AppMessage_TypeSetUserInfo:
var payload mt.AppMessage_SetUserInfo
switch elem.GetType() {
case mt.AppMessage_TypeSetUserInfo:
var payload mt.AppMessage_SetUserInfo

if err := proto.Unmarshal(elem.GetPayload(), &payload); err != nil {
return err
}
if err := proto.Unmarshal(elem.GetPayload(), &payload); err != nil {
return err
}

userInfo = &payload
userInfo = &payload

if err := h.db.DeleteInteractions([]string{elem.CID}); err != nil {
return err
}
if err := d.DeleteInteractions([]string{elem.CID}); err != nil {
return err
}

if err := h.dispatcher.StreamEvent(mt.StreamEvent_TypeInteractionDeleted, &mt.StreamEvent_InteractionDeleted{CID: elem.GetCID(), ConversationPublicKey: gpk}, false); err != nil {
return err
}
if err := h.dispatcher.StreamEvent(mt.StreamEvent_TypeInteractionDeleted, &mt.StreamEvent_InteractionDeleted{CID: elem.GetCID(), ConversationPublicKey: gpk}, false); err != nil {
return err
}

default:
if err := messengerutil.StreamInteraction(h.dispatcher, h.db, elem.CID, false); err != nil {
return err
default:
if err := messengerutil.StreamInteraction(h.dispatcher, d, elem.CID, false); err != nil {
return err
}
}
}
}
}

member := &mt.Member{
PublicKey: mpk,
ConversationPublicKey: gpk,
IsMe: isMe,
}
if userInfo != nil {
member.DisplayName = userInfo.GetDisplayName()
member.AvatarCID = userInfo.GetAvatarCID()
}
member := &mt.Member{
PublicKey: mpk,
ConversationPublicKey: gpk,
IsMe: isMe,
}
if userInfo != nil {
member.DisplayName = userInfo.GetDisplayName()
member.AvatarCID = userInfo.GetAvatarCID()
}

member, isNew, err := h.db.UpsertMember(mpk, gpk, *member)
if err != nil {
return err
}
update, isNew, err := d.UpsertMember(mpk, gpk, *member)
if err != nil {
return err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: member}, isNew)
if err != nil {
return err
}
if update != nil {
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: update}, isNew)
if err != nil {
return err
}

h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
}

return nil
}); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

return nil
}
Expand Down Expand Up @@ -973,7 +980,7 @@ func (h *EventHandler) handleAppMessageSetUserInfo(tx *messengerdb.DBWrapper, i
}
h.logger.Debug("interesting member SetUserInfo")

member, isNew, err := tx.UpsertMember(
update, isNew, err := tx.UpsertMember(
i.MemberPublicKey,
i.ConversationPublicKey,
mt.Member{DisplayName: payload.GetDisplayName(), AvatarCID: payload.GetAvatarCID(), InfoDate: i.GetSentDate()},
Expand All @@ -982,12 +989,14 @@ func (h *EventHandler) handleAppMessageSetUserInfo(tx *messengerdb.DBWrapper, i
return nil, false, err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: member}, isNew)
if err != nil {
return nil, false, err
}
if update != nil {
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: update}, isNew)
if err != nil {
return nil, false, err
}

h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
h.logger.Info("dispatched member update", zap.Any("member", update), zap.Bool("isNew", isNew))
}

return i, false, nil
}
Expand Down
3 changes: 3 additions & 0 deletions go/pkg/bertymessenger/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func TestingInfra(ctx context.Context, t *testing.T, amount int, logger *zap.Log
protocols, cleanup := bertyprotocol.NewTestingProtocolWithMockedPeers(ctx, t, &bertyprotocol.TestingOpts{Logger: logger, Mocknet: mocknet}, nil, amount)
clients := make([]messengertypes.MessengerServiceClient, amount)

// wait for protocol warmup
time.Sleep(1 * time.Second)

// setup client
for i, p := range protocols {
// new messenger service
Expand Down
16 changes: 16 additions & 0 deletions go/pkg/messengertypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,19 @@ func (am *AppMessage) TextRepresentation() (string, error) {
func (m *AppMessage_UserMessage) TextRepresentation() (string, error) {
return m.GetBody(), nil
}

func (m *Member) Equals(other *Member) bool {
if m == nil && other == nil {
return true
}
if !(m != nil && other != nil) {
return false
}
return m.PublicKey == other.PublicKey &&
m.DisplayName == other.DisplayName &&
m.AvatarCID == other.AvatarCID &&
m.ConversationPublicKey == other.ConversationPublicKey &&
m.IsMe == other.IsMe &&
m.IsCreator == other.IsCreator &&
m.InfoDate == other.InfoDate
}

0 comments on commit 7ef1178

Please sign in to comment.