Skip to content

Commit

Permalink
fix: replication flow
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
  • Loading branch information
gfanton committed Mar 30, 2022
1 parent 3bd3daa commit 4a76ce0
Showing 1 changed file with 131 additions and 51 deletions.
182 changes: 131 additions & 51 deletions go/pkg/bertyreplication/services_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p-core/event"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -424,86 +425,165 @@ func TestReplicationService_Flow(t *testing.T) {
require.NoError(t, err)

t.Log(" --- Register group on replication service ---")

ctx = context.WithValue(ctx, authtypes.ContextTokenHashField, "token1")
ctx = context.WithValue(ctx, authtypes.ContextTokenIssuerField, "issuer1")

// TODO: handle auth
_, err = replPeer.Service.ReplicateGroup(ctx, &replicationtypes.ReplicationServiceReplicateGroup_Request{
Group: groupReplicable,
})
require.NoError(t, err)

{
ctx = context.WithValue(ctx, authtypes.ContextTokenHashField, "token1")
ctx = context.WithValue(ctx, authtypes.ContextTokenIssuerField, "issuer1")

// TODO: handle auth
_, err = replPeer.Service.ReplicateGroup(ctx, &replicationtypes.ReplicationServiceReplicateGroup_Request{
Group: groupReplicable,
})
require.NoError(t, err)
}
t.Log(" --- Registered group on replication service ---")

t.Log(" --- Sending sync messages ---")
{
sub1a, err := g1a.MetadataStore().EventBus().Subscribe(new(protocoltypes.GroupMetadataEvent))
require.NoError(t, err)

_, err = g1a.MetadataStore().SendAppMetadata(ctx, []byte("From 1 - 1"), nil)
require.NoError(t, err)
sub2a, err := g2a.MetadataStore().EventBus().Subscribe(new(protocoltypes.GroupMetadataEvent))
require.NoError(t, err)

_, err = g2a.MetadataStore().SendAppMetadata(ctx, []byte("From 2 - 1"), nil)
require.NoError(t, err)
_, err = g1a.MetadataStore().SendAppMetadata(ctx, []byte("From 1 - 1"), nil)
require.NoError(t, err)

t.Log(" --- Sent sync messages ---")
_, err = g2a.MetadataStore().SendAppMetadata(ctx, []byte("From 2 - 1"), nil)
require.NoError(t, err)

time.Sleep(time.Millisecond * 1000)
var evt interface{}
subs := []event.Subscription{sub1a, sub2a}
for _, sub := range subs {
for i := 0; i < 2; {
select {
case <-time.After(time.Second * 2):
require.FailNow(t, "timeout while waiting for message")
case evt = <-sub.Out():
}

if evt.(protocoltypes.GroupMetadataEvent).Metadata.EventType == protocoltypes.EventTypeGroupMetadataPayloadSent {
i++
}
}
sub.Close()
}

evts1, err := g1a.MetadataStore().ListEvents(ctx, nil, nil, false)
require.NoError(t, err)
ops1 := testutil.TestFilterAppMetadata(t, evts1)
require.NoError(t, err)
// wait for the event to be fully replicated
// @FIXME(gfanton): this wait should not be necessary; consider changing the
// metadata store to emit an event only once it has been replicated
time.Sleep(time.Second)

evts2, err := g2a.MetadataStore().ListEvents(ctx, nil, nil, false)
require.NoError(t, err)
ops2 := testutil.TestFilterAppMetadata(t, evts2)
require.NoError(t, err)
evts1, err := g1a.MetadataStore().ListEvents(ctx, nil, nil, false)
require.NoError(t, err)
ops1 := testutil.TestFilterAppMetadata(t, evts1)
require.NoError(t, err)

assert.Equal(t, 2, len(ops1))
assert.Equal(t, 2, len(ops2))
evts2, err := g2a.MetadataStore().ListEvents(ctx, nil, nil, false)
require.NoError(t, err)
ops2 := testutil.TestFilterAppMetadata(t, evts2)
require.NoError(t, err)

odb2.Close()
cleanupAPI2()
assert.Equal(t, 2, len(ops1))
assert.Equal(t, 2, len(ops2))

odb2.Close()
cleanupAPI2()

}
t.Log(" --- Sent sync messages ---")
t.Log(" --- Closed peer 2 ---")

