From b911c7ba9fa12a485c9662c98c8078a678310d17 Mon Sep 17 00:00:00 2001 From: Godefroy Ponsinet Date: Thu, 20 Dec 2018 12:58:39 +0100 Subject: [PATCH] fix(node): commit log stream Signed-off-by: Godefroy Ponsinet --- core/node/node.go | 15 ++++++--------- core/node/nodeclient.go | 15 +++++++++------ core/node/sql.go | 25 +++++++++++++++++-------- 3 files changed, 32 insertions(+), 23 deletions(-) diff --git a/core/node/node.go b/core/node/node.go index 8da9e76639..de22457d5d 100644 --- a/core/node/node.go +++ b/core/node/node.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "berty.tech/core/api/node" "berty.tech/core/api/p2p" "berty.tech/core/crypto/keypair" "berty.tech/core/crypto/sigchain" @@ -25,8 +24,7 @@ import ( // Node is the top-level object of a Berty peer type Node struct { - clientCommitLogs chan *node.CommitLog - clientCommitLogsSubscribers []*clientCommitLogsSubscriber + clientCommitLogsSubscribers []clientCommitLogsSubscriber clientCommitLogsMutex sync.Mutex clientEvents chan *p2p.Event clientEventsSubscribers []clientEventSubscriber @@ -62,12 +60,11 @@ func New(ctx context.Context, opts ...NewNodeOption) (*Node, error) { n := &Node{ // FIXME: fetch myself from db - outgoingEvents: make(chan *p2p.Event, 100), - clientEvents: make(chan *p2p.Event, 100), - clientCommitLogs: make(chan *node.CommitLog, 100), - createdAt: time.Now().UTC(), - rootSpan: span, - rootContext: ctx, + outgoingEvents: make(chan *p2p.Event, 100), + clientEvents: make(chan *p2p.Event, 100), + createdAt: time.Now().UTC(), + rootSpan: span, + rootContext: ctx, } // apply optioners diff --git a/core/node/nodeclient.go b/core/node/nodeclient.go index 21b55e1290..8ad1ea9b6e 100644 --- a/core/node/nodeclient.go +++ b/core/node/nodeclient.go @@ -89,16 +89,17 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt logger().Debug("CommitLogStream connected", zap.Stringer("input", input)) - sub := &clientCommitLogsSubscriber{ + n.clientCommitLogsMutex.Lock() + // start retrieve commit logs from db + if len(n.clientCommitLogsSubscribers) == 0 { + n.handleCommitLogs() + } + sub := clientCommitLogsSubscriber{ queue: make(chan *node.CommitLog, 100), } - - n.clientCommitLogsMutex.Lock() n.clientCommitLogsSubscribers = append(n.clientCommitLogsSubscribers, sub) n.clientCommitLogsMutex.Unlock() - n.handleCommitLogs() - defer func() { logger().Debug("CommitLogStream disconnected", zap.Stringer("input", input)) @@ -108,10 +109,11 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt if s == sub { n.clientCommitLogsSubscribers = append( n.clientCommitLogsSubscribers[:i], - n.clientCommitLogsSubscribers[i:]..., + n.clientCommitLogsSubscribers[i+1:]..., ) } } + // stop retrieve commit logs from db if len(n.clientCommitLogsSubscribers) == 0 { n.unhandleCommitLogs() } @@ -122,6 +124,7 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt case <-stream.Context().Done(): return stream.Context().Err() case commitLog, ok := <-sub.queue: + logger().Debug("send commit log") if !ok { logger().Error("CommitLogStream chan closed") return errors.New("commitLogStream chan closed") diff --git a/core/node/sql.go b/core/node/sql.go index 4d75803e68..ed02843a95 100644 --- a/core/node/sql.go +++ b/core/node/sql.go @@ -10,6 +10,7 @@ import ( "berty.tech/core/entity" "github.com/jinzhu/gorm" opentracing "github.com/opentracing/opentracing-go" + "go.uber.org/zap" ) // WithSQL registers a gorm connection as the node database @@ -63,16 +64,24 @@ func (n *Node) handleCommitLog(operation string, scope *gorm.Scope) { if indirectScopeValue := scope.IndirectValue(); indirectScopeValue.Kind() == reflect.Slice { for i := 0; i < indirectScopeValue.Len(); i++ { - log := n.createCommitLog(operation, indirectScopeValue.Index(i)) - if log != nil { - n.clientCommitLogs <- log - } + n.sendCommitLog(n.createCommitLog(operation, indirectScopeValue.Index(i))) } } else { - log := n.createCommitLog(operation, indirectScopeValue) - if log != nil { - n.clientCommitLogs <- log - } + n.sendCommitLog(n.createCommitLog(operation, indirectScopeValue)) + } +} + +func (n *Node) sendCommitLog(commitLog *node.CommitLog) { + if commitLog == nil { + return + } + + logger().Debug("commit log", zap.Stringer("commit log", commitLog)) + + n.clientCommitLogsMutex.Lock() + defer n.clientCommitLogsMutex.Unlock() + for _, sub := range n.clientCommitLogsSubscribers { + sub.queue <- commitLog } }