Skip to content

Commit

Permalink
chore: minor cleanups around byte ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 17, 2023
1 parent 72701c1 commit 8b5abf9
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 65 deletions.
3 changes: 2 additions & 1 deletion pkg/internal/itest/http_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,8 @@ func TestHttpFetch(t *testing.T) {
if testCase.lassieOpts != nil {
customOpts = testCase.lassieOpts(t, mrn)
}
opts := append([]lassie.LassieOption{lassie.WithProviderTimeout(20 * time.Second),
opts := append([]lassie.LassieOption{
lassie.WithProviderTimeout(20 * time.Second),
lassie.WithHost(mrn.Self),
lassie.WithFinder(mrn.Finder),
}, customOpts...)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package testpeer
package linksystemutil

import (
"bytes"
Expand All @@ -13,34 +13,29 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)

var _ blockstore.Blockstore = (*BackedStore)(nil)
var _ blockstore.Blockstore = (*linkSystemBlockstore)(nil)
var _ blockstore.Blockstore = (*LinkSystemBlockstore)(nil)

type BackedStore struct {
blockstore.Blockstore
}

func (bs *BackedStore) UseLinkSystem(lsys linking.LinkSystem) {
bs.Blockstore = &linkSystemBlockstore{lsys}
type LinkSystemBlockstore struct {
lsys linking.LinkSystem
}

type linkSystemBlockstore struct {
lsys linking.LinkSystem
func NewLinkSystemBlockstore(lsys linking.LinkSystem) *LinkSystemBlockstore {
return &LinkSystemBlockstore{lsys}
}

func (lsbs *linkSystemBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
func (lsbs *LinkSystemBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
return errors.New("not supported")
}

func (lsbs *linkSystemBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
func (lsbs *LinkSystemBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, err := lsbs.lsys.StorageReadOpener(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c})
if err != nil {
return false, err
}
return true, nil
}

func (lsbs *linkSystemBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
func (lsbs *LinkSystemBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
rdr, err := lsbs.lsys.StorageReadOpener(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c})
if err != nil {
return nil, err
Expand All @@ -53,7 +48,7 @@ func (lsbs *linkSystemBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Bl
return blocks.NewBlockWithCid(buf.Bytes(), c)
}

func (lsbs *linkSystemBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
func (lsbs *LinkSystemBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
rdr, err := lsbs.lsys.StorageReadOpener(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c})
if err != nil {
return 0, err
Expand All @@ -65,7 +60,7 @@ func (lsbs *linkSystemBlockstore) GetSize(ctx context.Context, c cid.Cid) (int,
return int(i), nil
}

func (lsbs *linkSystemBlockstore) Put(ctx context.Context, blk blocks.Block) error {
func (lsbs *LinkSystemBlockstore) Put(ctx context.Context, blk blocks.Block) error {
w, wc, err := lsbs.lsys.StorageWriteOpener(linking.LinkContext{Ctx: ctx})
if err != nil {
return err
Expand All @@ -76,7 +71,7 @@ func (lsbs *linkSystemBlockstore) Put(ctx context.Context, blk blocks.Block) err
return wc(cidlink.Link{Cid: blk.Cid()})
}

func (lsbs *linkSystemBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error {
func (lsbs *LinkSystemBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error {
for _, blk := range blks {
if err := lsbs.Put(ctx, blk); err != nil {
return err
Expand All @@ -85,10 +80,10 @@ func (lsbs *linkSystemBlockstore) PutMany(ctx context.Context, blks []blocks.Blo
return nil
}

func (lsbs *linkSystemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
func (lsbs *LinkSystemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not supported")
}

func (lsbs *linkSystemBlockstore) HashOnRead(enabled bool) {
func (lsbs *LinkSystemBlockstore) HashOnRead(enabled bool) {
lsbs.lsys.TrustedStorage = !enabled
}
12 changes: 6 additions & 6 deletions pkg/internal/itest/mocknet/mocknet.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ func NewMockRetrievalNet(ctx context.Context, t *testing.T) *MockRetrievalNet {
return mrn
}

func (mrn *MockRetrievalNet) AddBitswapPeers(n int) {
mrn.addPeers(mrn.testPeerGenerator.BitswapPeers(n))
func (mrn *MockRetrievalNet) AddBitswapPeers(n int, opts ...testpeer.PeerOption) {
mrn.addPeers(mrn.testPeerGenerator.BitswapPeers(n, opts...))
}

func (mrn *MockRetrievalNet) AddGraphsyncPeers(n int) {
mrn.addPeers(mrn.testPeerGenerator.GraphsyncPeers(n))
func (mrn *MockRetrievalNet) AddGraphsyncPeers(n int, opts ...testpeer.PeerOption) {
mrn.addPeers(mrn.testPeerGenerator.GraphsyncPeers(n, opts...))
}

func (mrn *MockRetrievalNet) AddHttpPeers(n int) {
mrn.addPeers(mrn.testPeerGenerator.HttpPeers(n))
func (mrn *MockRetrievalNet) AddHttpPeers(n int, opts ...testpeer.PeerOption) {
mrn.addPeers(mrn.testPeerGenerator.HttpPeers(n, opts...))
}

func (mrn *MockRetrievalNet) addPeers(peers []testpeer.TestPeer) {
Expand Down
92 changes: 62 additions & 30 deletions pkg/internal/itest/testpeer/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl"
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
"github.com/filecoin-project/lassie/pkg/internal/itest/linksystemutil"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -76,60 +77,60 @@ func (g *TestPeerGenerator) Close() error {
}

// NextBitswap generates a new test peer with bitswap + dependencies
func (g *TestPeerGenerator) NextBitswap() TestPeer {
func (g *TestPeerGenerator) NextBitswap(opts ...PeerOption) TestPeer {
g.seq++
p, err := RandTestPeerIdentity()
require.NoError(g.t, err)
tp, err := NewTestBitswapPeer(g.ctx, g.mn, p, g.netOptions, g.bsOptions)
tp, err := NewTestBitswapPeer(g.ctx, g.mn, p, g.netOptions, g.bsOptions, opts...)
require.NoError(g.t, err)
return tp
}

// NextGraphsync generates a new test peer with graphsync + dependencies
func (g *TestPeerGenerator) NextGraphsync() TestPeer {
func (g *TestPeerGenerator) NextGraphsync(opts ...PeerOption) TestPeer {
g.seq++
p, err := p2ptestutil.RandTestBogusIdentity()
p, err := RandTestPeerIdentity()
require.NoError(g.t, err)
tp, err := NewTestGraphsyncPeer(g.ctx, g.mn, p)
tp, err := NewTestGraphsyncPeer(g.ctx, g.mn, p, opts...)
require.NoError(g.t, err)
return tp
}

// NextHttp generates a new test peer with http + dependencies
func (g *TestPeerGenerator) NextHttp() TestPeer {
func (g *TestPeerGenerator) NextHttp(opts ...PeerOption) TestPeer {
g.seq++
p, err := RandTestPeerIdentity()
require.NoError(g.t, err)
tp, err := NewTestHttpPeer(g.ctx, g.mn, p, g.t)
tp, err := NewTestHttpPeer(g.ctx, g.mn, p, g.t, opts...)
require.NoError(g.t, err)
return tp
}

// BitswapPeers creates N test peers with bitswap + dependencies
func (g *TestPeerGenerator) BitswapPeers(n int) []TestPeer {
func (g *TestPeerGenerator) BitswapPeers(n int, opts ...PeerOption) []TestPeer {
var instances []TestPeer
for j := 0; j < n; j++ {
inst := g.NextBitswap()
inst := g.NextBitswap(opts...)
instances = append(instances, inst)
}
return instances
}

// GraphsyncPeers creates N test peers with graphsync + dependencies
func (g *TestPeerGenerator) GraphsyncPeers(n int) []TestPeer {
func (g *TestPeerGenerator) GraphsyncPeers(n int, opts ...PeerOption) []TestPeer {
var instances []TestPeer
for j := 0; j < n; j++ {
inst := g.NextGraphsync()
inst := g.NextGraphsync(opts...)
instances = append(instances, inst)
}
return instances
}

// HttpPeers creates N test peers with http + dependencies
func (g *TestPeerGenerator) HttpPeers(n int) []TestPeer {
func (g *TestPeerGenerator) HttpPeers(n int, opts ...PeerOption) []TestPeer {
var instances []TestPeer
for j := 0; j < n; j++ {
inst := g.NextHttp()
inst := g.NextHttp(opts...)
instances = append(instances, inst)
}
return instances
Expand All @@ -155,7 +156,7 @@ type TestPeer struct {
BitswapNetwork bsnet.BitSwapNetwork
DatatransferServer datatransfer.Manager
HttpServer *TestPeerHttpServer
blockstore *BackedStore
blockstore blockstore.Blockstore
Host host.Host
blockstoreDelay delay.D
LinkSystem *linking.LinkSystem
Expand All @@ -164,7 +165,7 @@ type TestPeer struct {
}

// Blockstore returns the block store for this test instance
func (i *TestPeer) Blockstore() *BackedStore {
func (i *TestPeer) Blockstore() blockstore.Blockstore {
return i.blockstore
}

Expand All @@ -186,8 +187,15 @@ func (i TestPeer) AddrInfo() *peer.AddrInfo {
// NB: It's easy make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewTestBitswapPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity, netOptions []bsnet.NetOpt, bsOptions []server.Option) (TestPeer, error) {
peer, _, err := newTestPeer(ctx, mn, p)
func NewTestBitswapPeer(
ctx context.Context,
mn mocknet.Mocknet,
p tnet.Identity,
netOptions []bsnet.NetOpt,
bsOptions []server.Option,
opts ...PeerOption,
) (TestPeer, error) {
peer, _, err := newTestPeer(ctx, mn, p, opts...)
if err != nil {
return TestPeer{}, err
}
Expand All @@ -204,8 +212,8 @@ func NewTestBitswapPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity
return peer, nil
}

func NewTestGraphsyncPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity) (TestPeer, error) {
peer, dstore, err := newTestPeer(ctx, mn, p)
func NewTestGraphsyncPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity, opts ...PeerOption) (TestPeer, error) {
peer, dstore, err := newTestPeer(ctx, mn, p, opts...)
if err != nil {
return TestPeer{}, err
}
Expand All @@ -230,8 +238,8 @@ func NewTestGraphsyncPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identi
return peer, nil
}

func NewTestHttpPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity, t *testing.T) (TestPeer, error) {
peer, _, err := newTestPeer(ctx, mn, p)
func NewTestHttpPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity, t *testing.T, opts ...PeerOption) (TestPeer, error) {
peer, _, err := newTestPeer(ctx, mn, p, opts...)
if err != nil {
return TestPeer{}, err
}
Expand Down Expand Up @@ -268,7 +276,17 @@ func NewTestHttpPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity, t
return peer, nil
}

func newTestPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity) (TestPeer, ds.Batching, error) {
func newTestPeer(
ctx context.Context,
mn mocknet.Mocknet,
p tnet.Identity,
opts ...PeerOption,
) (TestPeer, ds.Batching, error) {
cfg := peerConfig{}
for _, opt := range opts {
opt(&cfg)
}

bsdelay := delay.Fixed(0)

client, err := mn.AddPeer(p.PrivateKey(), p.Address())
Expand All @@ -280,18 +298,20 @@ func newTestPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity) (Test
dstore := ds_sync.MutexWrap(baseStore)
dstoreDelayed := delayed.New(dstore, bsdelay)

bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(dstoreDelayed),
blockstore.DefaultCacheOpts())
if err != nil {
return TestPeer{}, nil, err
if cfg.bstore == nil {
var err error
cfg.bstore, err = blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(dstoreDelayed),
blockstore.DefaultCacheOpts())
if err != nil {
return TestPeer{}, nil, err
}
}
backedStore := &BackedStore{bstore}
lsys := storeutil.LinkSystemForBlockstore(backedStore)
lsys := storeutil.LinkSystemForBlockstore(cfg.bstore)
tp := TestPeer{
Host: client,
ID: p.ID(),
blockstore: backedStore,
blockstore: cfg.bstore,
blockstoreDelay: bsdelay,
LinkSystem: &lsys,
Cids: make(map[cid.Cid]struct{}),
Expand Down Expand Up @@ -356,3 +376,15 @@ func RandTestPeerIdentity() (tnet.Identity, error) {
}
return nil, errors.New("failed to find an available port")
}

type peerConfig struct {
bstore blockstore.Blockstore
}

type PeerOption func(*peerConfig)

func WithLinkSystem(lsys linking.LinkSystem) PeerOption {
return func(pc *peerConfig) {
pc.bstore = linksystemutil.NewLinkSystemBlockstore(lsys)
}
}
16 changes: 7 additions & 9 deletions pkg/internal/itest/trustless_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/lassie/pkg/internal/itest/mocknet"
"github.com/filecoin-project/lassie/pkg/internal/itest/testpeer"
"github.com/filecoin-project/lassie/pkg/lassie"
httpserver "github.com/filecoin-project/lassie/pkg/server/http"
"github.com/google/uuid"
Expand Down Expand Up @@ -45,22 +46,19 @@ func TestTrustlessUnixfsFetch(t *testing.T) {

t.Logf("query=%s, blocks=%d", tc.AsQuery(), len(tc.ExpectedCids))

var finishedChan chan []datatransfer.Event
mrn := mocknet.NewMockRetrievalNet(ctx, t)
switch proto {
case "http":
mrn.AddHttpPeers(1)
mrn.AddHttpPeers(1, testpeer.WithLinkSystem(lsys))
case "graphsync":
mrn.AddGraphsyncPeers(1)
case "bitswap":
mrn.AddBitswapPeers(1)
}
require.NoError(t, mrn.MN.LinkAll())
var finishedChan chan []datatransfer.Event
if proto == "graphsync" {
mrn.AddGraphsyncPeers(1, testpeer.WithLinkSystem(lsys))
finishedChan = mocknet.SetupRetrieval(t, mrn.Remotes[0])
case "bitswap":
mrn.AddBitswapPeers(1, testpeer.WithLinkSystem(lsys))
}

mrn.Remotes[0].Blockstore().UseLinkSystem(lsys)
require.NoError(t, mrn.MN.LinkAll())
mrn.Remotes[0].Cids[tc.Root] = struct{}{}

lassie, err := lassie.NewLassie(
Expand Down

0 comments on commit 8b5abf9

Please sign in to comment.