Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dt datastore decode issue and set latest on publisher startup #381

Merged
merged 1 commit into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions controller/publisher/pando_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/filecoin-project/go-legs/dtsync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipfs/core/bootstrap"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -80,7 +80,7 @@ func NewPandoPublisher(ds datastore.Batching, store storage.ReadableStorage, o .
return p, nil
}

func (p *PandoPublisher) Start(_ context.Context) (err error) {
func (p *PandoPublisher) Start(ctx context.Context) (err error) {
// Dealbot registration as a provider on pando side is done manually.
// TODO: If the registration is idempotent, maybe re-register here automatically?
p.lock.Lock()
Expand All @@ -103,14 +103,37 @@ func (p *PandoPublisher) Start(_ context.Context) (err error) {
p.closer = closer
}

// Namespace the datastore used internally by legs.
lds := namespace.Wrap(p.ds, datastore.NewKey("/legs/dtsync/pub"))
// Use ephemeral storage to avoid UTF-8 encoding issues.
//
// Note that the datastore here is only used by datatrasnfer. The datatransfer manager uses the
// datastore to store byte value of CIDs, i.e. cid.Byte(). Since the backing db table uses type
// text as the data column, and go postgres requires UTF-8 encoding we will get an error when
// the SQL driver attempts to encode bytes, e.g.: pq: invalid byte sequence for encoding "UTF8".
//
// For now use an ephemeral storage for the datatransfer.
//TODO: Understand if we need a persistent datastore for datatransfer at all here.
// If so, wrap a datastore with customized byte encoding so that postgres is happy.
lds := dssync.MutexWrap(datastore.NewMapDatastore())
p.pub, err = dtsync.NewPublisher(p.h, lds, p.ls, pandoTopic)
if err != nil {
log.Errorw("Failed to initialize legs publisher", "err", err)
return err
}

latest, err := p.getLatest(ctx)
if err != nil {
log.Errorw("Failed to get latest while starting pando publisher", "err", err)
return err
}
log = log.With("head", latest)

if latest != cid.Undef {
if err := p.pub.SetRoot(ctx, latest); err != nil {
log.Errorw("Failed to update the head CID while starting pando publisher", "err", err)
return err
}
}

log.Infow("Started pando publisher", "extAddrs", p.opts.extAddrs)
return nil
}
Expand Down
142 changes: 140 additions & 2 deletions controller/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,29 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ipfs/go-cid"
"io"
"io/ioutil"
"os"
"testing"
"time"

"github.com/filecoin-project/dealbot/controller/publisher"
"github.com/filecoin-project/dealbot/controller/state/postgresdb"
"github.com/filecoin-project/dealbot/controller/state/postgresdb/temporary"
"github.com/filecoin-project/dealbot/tasks"
"github.com/filecoin-project/go-legs"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ds-sql/postgres"
"github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage"
"github.com/ipld/go-ipld-prime/storage/memstore"
pando "github.com/kenlabs/pando/pkg/types/schema"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -587,7 +597,6 @@ func TestComplete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
withState(ctx, t, func(state *stateDB) {

err := populateTestTasksFromFile(ctx, jsonTestDeals, state)
require.NoError(t, err)
// dealbot1 takes a task.
Expand Down Expand Up @@ -623,6 +632,135 @@ func TestComplete(t *testing.T) {
})
}

func TestCompleteWithPublish(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
withState(ctx, t, func(state *stateDB) {
err := populateTestTasksFromFile(ctx, jsonTestDeals, state)
require.NoError(t, err)
// dealbot1 takes a task.
task, err := state.AssignTask(ctx, tasks.Type.PopTask.Of("dealbot1", tasks.InProgress))
require.NoError(t, err)
require.Equal(t, *tasks.InProgress, task.Status)

// succeed task.
task, err = state.Update(ctx, task.GetUUID(), tasks.Type.UpdateTask.Of("dealbot1", tasks.Successful, 1))
require.NoError(t, err)
require.Equal(t, *tasks.Successful, task.Status)

require.NoError(t, err)
// drain the dealbot / finalize the task.
require.NoError(t, state.DrainWorker(ctx, "dealbot1"))
uc, err := state.PublishRecordsFrom(ctx, "dealbot1")
require.NoError(t, err)
require.NotEqual(t, cid.Undef, uc)

subhost, err := libp2p.New()
require.NoError(t, err)

pubhost, err := libp2p.New()
require.NoError(t, err)

queries := postgres.NewQueries("legs_data")
ds := NewSqlDatastore(state.dbconn, queries)
pub, err := publisher.NewPandoPublisher(
ds,
state.Store(ctx),
publisher.WithHost(pubhost),
publisher.WithBootstrapPeers(subhost.Peerstore().PeerInfo(subhost.ID())))
require.NoError(t, err)

err = pub.Start(ctx)
require.NoError(t, err)
defer pub.Shutdown(ctx)

err = pub.Publish(ctx, uc)
require.NoError(t, err)

subLS := cidlink.DefaultLinkSystem()
subStore := &memstore.Store{}
subLS.SetReadStorage(subStore)
subLS.SetWriteStorage(subStore)

sub, err := legs.NewSubscriber(subhost, dssync.MutexWrap(datastore.NewMapDatastore()), subLS, "/pando/v0.0.1", nil)
require.NoError(t, err)

// sync latest by specifying cid.Undef to fetch the head on the fly.
got1stLatest, err := sub.Sync(ctx, pubhost.ID(), cid.Undef, nil, pubhost.Addrs()[0])
require.NoError(t, err)

// Assert that synced data is a valid pando metadata with no previous link and get the payload as CID
got1stPayloadCid := requirePandoMetadataPayloadAsCid(t, subStore, got1stLatest, pubhost.ID(), cid.Undef)

// Assert that the decoded metadata payload as CID is retrievable.
got1stSyncCid, err := sub.Sync(ctx, pubhost.ID(), got1stPayloadCid, nil, pubhost.Addrs()[0])
require.NoError(t, err)
require.Equal(t, got1stSyncCid, got1stPayloadCid)

// Make another record and publish it to assert previous ID is set properly in pando metadata.
uc2, err := state.PublishRecordsFrom(ctx, "dealbot1")
require.NoError(t, err)
require.NotEqual(t, cid.Undef, uc2)
require.NotEqual(t, uc, uc2)

err = pub.Publish(ctx, uc2)
require.NoError(t, err)

// sync latest by specifying cid.Undef to fetch the head on the fly and assert it's different from previous latest.
got2ndLatest, err := sub.Sync(ctx, pubhost.ID(), cid.Undef, nil, pubhost.Addrs()[0])
require.NoError(t, err)
require.NotEqual(t, got1stLatest, got2ndLatest)

// Assert the synced data decodes as pando metadata with the expected publisher ID and previous link.
got2ndPayloadCid := requirePandoMetadataPayloadAsCid(t, subStore, got2ndLatest, pubhost.ID(), got1stLatest)

// Explicitly sync the payload to assert it is also retrievable.
got2ndSyncCid, err := sub.Sync(ctx, pubhost.ID(), got2ndPayloadCid, nil, pubhost.Addrs()[0])
require.NoError(t, err)
require.Equal(t, got2ndSyncCid, got2ndPayloadCid)
})
}

func requirePandoMetadataPayloadAsCid(t *testing.T, store storage.ReadableStorage, key cid.Cid, wantProvId peer.ID, wantPrev cid.Cid) cid.Cid {
ctx := context.Background()
gotBytes, err := store.Get(ctx, key.KeyString())
require.NoError(t, err)

// Assert the synced data decodes as a valid pando metadata.
nb := pando.Type.Metadata.NewBuilder()
err = dagjson.Decode(nb, bytes.NewBuffer(gotBytes))
require.NoError(t, err)
gotMetadata := nb.Build().(pando.Metadata)

// Assert that provider ID is as expected
str, err := gotMetadata.FieldProvider().AsString()
require.NoError(t, err)
require.Equal(t, wantProvId.String(), str)

// Assert that the metadata payload is a valid CID.
pp, err := gotMetadata.FieldPayload().AsBytes()
require.NoError(t, err)
_, payloadCid, err := cid.CidFromBytes(pp)
require.NoError(t, err)

// Assert signature validity
sigPeerID, err := pando.VerifyMetadata(gotMetadata)
require.NoError(t, err)
require.Equal(t, wantProvId, sigPeerID)

// Assert expected previous link
hasPrev := gotMetadata.FieldPreviousID().IsAbsent() || gotMetadata.FieldPreviousID().IsNull()
if wantPrev == cid.Undef {
require.True(t, hasPrev)
} else {
require.False(t, hasPrev)
gotPrev, err := gotMetadata.FieldPreviousID().AsNode().AsLink()
require.NoError(t, err)
require.Equal(t, wantPrev, gotPrev.(cidlink.Link).Cid)
}
return payloadCid
}

func TestDelete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down