Skip to content

Commit

Permalink
Properly Implement Retrieval Lookups Based on CIDs (#57)
Browse files Browse the repository at this point in the history
* feat(piecestore): maintain CID map

seperate block infos to their own table -- mapping links with pieceCIDs

* feat(retrievalmarket): translate payload CIDs

modify markets to use actual payload CIDs instead of treating payload and piece CIDs as equavalent

* feat(storagemarket): save cid info for piece

very simple implementation of storing a map between payload root and pieceCID. to be augmented later
with actual block refs

* fix(deps): update to rebased go-car

* fix(deps): go mod tidy

* fix(discovery): fix key encoding

* fix(retrievalmarket): set default provider params

* style(piecestore): remove outdated comment
  • Loading branch information
hannahhoward committed Jan 25, 2020
1 parent 1aa65e2 commit 2249baf
Show file tree
Hide file tree
Showing 16 changed files with 771 additions and 284 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896
github.com/ipfs/go-cid v0.0.4
github.com/ipfs/go-datastore v0.1.1
github.com/ipfs/go-graphsync v0.0.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJ
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0=
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1 h1:Nq8xEW+2KZq7IkRlkOh0rTEUI8FgunhMoLj5EMkJzbQ=
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 h1:l8gnU1VBhftugMKzfh+n7nuDhOw3X1iqfrA33GVBMMY=
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
Expand Down
110 changes: 74 additions & 36 deletions piecestore/piecestore.go
Original file line number Diff line number Diff line change
@@ -1,87 +1,125 @@
package piecestore

import (
"bytes"
"fmt"

"github.com/filecoin-project/go-statestore"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
)

var DSPrefix = "/storagemarket/pieces"
// DSPiecePrefix is the name space for storing piece infos
var DSPiecePrefix = "/storagemarket/pieces"

// DSCIDPrefix is the name space for storing CID infos
var DSCIDPrefix = "/storagemarket/cid-infos"

// NewPieceStore returns a new piecestore based on the given datastore
func NewPieceStore(ds datastore.Batching) PieceStore {
return &pieceStore{
store: statestore.New(namespace.Wrap(ds, datastore.NewKey(DSPrefix))),
pieces: statestore.New(namespace.Wrap(ds, datastore.NewKey(DSPiecePrefix))),
cidInfos: statestore.New(namespace.Wrap(ds, datastore.NewKey(DSCIDPrefix))),
}
}

type pieceStore struct {
store *statestore.StateStore
pieces *statestore.StateStore
cidInfos *statestore.StateStore
}

func (ps *pieceStore) AddDealForPiece(pieceCID []byte, dealInfo DealInfo) error {
// Do we need to de-dupe or anything here?
return ps.mutatePieceInfo(pieceCID, func(pi *PieceInfo) error {
for _, di := range pi.Deals {
if di == dealInfo {
return nil
}
}
pi.Deals = append(pi.Deals, dealInfo)
return nil
})
}

func (ps *pieceStore) AddBlockInfosToPiece(pieceCID []byte, blockInfos []BlockInfo) error {
// Do we need to de-dupe or anything here?
return ps.mutatePieceInfo(pieceCID, func(pi *PieceInfo) error {
pi.Blocks = blockInfos
return nil
})
func (ps *pieceStore) AddPieceBlockLocations(pieceCID []byte, blockLocations map[cid.Cid]BlockLocation) error {
for c, blockLocation := range blockLocations {
err := ps.mutateCIDInfo(c, func(ci *CIDInfo) error {
for _, pbl := range ci.PieceBlockLocations {
if bytes.Equal(pbl.PieceCID, pieceCID) && pbl.BlockLocation == blockLocation {
return nil
}
}
ci.PieceBlockLocations = append(ci.PieceBlockLocations, PieceBlockLocation{blockLocation, pieceCID})
return nil
})
if err != nil {
return err
}
}
return nil
}

func (ps *pieceStore) HasBlockInfo(pieceCID []byte) (bool, error) {
pi, err := ps.GetPieceInfo(pieceCID)
if err != nil {
return false, err
func (ps *pieceStore) GetPieceInfo(pieceCID []byte) (PieceInfo, error) {
var out PieceInfo
if err := ps.pieces.Get(newKey(pieceCID)).Get(&out); err != nil {
return PieceInfo{}, err
}

return len(pi.Blocks) > 0, err
return out, nil
}

func (ps *pieceStore) HasDealInfo(pieceCID []byte) (bool, error) {
pi, err := ps.GetPieceInfo(pieceCID)
if err != nil {
return false, err
func (ps *pieceStore) GetCIDInfo(payloadCID cid.Cid) (CIDInfo, error) {
var out CIDInfo
if err := ps.cidInfos.Get(payloadCID).Get(&out); err != nil {
return CIDInfo{}, err
}

return len(pi.Deals) > 0, nil
return out, nil
}

func (ps *pieceStore) GetPieceInfo(pieceCID []byte) (PieceInfo, error) {
var out PieceInfo
if err := ps.store.Get(newKey(pieceCID)).Get(&out); err != nil {
return PieceInfo{}, err
func (ps *pieceStore) ensurePieceInfo(pieceCID []byte) error {
has, err := ps.pieces.Has(newKey(pieceCID))

if err != nil {
return err
}
return out, nil
if has {
return nil
}

pieceInfo := PieceInfo{PieceCID: pieceCID}
return ps.pieces.Begin(newKey(pieceCID), &pieceInfo)
}

func (ps *pieceStore) ensurePieceInfo(pieceCID []byte) (PieceInfo, error) {
pieceInfo, err := ps.GetPieceInfo(pieceCID)
func (ps *pieceStore) ensureCIDInfo(c cid.Cid) error {
has, err := ps.cidInfos.Has(c)

if err == nil {
return pieceInfo, nil
if err != nil {
return err
}

pieceInfo = PieceInfo{PieceCID: pieceCID}
err = ps.store.Begin(newKey(pieceCID), &pieceInfo)
if has {
return nil
}

return pieceInfo, err
cidInfo := CIDInfo{CID: c}
return ps.cidInfos.Begin(c, &cidInfo)
}

func (ps *pieceStore) mutatePieceInfo(pieceCID []byte, mutator interface{}) error {
_, err := ps.ensurePieceInfo(pieceCID)
err := ps.ensurePieceInfo(pieceCID)
if err != nil {
return err
}

return ps.pieces.Get(newKey(pieceCID)).Mutate(mutator)
}

func (ps *pieceStore) mutateCIDInfo(c cid.Cid, mutator interface{}) error {
err := ps.ensureCIDInfo(c)
if err != nil {
return err
}

return ps.store.Get(newKey(pieceCID)).Mutate(mutator)
return ps.cidInfos.Get(c).Mutate(mutator)
}

func newKey(pieceCID []byte) fmt.Stringer {
Expand Down
195 changes: 154 additions & 41 deletions piecestore/piecestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,164 @@ import (
"github.com/stretchr/testify/assert"

"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared_testutil"
)

func TestStorePieceInfo(t *testing.T) {
ps := piecestore.NewPieceStore(datastore.NewMapDatastore())

pieceCid := []byte{1, 2, 3, 4}
initializePieceStore := func(t *testing.T) piecestore.PieceStore {
ps := piecestore.NewPieceStore(datastore.NewMapDatastore())
_, err := ps.GetPieceInfo(pieceCid)
assert.Error(t, err)
return ps
}

// Add a deal info
t.Run("can add deals", func(t *testing.T) {
ps := initializePieceStore(t)
dealInfo := piecestore.DealInfo{
DealID: rand.Uint64(),
SectorID: rand.Uint64(),
Offset: rand.Uint64(),
Length: rand.Uint64(),
}
err := ps.AddDealForPiece(pieceCid, dealInfo)
assert.NoError(t, err)

pi, err := ps.GetPieceInfo(pieceCid)
assert.NoError(t, err)
assert.Len(t, pi.Deals, 1)
assert.Equal(t, pi.Deals[0], dealInfo)
})

t.Run("adding same deal twice does not dup", func(t *testing.T) {
ps := initializePieceStore(t)
dealInfo := piecestore.DealInfo{
DealID: rand.Uint64(),
SectorID: rand.Uint64(),
Offset: rand.Uint64(),
Length: rand.Uint64(),
}
err := ps.AddDealForPiece(pieceCid, dealInfo)
assert.NoError(t, err)

pi, err := ps.GetPieceInfo(pieceCid)
assert.NoError(t, err)
assert.Len(t, pi.Deals, 1)
assert.Equal(t, pi.Deals[0], dealInfo)

err = ps.AddDealForPiece(pieceCid, dealInfo)
assert.NoError(t, err)

pi, err = ps.GetPieceInfo(pieceCid)
assert.NoError(t, err)
assert.Len(t, pi.Deals, 1)
assert.Equal(t, pi.Deals[0], dealInfo)
})
}

func TestStoreCIDInfo(t *testing.T) {

pieceCid1 := []byte{1, 2, 3, 4}
pieceCid2 := []byte{5, 6, 7, 8}
testCIDs := shared_testutil.GenerateCids(3)
blockLocations := make([]piecestore.BlockLocation, 0, 3)
for i := 0; i < 3; i++ {
blockLocations = append(blockLocations, piecestore.BlockLocation{
RelOffset: rand.Uint64(),
BlockSize: rand.Uint64(),
})
}

_, err := ps.GetPieceInfo(pieceCid)
assert.Error(t, err)

// Add a PieceInfo and some state
testCid, err := cid.Decode("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm")
assert.NoError(t, err)
blockInfos := []piecestore.BlockInfo{{testCid, 42, 43}}

err = ps.AddBlockInfosToPiece(pieceCid, blockInfos)
assert.NoError(t, err)
has, err := ps.HasBlockInfo(pieceCid)
assert.True(t, has)
assert.NoError(t, err)
has, err = ps.HasDealInfo(pieceCid)
assert.False(t, has)
assert.NoError(t, err)

pi, err := ps.GetPieceInfo(pieceCid)
assert.NoError(t, err)
assert.Len(t, pi.Blocks, 1)
assert.Equal(t, pi.Blocks[0], piecestore.BlockInfo{testCid, 42, 43})

dealInfo := piecestore.DealInfo{
DealID: rand.Uint64(),
SectorID: rand.Uint64(),
Offset: rand.Uint64(),
Length: rand.Uint64(),
initializePieceStore := func(t *testing.T) piecestore.PieceStore {
ps := piecestore.NewPieceStore(datastore.NewMapDatastore())
_, err := ps.GetCIDInfo(testCIDs[0])
assert.Error(t, err)
return ps
}
err = ps.AddDealForPiece(pieceCid, dealInfo)
assert.NoError(t, err)

has, err = ps.HasBlockInfo(pieceCid)
assert.True(t, has)
assert.NoError(t, err)
has, err = ps.HasDealInfo(pieceCid)
assert.True(t, has)
assert.NoError(t, err)
pi, err = ps.GetPieceInfo(pieceCid)
assert.NoError(t, err)
assert.Len(t, pi.Deals, 1)
assert.Equal(t, pi.Deals[0], dealInfo)

t.Run("can add piece block locations", func(t *testing.T) {
ps := initializePieceStore(t)
err := ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{
testCIDs[0]: blockLocations[0],
testCIDs[1]: blockLocations[1],
testCIDs[2]: blockLocations[2],
})
assert.NoError(t, err)

ci, err := ps.GetCIDInfo(testCIDs[0])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[0], pieceCid1})

ci, err = ps.GetCIDInfo(testCIDs[1])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[1], pieceCid1})

ci, err = ps.GetCIDInfo(testCIDs[2])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid1})
})

t.Run("overlapping adds", func(t *testing.T) {
ps := initializePieceStore(t)
err := ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{
testCIDs[0]: blockLocations[0],
testCIDs[1]: blockLocations[2],
})
assert.NoError(t, err)
err = ps.AddPieceBlockLocations(pieceCid2, map[cid.Cid]piecestore.BlockLocation{
testCIDs[1]: blockLocations[1],
testCIDs[2]: blockLocations[2],
})
assert.NoError(t, err)

ci, err := ps.GetCIDInfo(testCIDs[0])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[0], pieceCid1})

ci, err = ps.GetCIDInfo(testCIDs[1])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 2)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid1})
assert.Equal(t, ci.PieceBlockLocations[1], piecestore.PieceBlockLocation{blockLocations[1], pieceCid2})

ci, err = ps.GetCIDInfo(testCIDs[2])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid2})
})

t.Run("duplicate adds", func(t *testing.T) {
ps := initializePieceStore(t)
err := ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{
testCIDs[0]: blockLocations[0],
testCIDs[1]: blockLocations[1],
})
assert.NoError(t, err)
err = ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{
testCIDs[1]: blockLocations[1],
testCIDs[2]: blockLocations[2],
})
assert.NoError(t, err)

ci, err := ps.GetCIDInfo(testCIDs[0])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[0], pieceCid1})

ci, err = ps.GetCIDInfo(testCIDs[1])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[1], pieceCid1})

ci, err = ps.GetCIDInfo(testCIDs[2])
assert.NoError(t, err)
assert.Len(t, ci.PieceBlockLocations, 1)
assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid1})
})
}
Loading

0 comments on commit 2249baf

Please sign in to comment.