Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom filters for retrieval deals #4424

Merged
merged 2 commits into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 41 additions & 19 deletions markets/dealfilter/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,57 @@ import (
"encoding/json"
"os/exec"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"

"github.com/filecoin-project/lotus/node/modules/dtypes"
)

func CliDealFilter(cmd string) dtypes.DealFilter {
// TODO: run some checks on the cmd string

func CliStorageDealFilter(cmd string) dtypes.StorageDealFilter {
return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
j, err := json.MarshalIndent(deal, "", " ")
if err != nil {
return false, "", err
d := struct {
storagemarket.MinerDeal
DealType string
}{
MinerDeal: deal,
DealType: "storage",
}
return runDealFilter(ctx, cmd, d)
}
}

var out bytes.Buffer
func CliRetrievalDealFilter(cmd string) dtypes.RetrievalDealFilter {
return func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) {
d := struct {
retrievalmarket.ProviderDealState
DealType string
}{
ProviderDealState: deal,
DealType: "retrieval",
}
return runDealFilter(ctx, cmd, d)
}
}

func runDealFilter(ctx context.Context, cmd string, deal interface{}) (bool, string, error) {
j, err := json.MarshalIndent(deal, "", " ")
if err != nil {
return false, "", err
}

c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j)
c.Stdout = &out
c.Stderr = &out
var out bytes.Buffer

switch err := c.Run().(type) {
case nil:
return true, "", nil
case *exec.ExitError:
return false, out.String(), nil
default:
return false, "filter cmd run error", err
}
c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j)
c.Stdout = &out
c.Stderr = &out

switch err := c.Run().(type) {
case nil:
return true, "", nil
case *exec.ExitError:
return false, out.String(), nil
default:
return false, "filter cmd run error", err
}
}
6 changes: 4 additions & 2 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ func Online() Option {
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
Expand Down Expand Up @@ -484,7 +485,8 @@ func ConfigStorageMiner(c interface{}) Option {
ConfigCommon(&cfg.Common),

If(cfg.Dealmaking.Filter != "",
Override(new(dtypes.DealFilter), modules.BasicDealFilter(dealfilter.CliDealFilter(cfg.Dealmaking.Filter))),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(dealfilter.CliStorageDealFilter(cfg.Dealmaking.Filter))),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.Filter))),
ingar marked this conversation as resolved.
Show resolved Hide resolved
),

Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)),
Expand Down
4 changes: 3 additions & 1 deletion node/modules/dtypes/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"

Expand Down Expand Up @@ -71,4 +72,5 @@ type SetExpectedSealDurationFunc func(time.Duration) error
// too determine how long sealing is expected to take
type GetExpectedSealDurationFunc func() (time.Duration, error)

type DealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)
type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)
type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error)
78 changes: 50 additions & 28 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,16 @@ func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider"))
}

func BasicDealFilter(user dtypes.DealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
spn storagemarket.StorageProviderNode) dtypes.DealFilter {
spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {
return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
spn storagemarket.StorageProviderNode) dtypes.DealFilter {
spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {

return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
b, err := onlineOk()
Expand Down Expand Up @@ -497,7 +497,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
pieceStore dtypes.ProviderPieceStore,
dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode,
df dtypes.DealFilter,
df dtypes.StorageDealFilter,
funds ProviderDealFunds,
) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h)
Expand All @@ -511,8 +511,52 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt)
}

func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
return func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
return func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
b, err := onlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client")
return false, "miner is not accepting online retrieval deals", nil
}

b, err = offlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Info("offline retrieval has not been implemented yet")
}

if userFilter != nil {
return userFilter(ctx, state)
}

return true, "", nil
}
}
}

// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) {
func RetrievalProvider(h host.Host,
miner *storage.Miner,
sealer sectorstorage.SectorManager,
full lapi.FullNode,
ds dtypes.MetadataDS,
pieceStore dtypes.ProviderPieceStore,
mds dtypes.StagingMultiDstore,
dt dtypes.ProviderDataTransfer,
onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc,
userFilter dtypes.RetrievalDealFilter,
) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)

maddr, err := minerAddrFromDS(ds)
Expand All @@ -521,29 +565,7 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
}

netwk := rmnet.NewFromLibp2pHost(h)

opt := retrievalimpl.DealDeciderOpt(func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
b, err := onlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client")
return false, "miner is not accepting online retrieval deals", nil
}

b, err = offlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Info("offline retrieval has not been implemented yet")
}

return true, "", nil
})
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))

return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
}
Expand Down