Skip to content

Commit

Permalink
fix: multi commitlogs creation and subscriptions
Browse files Browse the repository at this point in the history
Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Dec 21, 2018
1 parent c68eddc commit c9830dc
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 108 deletions.
Expand Up @@ -120,14 +120,8 @@ const CommitLogStream = graphql`
}
`

let _subscriber = null

export default context => {
if (_subscriber === null) {
_subscriber = subscriber({
environment: context.environment,
subscription: CommitLogStream,
})
}
return _subscriber
}
export default context =>
subscriber({
environment: context.environment,
subscription: CommitLogStream,
})
5 changes: 1 addition & 4 deletions client/react-native/common/graphql/subscriptions/Contact.js
@@ -1,9 +1,6 @@
import CommitLogStream from './CommitLogStream'

export default context => ({
...CommitLogStream(context),
subscribe: ({ updater }) =>
CommitLogStream(context).subscribe({
context.subscriptions.commitLogStream.subscribe({
updater:
updater &&
((store, data) => {
Expand Down
@@ -1,9 +1,6 @@
import CommitLogStream from './CommitLogStream'

export default context => ({
...CommitLogStream(context),
subscribe: ({ updater }) =>
CommitLogStream(context).subscribe({
context.subscriptions.commitLogStream.subscribe({
updater:
updater &&
((store, data) => {
Expand Down
5 changes: 1 addition & 4 deletions client/react-native/common/graphql/subscriptions/Message.js
@@ -1,9 +1,6 @@
import CommitLogStream from './CommitLogStream'

export default context => ({
...CommitLogStream(context),
subscribe: ({ updater }) =>
CommitLogStream(context).subscribe({
context.subscriptions.commitLogStream.subscribe({
updater:
updater &&
((store, data) => {
Expand Down
2 changes: 1 addition & 1 deletion client/react-native/common/relay/environment.js
Expand Up @@ -78,7 +78,7 @@ const setupMiddlewares = async ({ getIp, getPort }) => [
}),
retryMiddleware({
allowMutations: true,
fetchTimeout: 3000,
fetchTimeout: 5000,
retryDelays: () => 2000,
beforeRetry: async ({
forceRetry,
Expand Down
25 changes: 10 additions & 15 deletions client/react-native/common/relay/subscriber.js
Expand Up @@ -2,28 +2,23 @@ import { requestSubscription } from 'react-relay'

export default ({ environment, subscription, updaters = [], variables }) => {
let _updaters = updaters
let _subscriber = null

requestSubscription(environment, {
subscription,
variables,
onNext: () => {},
onCompleted: () => {},
onError: error => console.error(error),
updater: (store, data) =>
_updaters.forEach(updater => updater(store, data)),
})

return {
subscribe: ({ updater }) => {
if (_updaters.length === 0 && updater) {
_subscriber = requestSubscription(environment, {
subscription,
variables,
onNext: () => {},
onCompleted: () => {},
onError: error => console.error(error),
updater: (store, data) =>
_updaters.forEach(updater => updater(store, data)),
})
}
updater && _updaters.push(updater)
return {
unsubscribe: () => {
_updaters = updater ? _updaters.filter(_ => _ !== updater) : _updaters
if (_updaters.length === 0) {
_subscriber.dispose()
}
},
}
},
Expand Down
10 changes: 0 additions & 10 deletions core/node/mainloop.go
Expand Up @@ -97,16 +97,6 @@ func (n *Node) handleClientEvent(ctx context.Context, event *p2p.Event) {
n.clientEventsMutex.Unlock()
}

func (n *Node) handleClientCommitLogs(ctx context.Context, commitLog *node.CommitLog) {
logger().Debug("commit log", zap.Stringer("commit log", commitLog))

n.clientCommitLogsMutex.Lock()
defer n.clientCommitLogsMutex.Unlock()
for _, sub := range n.clientCommitLogsSubscribers {
sub.queue <- commitLog
}
}

func (n *Node) handleOutgoingEvent(ctx context.Context, event *p2p.Event) {
logger().Debug("outgoing event", zap.Stringer("event", event))

Expand Down
9 changes: 0 additions & 9 deletions core/node/nodeclient.go
Expand Up @@ -90,10 +90,6 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt
logger().Debug("CommitLogStream connected", zap.Stringer("input", input))

n.clientCommitLogsMutex.Lock()
// start retrieve commit logs from db
if len(n.clientCommitLogsSubscribers) == 0 {
n.handleCommitLogs()
}
sub := clientCommitLogsSubscriber{
queue: make(chan *node.CommitLog, 100),
}
Expand All @@ -113,18 +109,13 @@ func (n *Node) CommitLogStream(input *node.Void, stream node.Service_CommitLogSt
)
}
}
// stop retrieve commit logs from db
if len(n.clientCommitLogsSubscribers) == 0 {
n.unhandleCommitLogs()
}
}()

for {
select {
case <-stream.Context().Done():
return stream.Context().Err()
case commitLog, ok := <-sub.queue:
logger().Debug("send commit log")
if !ok {
logger().Error("CommitLogStream chan closed")
return errors.New("commitLogStream chan closed")
Expand Down
61 changes: 11 additions & 50 deletions core/node/sql.go
Expand Up @@ -8,7 +8,6 @@ import (
"berty.tech/core/api/node"
"berty.tech/core/api/p2p"
"berty.tech/core/entity"
"berty.tech/core/sql"
"github.com/jinzhu/gorm"
opentracing "github.com/opentracing/opentracing-go"
"go.uber.org/zap"
Expand All @@ -18,6 +17,9 @@ import (
func WithSQL(sql *gorm.DB) NewNodeOption {
return func(n *Node) {
n.sqlDriver = sql.Unscoped()
sql.Callback().Create().Register("berty:after_create", func(scope *gorm.Scope) { n.handleCommitLog("create", scope) })
sql.Callback().Update().Register("berty:after_update", func(scope *gorm.Scope) { n.handleCommitLog("update", scope) })
sql.Callback().Delete().Register("berty:after_delete", func(scope *gorm.Scope) { n.handleCommitLog("delete", scope) })
}
}

Expand All @@ -32,26 +34,6 @@ func (n *Node) sql(ctx context.Context) *gorm.DB {
return n.sqlDriver
}

func (n *Node) handleCommitLogs() {
clbk := n.sqlDriver.Callback()

clbk.Create().Register("berty:after_create", func(scope *gorm.Scope) { n.handleCommitLog("create", scope) })
clbk.Update().Register("berty:after_update", func(scope *gorm.Scope) { n.handleCommitLog("update", scope) })
clbk.Delete().Register("berty:after_delete", func(scope *gorm.Scope) { n.handleCommitLog("delete", scope) })

logger().Debug("commit logs handled")
}

func (n *Node) unhandleCommitLogs() {
clbk := n.sqlDriver.Callback()

clbk.Create().Remove("berty:after_create")
clbk.Update().Remove("berty:after_update")
clbk.Delete().Remove("berty:after_delete")

logger().Debug("commit logs unhandled")
}

func (n *Node) handleCommitLog(operation string, scope *gorm.Scope) {
// same usage as https://github.com/jinzhu/gorm/blob/master/scope.go#L241

Expand Down Expand Up @@ -88,6 +70,12 @@ func (n *Node) sendCommitLog(commitLog *node.CommitLog) {

func (n *Node) createCommitLog(operation string, reflectValue reflect.Value) *node.CommitLog {

// Only get address from non-pointer
if reflectValue.CanAddr() && reflectValue.Kind() != reflect.Ptr {
reflectValue = reflectValue.Addr()
}

logger().Debug(fmt.Sprintf("OPERATION COMMIT LOG %+v", operation))
log := &node.CommitLog{}

switch operation {
Expand All @@ -102,46 +90,19 @@ func (n *Node) createCommitLog(operation string, reflectValue reflect.Value) *no
return nil
}

// Only get address from non-pointer
if reflectValue.CanAddr() && reflectValue.Kind() != reflect.Ptr {
reflectValue = reflectValue.Addr()
}

switch e := reflectValue.Interface().(type) {
case *entity.Config:
log.Entity = &node.CommitLog_Entity{Config: e}
switch data := reflectValue.Interface().(type) {
case *entity.Contact:
data, err := sql.ContactByID(n.sqlDriver, e.ID)
if err != nil {
return nil
}
log.Entity = &node.CommitLog_Entity{Contact: data}
case *entity.Device:
data, err := sql.DeviceByID(n.sqlDriver, e.ID)
if err != nil {
return nil
}
log.Entity = &node.CommitLog_Entity{Device: data}
case *entity.Conversation:
data, err := sql.ConversationByID(n.sqlDriver, e.ID)
if err != nil {
return nil
}
log.Entity = &node.CommitLog_Entity{Conversation: data}
case *entity.ConversationMember:
data, err := sql.ConversationMemberByID(n.sqlDriver, e.ID)
if err != nil {
return nil
}
log.Entity = &node.CommitLog_Entity{ConversationMember: data}
case *p2p.Event:
data, err := sql.EventByID(n.sqlDriver, e.ID)
if err != nil {
return nil
}
log.Entity = &node.CommitLog_Entity{Event: data}
default:
logger().Warn(fmt.Sprintf("unhandled entity %+v", e))
logger().Warn(fmt.Sprintf("unhandled entity %+v", data))
return nil
}
return log
Expand Down

0 comments on commit c9830dc

Please sign in to comment.