Skip to content

Commit

Permalink
feat: handle acknowledgments by device
Browse files Browse the repository at this point in the history
  • Loading branch information
glouvigny committed Mar 28, 2019
1 parent 2edcebf commit 8e85e0b
Show file tree
Hide file tree
Showing 7 changed files with 641 additions and 29 deletions.
30 changes: 19 additions & 11 deletions core/network/config/config.go
Expand Up @@ -11,6 +11,7 @@ import (
circuit "github.com/libp2p/go-libp2p-circuit"
libp2p_crypto "github.com/libp2p/go-libp2p-crypto"
discovery "github.com/libp2p/go-libp2p-discovery"
p2phost "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
pnet "github.com/libp2p/go-libp2p-pnet"
quic "github.com/libp2p/go-libp2p-quic-transport"
Expand Down Expand Up @@ -57,9 +58,9 @@ var BootstrapIpfs = []string{
}

var DefaultBind = map[string][]string{
"tcp": []string{"/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"},
"ble": []string{"/ble/00000000-0000-0000-0000-000000000000"},
"quic": []string{"/ip4/0.0.0.0/udp/0/quic", "/ip6/::/udp/0/quic"},
"tcp": {"/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"},
"ble": {"/ble/00000000-0000-0000-0000-000000000000"},
"quic": {"/ip4/0.0.0.0/udp/0/quic", "/ip6/::/udp/0/quic"},
}

type Config struct {
Expand Down Expand Up @@ -92,8 +93,9 @@ type Config struct {

Identity string

Persist bool `json:"-"`
OverridePersist bool `json:"-"` // override persist config when apply
Persist bool `json:"-"`
OverridePersist bool `json:"-"` // override persist config when apply
OverrideHost p2phost.Host `json:"-"`
}

type Option func(cfg *Config) error
Expand All @@ -115,6 +117,7 @@ func (cfg *Config) Override(override *Config) error {
cfg.Identity = override.Identity
cfg.SwarmKey = override.SwarmKey
cfg.PeerCache = override.PeerCache
cfg.OverrideHost = override.OverrideHost
return nil
}

Expand Down Expand Up @@ -259,12 +262,17 @@ func (cfg *Config) NewNode(ctx context.Context) (*host.BertyHost, error) {

// use basic host
h := &host.BertyHost{}
h.Host, err = bhost.NewHost(ctx, swrm, &bhost.HostOpts{
ConnManager: cfg.Config.ConnManager,
AddrsFactory: cfg.Config.AddrsFactory,
NATManager: cfg.Config.NATManager,
EnablePing: !cfg.Config.DisablePing,
})
if cfg.OverrideHost == nil {
h.Host, err = bhost.NewHost(ctx, swrm, &bhost.HostOpts{
ConnManager: cfg.Config.ConnManager,
AddrsFactory: cfg.Config.AddrsFactory,
NATManager: cfg.Config.NATManager,
EnablePing: !cfg.Config.DisablePing,
})
} else {
h.Host, err = cfg.OverrideHost, nil
}

if err != nil {
swrm.Close()
return nil, err
Expand Down
32 changes: 32 additions & 0 deletions core/network/options.go
Expand Up @@ -4,6 +4,7 @@ import (
"runtime"

"berty.tech/core/network/config"
host "github.com/libp2p/go-libp2p-host"
)

// Default options
Expand Down Expand Up @@ -89,6 +90,22 @@ func WithDefaultMobileOptions() config.Option {
)
}

func WithDefaultMockNetOptions(h host.Host) config.Option {
return ChainOptions(
WithClientOptions(),

EnablePrivateNetwork(config.DefaultSwarmKey),
DisableDHTServer(),
DisableHOP(),
DisableDefaultBootstrap(),
DisableMDNS(),
DisableBLE(),
DisableQUIC(),
DisablePersistConfig(),
DisableRelay(),
OverrideHost(h),
)
}
func WithDefaultRelayOptions() config.Option {
return ChainOptions(
WithServerOptions(),
Expand Down Expand Up @@ -331,3 +348,18 @@ func DisablePeerCache() config.Option {
return nil
}
}

func OverrideHost(h host.Host) config.Option {
return func(cfg *config.Config) error {
cfg.OverrideHost = h
cfg.Config.Peerstore = h.Peerstore()
return nil
}
}

func DisableRelay() config.Option {
return func(cfg *config.Config) error {
cfg.Config.Relay = false
return nil
}
}
5 changes: 3 additions & 2 deletions core/node/config.go
@@ -1,11 +1,12 @@
package node

import (
"berty.tech/core/pkg/errorcodes"
"berty.tech/core/push"
"context"
"encoding/base64"

"berty.tech/core/pkg/errorcodes"
"berty.tech/core/push"

"github.com/gofrs/uuid"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
Expand Down
63 changes: 53 additions & 10 deletions core/node/event_handlers.go
Expand Up @@ -334,31 +334,74 @@ func (n *Node) handleSeen(ctx context.Context, input *entity.Event) error {
}

func (n *Node) handleAck(ctx context.Context, input *entity.Event) error {
var ackedEvents []*entity.Event
var events []*entity.Event
ackCount := 0
now := time.Now().UTC()

ackAttrs, err := input.GetAckAttrs()
if err != nil {
return errors.Wrap(err, "unable to unmarshal ack attrs")
}

baseQuery := n.sql(ctx).
eventsIDs := ackAttrs.IDs

if err := n.sql(ctx).
Model(&entity.Event{}).
Where("id in (?)", ackAttrs.IDs)
Where("id IN (?)", eventsIDs).
Find(&events).Error; err != nil {
return errors.Wrap(err, "unable to find events to ack")
}

if err = baseQuery.
if err := n.sql(ctx).
Model(&entity.EventDispatch{}).
Where("event_id in (?) AND contact_id = ? AND device_id = ?", eventsIDs, input.SourceContactID, input.SourceDeviceID).
Count(&ackCount).
UpdateColumn("acked_at", time.Now().UTC()).
UpdateColumn("acked_at", &now).Error; err != nil {
return errors.Wrap(err, "unable to find event dispatch to ack")
}

if err := n.sql(ctx).
Model(&entity.Event{}).
Where("id IN (?) AND ack_status = ?", eventsIDs, entity.Event_NotAcked).
UpdateColumns(&entity.Event{
AckStatus: entity.Event_AckedAtLeastOnce,
}).Error; err != nil {
return errors.Wrap(err, "unable to find event dispatch to ack")
}

// TODO: find events acknowledged by at least one device per contact

whollyAcknowledgedEventsIDs := []string{}

if err := n.sql(ctx).
Model(&entity.Event{}).
Joins("LEFT JOIN event_dispatch ON event_dispatch.event_id = event.id AND event_dispatch.acked_at IS NULL").
Where("event.id in (?) AND event_dispatch.device_id IS NULL", eventsIDs).
Pluck("event.id", &whollyAcknowledgedEventsIDs).
Error; err != nil {
return errors.Wrap(err, "unable to mark events as acked")
return errors.Wrap(err, "unable to find events")
}

if ackCount == 0 {
return errors.Wrap(err, "no events to ack found")
if len(whollyAcknowledgedEventsIDs) == 0 {
return nil
}

if err = baseQuery.Find(&ackedEvents).Error; err != nil {
return errors.Wrap(err, "unable to fetch acked events")
if err := n.sql(ctx).
Model(&entity.Event{}).
Where("id IN (?)", whollyAcknowledgedEventsIDs).
UpdateColumns(&entity.Event{
AckedAt: &now,
AckStatus: entity.Event_AckedByAllDevices,
}).Error; err != nil {
return errors.Wrap(err, "unable to update events acks")
}

ackedEvents := []*entity.Event{}
if err := n.sql(ctx).
Model(&entity.Event{}).
Where("id IN (?)", whollyAcknowledgedEventsIDs).
Find(&ackedEvents).Error; err != nil {
return errors.Wrap(err, "unable to find acked events")
}

if err := n.handleAckSenderAlias(ctx, ackAttrs); err != nil {
Expand Down
159 changes: 159 additions & 0 deletions core/node/event_handlers_handle_ack_test.go
@@ -0,0 +1,159 @@
package node

import (
"context"
"testing"

"berty.tech/core/entity"

. "github.com/smartystreets/goconvey/convey"
)

func makeEventDispatch(event *entity.Event, device *TestEnvDevice) *entity.EventDispatch {
return &entity.EventDispatch{
EventID: event.ID,
DeviceID: device.Device.ID,
ContactID: device.Device.ContactID,
}
}

func TestHandleAckOneToOneConversation(t *testing.T) {
Convey("Test handleAck one to one conversation single device", t, FailureHalts, func() {
ctx := context.Background()
env := NewTestEnv(ctx)
defer env.Close()

// Creating contacts, making them friends

So(env.CreateBertyContacts("A", "B"), ShouldBeNil)
So(env.Befriend("A", "B"), ShouldBeNil)

deviceA := env.GetDevice("A")

// Setting up test event and inserting it in A's database

evt := env.ConversationEventFrom("A", "B").
SetConversationNewMessageAttrs(&entity.ConversationNewMessageAttrs{Message: &entity.Message{Text: "hi"}})

evtDispatch := makeEventDispatch(evt, env.GetDevice("B"))

So(deviceA.DBInsert(evt), ShouldBeNil)
So(deviceA.DBInsert(evtDispatch), ShouldBeNil)

So(deviceA.DB.First(&evt, &entity.Event{ID: evt.ID}).Error, ShouldBeNil)
So(evt.AckStatus, ShouldEqual, entity.Event_NotAcked)
So(evt.AckedAt, ShouldBeNil)

err := deviceA.DB.Find(&evtDispatch, evtDispatch).Error
So(err, ShouldBeNil)
So(evtDispatch.AckedAt, ShouldBeNil)

// Handling ack event

ackEvt := env.ConversationEventFrom("B", "A").
SetAckAttrs(&entity.AckAttrs{IDs: []string{evt.ID}})

err = deviceA.Node.handleAck(ctx, ackEvt)
So(err, ShouldBeNil)

// Checking values

err = deviceA.DB.Find(&evtDispatch, evtDispatch).Error
So(err, ShouldBeNil)
So(evtDispatch.AckedAt, ShouldNotBeNil)

So(deviceA.DB.First(&evt, &entity.Event{ID: evt.ID}).Error, ShouldBeNil)
So(evt.AckedAt, ShouldNotBeNil)
So(evt.AckStatus, ShouldEqual, entity.Event_AckedByAllDevices)
So(len(deviceA.Node.clientEvents), ShouldEqual, 1)
})
}

func TestHandleAckGroupConversation(t *testing.T) {
Convey("Test handleAck one to one conversation single device", t, FailureHalts, func() {
ctx := context.Background()
env := NewTestEnv(ctx)
defer env.Close()

// Creating contacts, making them friends

So(env.CreateBertyContacts("A", "B", "C"), ShouldBeNil)
So(env.Befriend("A", "B"), ShouldBeNil)
So(env.Befriend("A", "C"), ShouldBeNil)
_, err := env.CreateGroupConversation("A", "B", "C")
So(err, ShouldBeNil)

deviceA := env.GetDevice("A")
deviceB := env.GetDevice("B")
deviceC := env.GetDevice("C")

// Setting up test event and inserting it in A's database

evt := env.ConversationEventFrom("A", "B", "C").
SetConversationNewMessageAttrs(&entity.ConversationNewMessageAttrs{Message: &entity.Message{Text: "hi"}})
evtDispatchB := makeEventDispatch(evt, env.GetDevice("B"))
evtDispatchC := makeEventDispatch(evt, env.GetDevice("C"))

So(deviceA.DBInsert(evt), ShouldBeNil)
So(deviceA.DBInsert(evtDispatchB), ShouldBeNil)
So(deviceA.DBInsert(evtDispatchC), ShouldBeNil)

So(deviceA.DB.First(&evt, &entity.Event{ID: evt.ID}).Error, ShouldBeNil)
So(evt.AckStatus, ShouldEqual, entity.Event_NotAcked)
So(evt.AckedAt, ShouldBeNil)

err = deviceA.DB.Find(&evtDispatchB, &entity.EventDispatch{EventID: evt.ID, DeviceID: deviceB.Device.ID}).Error
So(err, ShouldBeNil)
So(evtDispatchB.AckedAt, ShouldBeNil)

err = deviceA.DB.Find(&evtDispatchC, &entity.EventDispatch{EventID: evt.ID, DeviceID: deviceC.Device.ID}).Error
So(err, ShouldBeNil)
So(evtDispatchC.AckedAt, ShouldBeNil)

// Handling ack event from B

ackEvt := env.ConversationEventFrom("B", "A", "C").
SetAckAttrs(&entity.AckAttrs{IDs: []string{evt.ID}})

err = deviceA.Node.handleAck(ctx, ackEvt)
So(err, ShouldBeNil)

// Checking values after ack from B

err = deviceA.DB.Find(&evtDispatchB, &entity.EventDispatch{EventID: evt.ID, DeviceID: deviceB.Device.ID}).Error
So(err, ShouldBeNil)
So(evtDispatchB.AckedAt, ShouldNotBeNil)

err = deviceA.DB.Find(&evtDispatchC, &entity.EventDispatch{EventID: evt.ID, DeviceID: deviceC.Device.ID}).Error
So(err, ShouldBeNil)
So(evtDispatchC.AckedAt, ShouldBeNil)

So(deviceA.DB.First(&evt, &entity.Event{ID: evt.ID}).Error, ShouldBeNil)
So(evt.AckedAt, ShouldBeNil)
So(evt.AckStatus, ShouldEqual, entity.Event_AckedAtLeastOnce)
So(len(deviceA.Node.clientEvents), ShouldEqual, 0)

// Handling ack event from C

ackEvt = env.ConversationEventFrom("C", "A", "B").
SetAckAttrs(&entity.AckAttrs{IDs: []string{evt.ID}})

err = deviceA.Node.handleAck(ctx, ackEvt)
So(err, ShouldBeNil)

// Checking values after ack from C

err = deviceA.DB.Find(&evtDispatchB, &entity.EventDispatch{EventID: evt.ID, DeviceID: deviceB.Device.ID}).Error
So(err, ShouldBeNil)
So(evtDispatchB.AckedAt, ShouldNotBeNil)

err = deviceA.DB.Find(&evtDispatchC, &entity.EventDispatch{EventID: evt.ID, DeviceID: deviceC.Device.ID}).Error
So(err, ShouldBeNil)
So(evtDispatchC.AckedAt, ShouldNotBeNil)

So(deviceA.DB.First(&evt, &entity.Event{ID: evt.ID}).Error, ShouldBeNil)
So(evt.AckedAt, ShouldNotBeNil)
So(evt.AckStatus, ShouldEqual, entity.Event_AckedByAllDevices)
So(len(deviceA.Node.clientEvents), ShouldEqual, 1)
})
}
14 changes: 8 additions & 6 deletions core/node/p2pclient.go
Expand Up @@ -21,12 +21,14 @@ func (n *Node) NewEvent(ctx context.Context) *entity.Event {
// ctx = tracer.Context()

return &entity.Event{
ID: n.NewID(),
APIVersion: p2p.Version,
CreatedAt: time.Now().UTC(),
Direction: entity.Event_Outgoing,
Dispatches: make([]*entity.EventDispatch, 0),
SourceDeviceID: n.b64pubkey,
ID: n.NewID(),
APIVersion: p2p.Version,
CreatedAt: time.Now().UTC(),
Direction: entity.Event_Outgoing,
Dispatches: make([]*entity.EventDispatch, 0),
SourceContactID: n.config.MyselfID,
SourceDeviceID: n.b64pubkey,
AckStatus: entity.Event_NotAcked,
}
}

Expand Down

0 comments on commit 8e85e0b

Please sign in to comment.