Skip to content

Commit

Permalink
fix(node): commit log stream
Browse files Browse the repository at this point in the history
Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Dec 20, 2018
1 parent ef5c381 commit b911c7b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 23 deletions.
15 changes: 6 additions & 9 deletions core/node/node.go
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"berty.tech/core/api/node"
"berty.tech/core/api/p2p"
"berty.tech/core/crypto/keypair"
"berty.tech/core/crypto/sigchain"
Expand All @@ -25,8 +24,7 @@ import (

// Node is the top-level object of a Berty peer
type Node struct {
clientCommitLogs chan *node.CommitLog
clientCommitLogsSubscribers []*clientCommitLogsSubscriber
clientCommitLogsSubscribers []clientCommitLogsSubscriber
clientCommitLogsMutex sync.Mutex
clientEvents chan *p2p.Event
clientEventsSubscribers []clientEventSubscriber
Expand Down Expand Up @@ -62,12 +60,11 @@ func New(ctx context.Context, opts ...NewNodeOption) (*Node, error) {

n := &Node{
// FIXME: fetch myself from db
outgoingEvents: make(chan *p2p.Event, 100),
clientEvents: make(chan *p2p.Event, 100),
clientCommitLogs: make(chan *node.CommitLog, 100),
createdAt: time.Now().UTC(),
rootSpan: span,
rootContext: ctx,
outgoingEvents: make(chan *p2p.Event, 100),
clientEvents: make(chan *p2p.Event, 100),
createdAt: time.Now().UTC(),
rootSpan: span,
rootContext: ctx,
}

// apply optioners
Expand Down
15 changes: 9 additions & 6 deletions core/node/nodeclient.go
Expand Up @@ -89,16 +89,17 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt

logger().Debug("CommitLogStream connected", zap.Stringer("input", input))

sub := &clientCommitLogsSubscriber{
n.clientCommitLogsMutex.Lock()
// start retrieve commit logs from db
if len(n.clientCommitLogsSubscribers) == 0 {
n.handleCommitLogs()
}
sub := clientCommitLogsSubscriber{
queue: make(chan *node.CommitLog, 100),
}

n.clientCommitLogsMutex.Lock()
n.clientCommitLogsSubscribers = append(n.clientCommitLogsSubscribers, sub)
n.clientCommitLogsMutex.Unlock()

n.handleCommitLogs()

defer func() {
logger().Debug("CommitLogStream disconnected", zap.Stringer("input", input))

Expand All @@ -108,10 +109,11 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt
if s == sub {
n.clientCommitLogsSubscribers = append(
n.clientCommitLogsSubscribers[:i],
n.clientCommitLogsSubscribers[i:]...,
n.clientCommitLogsSubscribers[i+1:]...,
)
}
}
// stop retrieve commit logs from db
if len(n.clientCommitLogsSubscribers) == 0 {
n.unhandleCommitLogs()
}
Expand All @@ -122,6 +124,7 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt
case <-stream.Context().Done():
return stream.Context().Err()
case commitLog, ok := <-sub.queue:
logger().Debug("send commit log")
if !ok {
logger().Error("CommitLogStream chan closed")
return errors.New("commitLogStream chan closed")
Expand Down
25 changes: 17 additions & 8 deletions core/node/sql.go
Expand Up @@ -10,6 +10,7 @@ import (
"berty.tech/core/entity"
"github.com/jinzhu/gorm"
opentracing "github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)

// WithSQL registers a gorm connection as the node database
Expand Down Expand Up @@ -63,16 +64,24 @@ func (n *Node) handleCommitLog(operation string, scope *gorm.Scope) {

if indirectScopeValue := scope.IndirectValue(); indirectScopeValue.Kind() == reflect.Slice {
for i := 0; i < indirectScopeValue.Len(); i++ {
log := n.createCommitLog(operation, indirectScopeValue.Index(i))
if log != nil {
n.clientCommitLogs <- log
}
n.sendCommitLog(n.createCommitLog(operation, indirectScopeValue.Index(i)))
}
} else {
log := n.createCommitLog(operation, indirectScopeValue)
if log != nil {
n.clientCommitLogs <- log
}
n.sendCommitLog(n.createCommitLog(operation, indirectScopeValue))
}
}

func (n *Node) sendCommitLog(commitLog *node.CommitLog) {
if commitLog == nil {
return
}

logger().Debug("commit log", zap.Stringer("commit log", commitLog))

n.clientCommitLogsMutex.Lock()
defer n.clientCommitLogsMutex.Unlock()
for _, sub := range n.clientCommitLogsSubscribers {
sub.queue <- commitLog
}
}

Expand Down

0 comments on commit b911c7b

Please sign in to comment.