const messageAmount = 50

t.Log(" --- Sending async messages ---")
{

for i := 0; i < 50; i++ {
_, err = g1a.MetadataStore().SendAppMetadata(ctx, []byte(fmt.Sprintf("From 1 - 2: %d", i)), nil)
sub2a, err := g1a.MetadataStore().EventBus().Subscribe(new(protocoltypes.GroupMetadataEvent))
require.NoError(t, err)
}

time.Sleep(time.Millisecond * 500)
cerr := make(chan error)
go func() {
var evt interface{}
defer close(cerr)
defer sub2a.Close()

for i := 0; i < messageAmount; {
select {
case <-time.After(time.Second * 2):
cerr <- fmt.Errorf("timeout while waiting for event")
return
case evt = <-sub2a.Out():
}

if evt.(protocoltypes.GroupMetadataEvent).Metadata.EventType == protocoltypes.EventTypeGroupMetadataPayloadSent {
i++
}
}
}()

for i := 0; i < messageAmount; i++ {
_, err = g1a.MetadataStore().SendAppMetadata(ctx, []byte(fmt.Sprintf("From 1 - 2: %d", i)), nil)
require.NoError(t, err)
}

err = <-cerr
require.NoError(t, err)

odb1.Close()
cleanupAPI1()
}
t.Log(" --- Sent async messages, should be replicated on service ---")
t.Log(" --- Closed peer 1 ---")

odb1.Close()
cleanupAPI1()
t.Log(" --- Opening peer 2, and its db ---")
{
api2, cleanupAPI2 = ipfsutil.TestingCoreAPIUsingMockNet(ctx, t, ipfsOpts2)
defer cleanupAPI2()

t.Log(" --- Closed peer 1 ---")
odb2 = bertyprotocol.NewTestOrbitDB(ctx, t, logger, api2, ipfsOpts2.Datastore)
defer odb2.Close()

api2, cleanupAPI2 = ipfsutil.TestingCoreAPIUsingMockNet(ctx, t, ipfsOpts2)
defer cleanupAPI2()
g2a, err = odb2.OpenGroup(ctx, gA, nil)
require.NoError(t, err)

t.Log(" --- Opening peer 2, and its db ---")
sub2a, err := g2a.MetadataStore().EventBus().Subscribe(new(protocoltypes.GroupMetadataEvent))
require.NoError(t, err)

odb2 = bertyprotocol.NewTestOrbitDB(ctx, t, logger, api2, ipfsOpts2.Datastore)
defer odb2.Close()
cerr := make(chan error)
go func() {
var evt interface{}

err = mn.LinkAll()
require.NoError(t, err)
defer close(cerr)
defer sub2a.Close()

g2a, err = odb2.OpenGroup(ctx, gA, nil)
require.NoError(t, err)
for i := 0; i < messageAmount; {
select {
case <-time.After(time.Second * 2):
cerr <- fmt.Errorf("timeout while waiting for event")
return
case evt = <-sub2a.Out():
}

time.Sleep(2000 * time.Millisecond)
if evt.(protocoltypes.GroupMetadataEvent).Metadata.EventType == protocoltypes.EventTypeGroupMetadataPayloadSent {
i++
}
}
}()

t.Log(" --- Waited for peer 2 to replicate data ---")
err = mn.LinkAll()
require.NoError(t, err)

evts2, err = g2a.MetadataStore().ListEvents(ctx, nil, nil, false)
require.NoError(t, err)
ops2 = testutil.TestFilterAppMetadata(t, evts2)
require.NoError(t, err)
err = <-cerr
require.NoError(t, err)

t.Log(" --- Waited for peer 2 to replicate data ---")

// assert.Equal(t, 3, len(ops2))
evts2, err := g2a.MetadataStore().ListEvents(ctx, nil, nil, false)
require.NoError(t, err)
ops2 := testutil.TestFilterAppMetadata(t, evts2)
require.NoError(t, err)
assert.Equal(t, messageAmount+2, len(ops2)) // amount of messages + 2 sync messages
}
}

func TestReplicationService_InvalidFlow(t *testing.T) {
Expand Down

0 comments on commit 4a76ce0

Please sign in to comment.