Skip to content

Commit

Permalink
Fix dt datastore decode issue and set latest on publisher startup
Browse files Browse the repository at this point in the history
Fix an issue where storage of bytes attempted by the datatransfer manger
fail due to UTf-8 econding at DB level. Use an ephemeral datastore for
datatransfer manager to avoid this altogether. Left a TODO to revisit.

Set the reference to root CID if available on publisher startup.

Add tests to assert publication, retrieval and chaining of metadata in
dealbot.
  • Loading branch information
masih committed Mar 8, 2022
1 parent 4200853 commit 492c3ec
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 6 deletions.
31 changes: 27 additions & 4 deletions controller/publisher/pando_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/filecoin-project/go-legs/dtsync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipfs/core/bootstrap"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -80,7 +80,7 @@ func NewPandoPublisher(ds datastore.Batching, store storage.ReadableStorage, o .
return p, nil
}

func (p *PandoPublisher) Start(_ context.Context) (err error) {
func (p *PandoPublisher) Start(ctx context.Context) (err error) {
// Dealbot registration as a provider on pando side is done manually.
// TODO: If the registration is idempotent, maybe re-register here automatically?
p.lock.Lock()
Expand All @@ -103,14 +103,37 @@ func (p *PandoPublisher) Start(_ context.Context) (err error) {
p.closer = closer
}

// Namespace the datastore used internally by legs.
lds := namespace.Wrap(p.ds, datastore.NewKey("/legs/dtsync/pub"))
// Use ephemeral storage to avoid UTF-8 encoding issues.
//
// Note that the datastore here is only used by datatrasnfer. The datatransfer manager uses the
// datastore to store byte value of CIDs, i.e. cid.Byte(). Since the backing db table uses type
// text as the data column, and go postgres requires UTF-8 encoding we will get an error when
// the SQL driver attempts to encode bytes, e.g.: pq: invalid byte sequence for encoding "UTF8".
//
// For now use an ephemeral storage for the datatransfer.
//TODO: Understand if we need a persistent datastore for datatransfer at all here.
// If so, wrap a datastore with customized byte encoding so that postgres is happy.
lds := dssync.MutexWrap(datastore.NewMapDatastore())
p.pub, err = dtsync.NewPublisher(p.h, lds, p.ls, pandoTopic)
if err != nil {
log.Errorw("Failed to initialize legs publisher", "err", err)
return err
}

latest, err := p.getLatest(ctx)
if err != nil {
log.Errorw("Failed to get latest while starting pando publisher", "err", err)
return err
}
log = log.With("head", latest)

if latest != cid.Undef {
if err := p.pub.SetRoot(ctx, latest); err != nil {
log.Errorw("Failed to update the head CID while starting pando publisher", "err", err)
return err
}
}

log.Infow("Started pando publisher", "extAddrs", p.opts.extAddrs)
return nil
}
Expand Down
142 changes: 140 additions & 2 deletions controller/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,29 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ipfs/go-cid"
"io"
"io/ioutil"
"os"
"testing"
"time"

"github.com/filecoin-project/dealbot/controller/publisher"
"github.com/filecoin-project/dealbot/controller/state/postgresdb"
"github.com/filecoin-project/dealbot/controller/state/postgresdb/temporary"
"github.com/filecoin-project/dealbot/tasks"
"github.com/filecoin-project/go-legs"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ds-sql/postgres"
"github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage"
"github.com/ipld/go-ipld-prime/storage/memstore"
pando "github.com/kenlabs/pando/pkg/types/schema"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -587,7 +597,6 @@ func TestComplete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
withState(ctx, t, func(state *stateDB) {

err := populateTestTasksFromFile(ctx, jsonTestDeals, state)
require.NoError(t, err)
// dealbot1 takes a task.
Expand Down Expand Up @@ -623,6 +632,135 @@ func TestComplete(t *testing.T) {
})
}

func TestCompleteWithPublish(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
withState(ctx, t, func(state *stateDB) {
err := populateTestTasksFromFile(ctx, jsonTestDeals, state)
require.NoError(t, err)
// dealbot1 takes a task.
task, err := state.AssignTask(ctx, tasks.Type.PopTask.Of("dealbot1", tasks.InProgress))
require.NoError(t, err)
require.Equal(t, *tasks.InProgress, task.Status)

// succeed task.
task, err = state.Update(ctx, task.GetUUID(), tasks.Type.UpdateTask.Of("dealbot1", tasks.Successful, 1))
require.NoError(t, err)
require.Equal(t, *tasks.Successful, task.Status)

require.NoError(t, err)
// drain the dealbot / finalize the task.
require.NoError(t, state.DrainWorker(ctx, "dealbot1"))
uc, err := state.PublishRecordsFrom(ctx, "dealbot1")
require.NoError(t, err)
require.NotEqual(t, cid.Undef, uc)

subhost, err := libp2p.New()
require.NoError(t, err)

pubhost, err := libp2p.New()
require.NoError(t, err)

queries := postgres.NewQueries("legs_data")
ds := NewSqlDatastore(state.dbconn, queries)
pub, err := publisher.NewPandoPublisher(
ds,
state.Store(ctx),
publisher.WithHost(pubhost),
publisher.WithBootstrapPeers(subhost.Peerstore().PeerInfo(subhost.ID())))
require.NoError(t, err)

err = pub.Start(ctx)
require.NoError(t, err)
defer pub.Shutdown(ctx)

err = pub.Publish(ctx, uc)
require.NoError(t, err)

subLS := cidlink.DefaultLinkSystem()
subStore := &memstore.Store{}
subLS.SetReadStorage(subStore)
subLS.SetWriteStorage(subStore)

sub, err := legs.NewSubscriber(subhost, dssync.MutexWrap(datastore.NewMapDatastore()), subLS, "/pando/v0.0.1", nil)
require.NoError(t, err)

// sync latest by specifying cid.Undef to fetch the head on the fly.
got1stLatest, err := sub.Sync(ctx, pubhost.ID(), cid.Undef, nil, pubhost.Addrs()[0])
require.NoError(t, err)

// Assert that synced data is a valid pando metadata with no previous link and get the payload as CID
got1stPayloadCid := requirePandoMetadataPayloadAsCid(t, subStore, got1stLatest, pubhost.ID(), cid.Undef)

// Assert that the decoded metadata payload as CID is retrievable.
got1stSyncCid, err := sub.Sync(ctx, pubhost.ID(), got1stPayloadCid, nil, pubhost.Addrs()[0])
require.NoError(t, err)
require.Equal(t, got1stSyncCid, got1stPayloadCid)

// Make another record and publish it to assert previous ID is set properly in pando metadata.
uc2, err := state.PublishRecordsFrom(ctx, "dealbot1")
require.NoError(t, err)
require.NotEqual(t, cid.Undef, uc2)
require.NotEqual(t, uc, uc2)

err = pub.Publish(ctx, uc2)
require.NoError(t, err)

// sync latest by specifying cid.Undef to fetch the head on the fly and assert it's different from previous latest.
got2ndLatest, err := sub.Sync(ctx, pubhost.ID(), cid.Undef, nil, pubhost.Addrs()[0])
require.NoError(t, err)
require.NotEqual(t, got1stLatest, got2ndLatest)

// Assert the synced data decodes as pando metadata with the expected publisher ID and previous link.
got2ndPayloadCid := requirePandoMetadataPayloadAsCid(t, subStore, got2ndLatest, pubhost.ID(), got1stLatest)

// Explicitly sync the payload to assert it is also retrievable.
got2ndSyncCid, err := sub.Sync(ctx, pubhost.ID(), got2ndPayloadCid, nil, pubhost.Addrs()[0])
require.NoError(t, err)
require.Equal(t, got2ndSyncCid, got2ndPayloadCid)
})
}

func requirePandoMetadataPayloadAsCid(t *testing.T, store storage.ReadableStorage, key cid.Cid, wantProvId peer.ID, wantPrev cid.Cid) cid.Cid {
ctx := context.Background()
gotBytes, err := store.Get(ctx, key.KeyString())
require.NoError(t, err)

// Assert the synced data decodes as a valid pando metadata.
nb := pando.Type.Metadata.NewBuilder()
err = dagjson.Decode(nb, bytes.NewBuffer(gotBytes))
require.NoError(t, err)
gotMetadata := nb.Build().(pando.Metadata)

// Assert that provider ID is as expected
str, err := gotMetadata.FieldProvider().AsString()
require.NoError(t, err)
require.Equal(t, wantProvId.String(), str)

// Assert that the metadata payload is a valid CID.
pp, err := gotMetadata.FieldPayload().AsBytes()
require.NoError(t, err)
_, payloadCid, err := cid.CidFromBytes(pp)
require.NoError(t, err)

// Assert signature validity
sigPeerID, err := pando.VerifyMetadata(gotMetadata)
require.NoError(t, err)
require.Equal(t, wantProvId, sigPeerID)

// Assert expected previous link
hasPrev := gotMetadata.FieldPreviousID().IsAbsent() || gotMetadata.FieldPreviousID().IsNull()
if wantPrev == cid.Undef {
require.True(t, hasPrev)
} else {
require.False(t, hasPrev)
gotPrev, err := gotMetadata.FieldPreviousID().AsNode().AsLink()
require.NoError(t, err)
require.Equal(t, wantPrev, gotPrev.(cidlink.Link).Cid)
}
return payloadCid
}

func TestDelete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 492c3ec

Please sign in to comment.