Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graphsync v2.0: bindnode conversions for vouchers #305

Closed
wants to merge 11 commits into from
64 changes: 36 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,29 @@ Install the module in your package or app with `go get "github.com/filecoin-proj

[datatransfer.Voucher](https://github.com/filecoin-project/go-data-transfer/blob/21dd66ba370176224114b13030ee68cb785fadb2/datatransfer/types.go#L17)
and [datatransfer.Validator](https://github.com/filecoin-project/go-data-transfer/blob/21dd66ba370176224114b13030ee68cb785fadb2/datatransfer/types.go#L153)
are the interfaces used for validation of graphsync datatransfer messages. Voucher types plus a Validator for them must be registered
with the peer to whom requests will be sent.
are the types used for validation of graphsync datatransfer messages. **Both of these are simply [ipld.Node](https://pkg.go.dev/github.com/ipld/go-ipld-prime#Node)s**.

#### Example Toy Voucher and Validator
```go
const myVoucherType = datatransfer.TypeIdentifier("myVoucher")

type myVoucher struct {
data string
}

func (v *myVoucher) ToBytes() ([]byte, error) {
return []byte(v.data), nil
}

func (v *myVoucher) FromBytes(data []byte) error {
v.data = string(data)
return nil
func fromNode(node ipld.Node) (myVoucher, error) {
if node.Kind() != datamodel.Kind_String {
return nil, fmt.Errorf("invalid node kind")
}
str, err := node.AsString()
if err != nil {
return nil, err
}
return myVoucher{data: str}, nil
}

func (v *myVoucher) Type() string {
return "FakeDTType"
func (m myVoucher) toNode() ipld.Node {
return basicnode.NewString(m.data)
}

type myValidator struct {
Expand All @@ -83,32 +86,39 @@ type myValidator struct {

func (vl *myValidator) ValidatePush(
sender peer.ID,
voucherType datatransfer.TypeIdentifier,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {

v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpush" {
return errors.New("invalid")
}

v, err := fromNode(voucher)
if err != nil {
return err
}
if v.data == "" || v.data != "validpush" {
return errors.New("invalid")
}

return nil
}

func (vl *myValidator) ValidatePull(
receiver peer.ID,
voucherType datatransfer.TypeIdentifier,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {

v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpull" {
return errors.New("invalid")
}
v, err := fromNode(voucher)
if err != nil {
return err
}
if v.data == "" || v.data != "validpull" {
return errors.New("invalid")
}

return nil
}

```


Expand All @@ -118,29 +128,27 @@ for more detail.


### Register a validator
Before sending push or pull requests, you must register a `datatransfer.Voucher`
by its `reflect.Type` and `dataTransfer.RequestValidator` for vouchers that
Before sending push or pull requests, you must register a voucher type by its `datatransfer.TypeIdentifier` and `dataTransfer.RequestValidator` for vouchers that
must be sent with the request. Using the trivial examples above:
```go
func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) {
tp := gstransport.NewTransport(h.ID(), gs)
dt := impl.NewDataTransfer(h, tp)

vouch := &myVoucher{}
mv := &myValidator{}
dt.RegisterVoucherType(reflect.TypeOf(vouch), mv)
dt.RegisterVoucherType(myVoucherType, mv)
}
```

For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).

### Open a Push or Pull Request
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
For a push or pull request, provide a context, a voucher type `datatransfer.TypeIdentifier`, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
calls return a `datatransfer.ChannelID` and any error:
```go
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector)
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucherType, voucher, baseCid, selector)
// OR
channelID, err := dtm.OpenPushDataChannel(ctx, recipient, voucher, baseCid, selector)
channelID, err := dtm.OpenPushDataChannel(ctx, recipient, voucherType, voucher, baseCid, selector)

```

Expand Down
7 changes: 1 addition & 6 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ import (
"github.com/filecoin-project/go-data-transfer/testutil"
)

const stdBlockSize = 8000

type runStats struct {
Time time.Duration
Name string
Expand Down Expand Up @@ -105,7 +103,7 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
timer := time.NewTimer(30 * time.Second)
start := time.Now()
for j := 0; j < numfiles; j++ {
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewFakeDTType(), allCids[j], allSelector)
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.FakeDTVoucherType, testutil.NewFakeDTTypeNode(), allCids[j], allSelector)
if err != nil {
b.Fatalf("received error on request: %s", err.Error())
}
Expand Down Expand Up @@ -133,9 +131,6 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,

type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid

const defaultUnixfsChunkSize uint64 = 1 << 10
const defaultUnixfsLinksPerLevel = 1024

func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) cid.Cid {

data := make([]byte, size)
Expand Down
6 changes: 2 additions & 4 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/delayed"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -153,7 +152,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
return Instance{}, err
}
} else {
dstore = ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
dstore = ds_sync.MutexWrap(delayed.New(datastore.NewMapDatastore(), bsdelay))
}
bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(namespace.Wrap(dstore, datastore.NewKey("blockstore"))),
Expand Down Expand Up @@ -188,8 +187,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
sv := testutil.NewStubbedValidator()
sv.StubSuccessPull()
sv.StubSuccessPush()
dt.RegisterVoucherType(testutil.NewFakeDTType(), sv)
dt.RegisterVoucherResultType(testutil.NewFakeDTType())
dt.RegisterVoucherType(testutil.FakeDTVoucherType, sv)
return Instance{
Adapter: dtNet,
Peer: p,
Expand Down
11 changes: 0 additions & 11 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,6 @@ func (m *Monitor) Shutdown() {
m.stop()
}

// onShutdown shuts down all monitored channels. It is called when the run
// loop exits.
func (m *Monitor) onShutdown() {
m.lk.RLock()
defer m.lk.RUnlock()

for _, ch := range m.channels {
ch.Shutdown()
}
}

// onMonitoredChannelShutdown is called when a monitored channel shuts down
func (m *Monitor) onMonitoredChannelShutdown(chid datatransfer.ChannelID) {
m.lk.Lock()
Expand Down
8 changes: 8 additions & 0 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,10 @@ func (m *mockChannelState) Voucher() datatransfer.Voucher {
panic("implement me")
}

func (m *mockChannelState) VoucherType() datatransfer.TypeIdentifier {
panic("implement me")
}

func (m *mockChannelState) Sender() peer.ID {
panic("implement me")
}
Expand Down Expand Up @@ -598,6 +602,10 @@ func (m *mockChannelState) VoucherResults() []datatransfer.VoucherResult {
panic("implement me")
}

func (m *mockChannelState) LastVoucherType() datatransfer.TypeIdentifier {
panic("implement me")
}

func (m *mockChannelState) LastVoucher() datatransfer.Voucher {
panic("implement me")
}
Expand Down
96 changes: 50 additions & 46 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels/internal"
"github.com/filecoin-project/go-data-transfer/ipldutil"
)

// channelState is immutable channel data plus mutable state
Expand Down Expand Up @@ -54,9 +53,7 @@ type channelState struct {
// additional vouchers
vouchers []internal.EncodedVoucher
// additional voucherResults
voucherResults []internal.EncodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
voucherResults []internal.EncodedVoucherResult

// stages tracks the timeline of events related to a data transfer, for
// traceability purposes.
Expand Down Expand Up @@ -87,23 +84,28 @@ func (c channelState) BaseCID() cid.Cid { return c.baseCid }
// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() ipld.Node {
builder := basicnode.Prototype.Any.NewBuilder()
reader := bytes.NewReader(c.selector.Raw)
err := dagcbor.Decode(builder, reader)
node, err := ipldutil.NodeFromDagCbor(bytes.NewReader(c.selector.Raw))
if err != nil {
log.Error(err)
}
return builder.Build()
return node
}

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() datatransfer.Voucher {
if len(c.vouchers) == 0 {
return nil
}
decoder, _ := c.voucherDecoder(c.vouchers[0].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[0].Voucher.Raw)
return encodable.(datatransfer.Voucher)
voucher, _ := ipldutil.NodeFromDagCbor(bytes.NewReader(c.vouchers[0].Voucher.Raw))
return voucher
}

// VoucherType returns the type of voucher for this data transfer
func (c channelState) VoucherType() datatransfer.TypeIdentifier {
if len(c.vouchers) == 0 {
return ""
}
return c.vouchers[0].Type
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
Expand Down Expand Up @@ -152,31 +154,35 @@ func (c channelState) Message() string {
func (c channelState) Vouchers() []datatransfer.Voucher {
vouchers := make([]datatransfer.Voucher, 0, len(c.vouchers))
for _, encoded := range c.vouchers {
decoder, _ := c.voucherDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.Voucher.Raw)
vouchers = append(vouchers, encodable.(datatransfer.Voucher))
voucher, _ := ipldutil.NodeFromDagCbor(bytes.NewReader(encoded.Voucher.Raw))
vouchers = append(vouchers, voucher)
}
return vouchers
}

// LastVoucherType returns the type of the last voucher for this data transfer
func (c channelState) LastVoucherType() datatransfer.TypeIdentifier {
if len(c.vouchers) == 0 {
return ""
}
return c.vouchers[len(c.vouchers)-1].Type
}

func (c channelState) LastVoucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.vouchers[len(c.vouchers)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[len(c.vouchers)-1].Voucher.Raw)
return encodable.(datatransfer.Voucher)
voucher, _ := ipldutil.NodeFromDagCbor(bytes.NewReader(c.vouchers[len(c.vouchers)-1].Voucher.Raw))
return voucher
}

func (c channelState) LastVoucherResult() datatransfer.VoucherResult {
decoder, _ := c.voucherResultDecoder(c.voucherResults[len(c.voucherResults)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.voucherResults[len(c.voucherResults)-1].VoucherResult.Raw)
return encodable.(datatransfer.VoucherResult)
voucher, _ := ipldutil.NodeFromDagCbor(bytes.NewReader(c.voucherResults[len(c.voucherResults)-1].VoucherResult.Raw))
return voucher
}

func (c channelState) VoucherResults() []datatransfer.VoucherResult {
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.voucherResults))
for _, encoded := range c.voucherResults {
decoder, _ := c.voucherResultDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.VoucherResult.Raw)
voucherResults = append(voucherResults, encodable.(datatransfer.VoucherResult))
voucherResult, _ := ipldutil.NodeFromDagCbor(bytes.NewReader(encoded.VoucherResult.Raw))
voucherResults = append(voucherResults, voucherResult)
}
return voucherResults
}
Expand Down Expand Up @@ -207,29 +213,27 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
return c.stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
func fromInternalChannelState(c internal.ChannelState) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
transferID: c.TransferID,
baseCid: c.BaseCid,
selector: c.Selector,
sender: c.Sender,
recipient: c.Recipient,
totalSize: c.TotalSize,
status: c.Status,
queued: c.Queued,
sent: c.Sent,
received: c.Received,
receivedBlocksTotal: c.ReceivedBlocksTotal,
queuedBlocksTotal: c.QueuedBlocksTotal,
sentBlocksTotal: c.SentBlocksTotal,
message: c.Message,
vouchers: c.Vouchers,
voucherResults: c.VoucherResults,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
stages: c.Stages,
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
transferID: c.TransferID,
baseCid: c.BaseCid,
selector: c.Selector,
sender: c.Sender,
recipient: c.Recipient,
totalSize: c.TotalSize,
status: c.Status,
queued: c.Queued,
sent: c.Sent,
received: c.Received,
receivedBlocksTotal: c.ReceivedBlocksTotal,
queuedBlocksTotal: c.QueuedBlocksTotal,
sentBlocksTotal: c.SentBlocksTotal,
message: c.Message,
vouchers: c.Vouchers,
voucherResults: c.VoucherResults,
stages: c.Stages,
}
}

Expand Down
Loading