Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move PieceStore To Map Encodings #415

Merged
merged 3 commits into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-data-transfer v0.6.6
github.com/filecoin-project/go-ds-versioning v0.0.0-20200925081648-2b6f19ff8a67
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df
github.com/filecoin-project/go-statemachine v0.0.0-20200813232949-df9b130df370
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/specs-actors v0.9.7
Expand All @@ -21,7 +22,7 @@ require (
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.2.1
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
Expand All @@ -44,8 +45,8 @@ require (
github.com/multiformats/go-multibase v0.0.3
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
github.com/stretchr/testify v1.6.1
github.com/whyrusleeping/cbor-gen v0.0.0-20200814224545-656e08ce49ee
golang.org/x/exp v0.0.0-20190121172915-509febef88a4
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd
golang.org/x/net v0.0.0-20200625001655-4c5254603344
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
Expand Down
138 changes: 134 additions & 4 deletions go.sum

Large diffs are not rendered by default.

211 changes: 211 additions & 0 deletions piecestore/impl/piecestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package piecestoreimpl

import (
"context"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

versioning "github.com/filecoin-project/go-ds-versioning/pkg"
versioned "github.com/filecoin-project/go-ds-versioning/pkg/statestore"

"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/piecestore/migrations"
)

var log = logging.Logger("piecestore")

// DSPiecePrefix is the name space for storing piece infos
var DSPiecePrefix = "/pieces"

// DSCIDPrefix is the name space for storing CID infos
var DSCIDPrefix = "/cid-infos"

// NewPieceStore returns a new piecestore based on the given datastore
func NewPieceStore(ds datastore.Batching) (piecestore.PieceStore, error) {
pieceInfoMigrations, err := migrations.PieceInfoMigrations.Build()
if err != nil {
return nil, err
}
pieces, migratePieces := versioned.NewVersionedStateStore(namespace.Wrap(ds, datastore.NewKey(DSPiecePrefix)), pieceInfoMigrations, versioning.VersionKey("1"))
cidInfoMigrations, err := migrations.CIDInfoMigrations.Build()
if err != nil {
return nil, err
}
cidInfos, migrateCidInfos := versioned.NewVersionedStateStore(namespace.Wrap(ds, datastore.NewKey(DSCIDPrefix)), cidInfoMigrations, versioning.VersionKey("1"))
return &pieceStore{
readySub: pubsub.New(readyDispatcher),
pieces: pieces,
migratePieces: migratePieces,
cidInfos: cidInfos,
migrateCidInfos: migrateCidInfos,
}, nil
}

type pieceStore struct {
readySub *pubsub.PubSub
migratePieces func(ctx context.Context) error
pieces versioned.StateStore
migrateCidInfos func(ctx context.Context) error
cidInfos versioned.StateStore
}

func readyDispatcher(_ pubsub.Event, fn pubsub.SubscriberFn) error {
cb, ok := fn.(piecestore.ReadyFunc)
if !ok {
return xerrors.New("wrong type of event")
}
cb()
return nil
}

func (ps *pieceStore) Start(ctx context.Context) error {
go func() {
err := ps.migratePieces(ctx)
if err != nil {
log.Errorf("Migrating pieceInfos: %s", err.Error())
}
err = ps.migrateCidInfos(ctx)
if err != nil {
log.Errorf("Migrating cidInfos: %s", err.Error())
}
err = ps.readySub.Publish(struct{}{})
if err != nil {
log.Warnf("Publish piecestore migration ready event: %s", err.Error())
}
}()
return nil
}

func (ps *pieceStore) OnReady(ready piecestore.ReadyFunc) {
ps.readySub.Subscribe(ready)
}

// Store `dealInfo` in the PieceStore with key `pieceCID`.
func (ps *pieceStore) AddDealForPiece(pieceCID cid.Cid, dealInfo piecestore.DealInfo) error {
return ps.mutatePieceInfo(pieceCID, func(pi *piecestore.PieceInfo) error {
for _, di := range pi.Deals {
if di == dealInfo {
return nil
}
}
pi.Deals = append(pi.Deals, dealInfo)
return nil
})
}

// Store the map of blockLocations in the PieceStore's CIDInfo store, with key `pieceCID`
func (ps *pieceStore) AddPieceBlockLocations(pieceCID cid.Cid, blockLocations map[cid.Cid]piecestore.BlockLocation) error {
for c, blockLocation := range blockLocations {
err := ps.mutateCIDInfo(c, func(ci *piecestore.CIDInfo) error {
for _, pbl := range ci.PieceBlockLocations {
if pbl.PieceCID.Equals(pieceCID) && pbl.BlockLocation == blockLocation {
return nil
}
}
ci.PieceBlockLocations = append(ci.PieceBlockLocations, piecestore.PieceBlockLocation{BlockLocation: blockLocation, PieceCID: pieceCID})
return nil
})
if err != nil {
return err
}
}
return nil
}

func (ps *pieceStore) ListPieceInfoKeys() ([]cid.Cid, error) {
var pis []piecestore.PieceInfo
if err := ps.pieces.List(&pis); err != nil {
return nil, err
}

out := make([]cid.Cid, 0, len(pis))
for _, pi := range pis {
out = append(out, pi.PieceCID)
}

return out, nil
}

func (ps *pieceStore) ListCidInfoKeys() ([]cid.Cid, error) {
var cis []piecestore.CIDInfo
if err := ps.cidInfos.List(&cis); err != nil {
return nil, err
}

out := make([]cid.Cid, 0, len(cis))
for _, ci := range cis {
out = append(out, ci.CID)
}

return out, nil
}

// Retrieve the PieceInfo associated with `pieceCID` from the piece info store.
func (ps *pieceStore) GetPieceInfo(pieceCID cid.Cid) (piecestore.PieceInfo, error) {
var out piecestore.PieceInfo
if err := ps.pieces.Get(pieceCID).Get(&out); err != nil {
return piecestore.PieceInfo{}, err
}
return out, nil
}

// Retrieve the CIDInfo associated with `pieceCID` from the CID info store.
func (ps *pieceStore) GetCIDInfo(payloadCID cid.Cid) (piecestore.CIDInfo, error) {
var out piecestore.CIDInfo
if err := ps.cidInfos.Get(payloadCID).Get(&out); err != nil {
return piecestore.CIDInfo{}, err
}
return out, nil
}

func (ps *pieceStore) ensurePieceInfo(pieceCID cid.Cid) error {
has, err := ps.pieces.Has(pieceCID)

if err != nil {
return err
}
if has {
return nil
}

pieceInfo := piecestore.PieceInfo{PieceCID: pieceCID}
return ps.pieces.Begin(pieceCID, &pieceInfo)
}

func (ps *pieceStore) ensureCIDInfo(c cid.Cid) error {
has, err := ps.cidInfos.Has(c)

if err != nil {
return err
}

if has {
return nil
}

cidInfo := piecestore.CIDInfo{CID: c}
return ps.cidInfos.Begin(c, &cidInfo)
}

func (ps *pieceStore) mutatePieceInfo(pieceCID cid.Cid, mutator interface{}) error {
err := ps.ensurePieceInfo(pieceCID)
if err != nil {
return err
}

return ps.pieces.Get(pieceCID).Mutate(mutator)
}

func (ps *pieceStore) mutateCIDInfo(c cid.Cid, mutator interface{}) error {
err := ps.ensureCIDInfo(c)
if err != nil {
return err
}

return ps.cidInfos.Get(c).Mutate(mutator)
}