Skip to content

Commit

Permalink
fix: use replicated events instead of progress events
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
  • Loading branch information
gfanton committed Mar 30, 2022
1 parent e7346a2 commit 5a1b3ed
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 // indirect
berty.tech/go-ipfs-log v1.7.0
berty.tech/go-ipfs-repo-encrypted v1.1.3
berty.tech/go-orbit-db v1.16.0
berty.tech/go-orbit-db v1.16.1
berty.tech/ipfs-webui-packed v1.0.0-v2.11.4-1
fyne.io/fyne/v2 v2.1.1
github.com/Masterminds/goutils v1.1.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 17 additions & 16 deletions go/pkg/bertyprotocol/store_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ func constructorFactoryGroupMessage(s *BertyOrbitDB, logger *zap.Logger) iface.S

chSub, err := store.EventBus().Subscribe([]interface{}{
new(stores.EventWrite),
new(stores.EventReplicateProgress),
})
new(stores.EventReplicated),
}, eventbus.BufSize(128))
if err != nil {
return nil, fmt.Errorf("unable to subscribe to store events")
}
Expand All @@ -444,26 +444,27 @@ func constructorFactoryGroupMessage(s *BertyOrbitDB, logger *zap.Logger) iface.S
return
}

var entry ipfslog.Entry
var entries []ipfslog.Entry

switch evt := e.(type) {
case stores.EventWrite:
entry = evt.Entry
entries = []ipfslog.Entry{evt.Entry}

case stores.EventReplicateProgress:
entry = evt.Entry
case stores.EventReplicated:
entries = evt.Entries
}

ctx = tyber.ContextWithConstantTraceID(ctx, "msgrcvd-"+entry.GetHash().String())
store.logger.Debug("Received message store event", tyber.FormatTraceLogFields(ctx)...)

store.logger.Debug(
"Message store event",
tyber.FormatStepLogFields(ctx, []tyber.Detail{{Name: "RawEvent", Description: fmt.Sprint(e)}})...,
)

if err := store.addToMessageQueue(ctx, entry); err != nil {
logger.Error("unable to add message to queue", zap.Error(err))
for _, entry := range entries {
ctx = tyber.ContextWithConstantTraceID(ctx, "msgrcvd-"+entry.GetHash().String())
store.logger.Debug("Received message store event", tyber.FormatTraceLogFields(ctx)...)
store.logger.Debug(
"Message store event",
tyber.FormatStepLogFields(ctx, []tyber.Detail{{Name: "RawEvent", Description: fmt.Sprint(e)}})...,
)

if err := store.addToMessageQueue(ctx, entry); err != nil {
logger.Error("unable to add message to queue", zap.Error(err))
}
}
}
}()
Expand Down
74 changes: 36 additions & 38 deletions go/pkg/bertyprotocol/store_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,8 @@ func constructorFactoryGroupMetadata(s *BertyOrbitDB, logger *zap.Logger) iface.

chSub, err := store.eventBus.Subscribe([]interface{}{
new(stores.EventWrite),
new(stores.EventReplicateProgress),
})
new(stores.EventReplicated),
}, eventbus.BufSize(128))
if err != nil {
return nil, fmt.Errorf("unable to subscribe to store events")
}
Expand All @@ -1049,48 +1049,46 @@ func constructorFactoryGroupMetadata(s *BertyOrbitDB, logger *zap.Logger) iface.
return
}

var entry ipfslog.Entry
var entries []ipfslog.Entry

switch evt := e.(type) {
case stores.EventWrite:
entry = evt.Entry
entries = []ipfslog.Entry{evt.Entry}

case stores.EventReplicateProgress:
entry = evt.Entry
case stores.EventReplicated:
entries = evt.Entries
}

if entry == nil {
continue
}

ctx = tyber.ContextWithConstantTraceID(ctx, "msgrcvd-"+entry.GetHash().String())
tyber.LogTraceStart(ctx, store.logger, fmt.Sprintf("Received metadata from %s group %s", shortGroupType, b64GroupPK))

metaEvent, event, err := openMetadataEntry(store.OpLog(), entry, g, store.devKS)
if err != nil {
_ = tyber.LogFatalError(ctx, store.logger, "Unable to open metadata event", err, tyber.WithDetail("RawEvent", fmt.Sprint(e)), tyber.ForceReopen)
continue
}

tyber.LogStep(ctx, store.logger, "Opened metadata store event",
tyber.ForceReopen,
tyber.EndTrace,
tyber.WithJSONDetail("MetaEvent", metaEvent),
tyber.WithJSONDetail("Event", event),
tyber.UpdateTraceName(fmt.Sprintf("Received %s from %s group %s", strings.TrimPrefix(metaEvent.GetMetadata().GetEventType().String(), "EventType"), shortGroupType, b64GroupPK)),
)

recvEvent := EventMetadataReceived{
MetaEvent: metaEvent,
Event: event,
}

if err := store.emitters.metadataReceived.Emit(recvEvent); err != nil {
store.logger.Warn("unable to emit recv event", zap.Error(err))
}

if err := store.emitters.groupMetadata.Emit(*metaEvent); err != nil {
store.logger.Warn("unable to emit group metadata event", zap.Error(err))
for _, entry := range entries {
ctx = tyber.ContextWithConstantTraceID(ctx, "msgrcvd-"+entry.GetHash().String())
tyber.LogTraceStart(ctx, store.logger, fmt.Sprintf("Received metadata from %s group %s", shortGroupType, b64GroupPK))

metaEvent, event, err := openMetadataEntry(store.OpLog(), entry, g, store.devKS)
if err != nil {
_ = tyber.LogFatalError(ctx, store.logger, "Unable to open metadata event", err, tyber.WithDetail("RawEvent", fmt.Sprint(e)), tyber.ForceReopen)
continue
}

tyber.LogStep(ctx, store.logger, "Opened metadata store event",
tyber.ForceReopen,
tyber.EndTrace,
tyber.WithJSONDetail("MetaEvent", metaEvent),
tyber.WithJSONDetail("Event", event),
tyber.UpdateTraceName(fmt.Sprintf("Received %s from %s group %s", strings.TrimPrefix(metaEvent.GetMetadata().GetEventType().String(), "EventType"), shortGroupType, b64GroupPK)),
)

recvEvent := EventMetadataReceived{
MetaEvent: metaEvent,
Event: event,
}

if err := store.emitters.metadataReceived.Emit(recvEvent); err != nil {
store.logger.Warn("unable to emit recv event", zap.Error(err))
}

if err := store.emitters.groupMetadata.Emit(*metaEvent); err != nil {
store.logger.Warn("unable to emit group metadata event", zap.Error(err))
}
}
}
}()
Expand Down

0 comments on commit 5a1b3ed

Please sign in to comment.