Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Do Not Review #538

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.4.3
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210405073924-2ceebfb70d8e
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand All @@ -29,6 +29,7 @@ require (
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-unixfs v0.2.4
Expand All @@ -48,3 +49,6 @@ require (
)

replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi

replace github.com/filecoin-project/go-data-transfer => /Users/aarshshah/go/src/github.com/filecoin-project/go-data-transfer
replace github.com/ipfs/go-graphsync => /Users/aarshshah/go/src/github.com/ipfs/go-graphsync
22 changes: 15 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
github.com/filecoin-project/go-data-transfer v1.4.3 h1:ECEw69NOfmEZ7XN1NSBvj3KTbbH2mIczQs+Z2w4bD7c=
github.com/filecoin-project/go-data-transfer v1.4.3/go.mod h1:n8kbDQXWrY1c4UgfMa9KERxNCWbOTDwdNhf2MpN9dpo=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210405073924-2ceebfb70d8e h1:Y80zgQ0IYTWHkMXXT50OFFCmKwlIACi9ftZ5ZhIFRA4=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210405073924-2ceebfb70d8e/go.mod h1:n8kbDQXWrY1c4UgfMa9KERxNCWbOTDwdNhf2MpN9dpo=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
Expand Down Expand Up @@ -851,14 +851,15 @@ github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xlab/c-for-go v0.0.0-20200718154222-87b0065af829 h1:wb7xrDzfkLgPHsSEBm+VSx6aDdi64VtV0xvP0E6j8bk=
github.com/xlab/c-for-go v0.0.0-20200718154222-87b0065af829/go.mod h1:h/1PEBwj7Ym/8kOuMWvO2ujZ6Lt+TMbySEXNhjjR87I=
github.com/xlab/c-for-go v0.0.0-20201112171043-ea6dce5809cb h1:/7/dQyiKnxAOj9L69FhST7uMe17U015XPzX7cy+5ykM=
github.com/xlab/c-for-go v0.0.0-20201112171043-ea6dce5809cb/go.mod h1:pbNsDSxn1ICiNn9Ct4ZGNrwzfkkwYbx/lw8VuyutFIg=
github.com/xlab/pkgconfig v0.0.0-20170226114623-cea12a0fd245 h1:Sw125DKxZhPUI4JLlWugkzsrlB50jR9v2khiD9FxuSo=
github.com/xlab/pkgconfig v0.0.0-20170226114623-cea12a0fd245/go.mod h1:C+diUUz7pxhNY6KAoLgrTYARGWnt82zWTylZlxT92vk=
github.com/xorcare/golden v0.6.0 h1:E8emU8bhyMIEpYmgekkTUaw4vtcrRE+Wa0c5wYIcgXc=
github.com/xorcare/golden v0.6.0/go.mod h1:7T39/ZMvaSEZlBPoYfVFmsBLmUl3uz9IuzWj/U6FtvQ=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
Expand Down Expand Up @@ -908,6 +909,7 @@ golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -980,8 +982,9 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1062,8 +1065,9 @@ golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200711155855-7342f9734a7d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3 h1:r3P/5xOq/dK1991B65Oy6E1fRF/2d/fSYZJ/fXGVfJc=
golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201112185108-eeaa07dd7696 h1:Bfazo+enXJET5SbHeh95NtxabJF6fJ9r/jpfRJgd3j4=
golang.org/x/tools v0.0.0-20201112185108-eeaa07dd7696/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1160,8 +1164,12 @@ honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXe
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/golex v1.0.0 h1:wWpDlbK8ejRfSyi0frMyhilD3JBvtcx2AdGDnU+JtsE=
modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/golex v1.0.1 h1:EYKY1a3wStt0RzHaH8mdSRNg78Ub0OHxYfCRWw35YtM=
modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254=
modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk=
modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk=
modernc.org/mathutil v1.1.1 h1:FeylZSVX8S+58VsyJlkEj2bcpdytmp9MmDKZkKx8OIE=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
Expand Down
5 changes: 5 additions & 0 deletions retrievalmarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ const (

// ProviderEventClientCancelled happens when the provider gets a cancel message from the client's data transfer
ProviderEventClientCancelled

// ProviderEventMoveToOngoing moves the deal to ongoing state
ProviderEventMoveToOngoing

ProviderEventDataSentOnWire
)

// ProviderEvents is a human readable map of provider event name -> event description
Expand Down
1 change: 1 addition & 0 deletions retrievalmarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ var ClientEvents = fsm.Events{
fsm.Event(rm.ClientEventBlocksReceived).
FromMany(rm.DealStatusOngoing,
rm.DealStatusFundsNeeded,
rm.DealStatusSendFunds,
rm.DealStatusFundsNeededLastPayment,
rm.DealStatusCheckComplete,
rm.DealStatusClientWaitingForLastBlocks).ToNoChange().
Expand Down
5 changes: 5 additions & 0 deletions retrievalmarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clientstates

import (
"context"
"fmt"

peer "github.com/libp2p/go-libp2p-core/peer"

Expand Down Expand Up @@ -103,6 +104,10 @@ func SendFunds(ctx fsm.Context, environment ClientDealEnvironment, deal rm.Clien
// check that paymentRequest <= (totalReceived - bytesPaidFor) * pricePerByte + (unsealPrice - unsealFundsPaid), or fail
retrievalPrice := big.Mul(abi.NewTokenAmount(int64(deal.TotalReceived-deal.BytesPaidFor)), deal.PricePerByte)
unsealPrice := big.Sub(deal.UnsealPrice, deal.UnsealFundsPaid)

fmt.Printf("\n deal.TotalReceived=%d, deal.BytesPaidFor=%d, unsealPrice=%d, paymentrequested=%d\n",
deal.TotalReceived, deal.BytesPaidFor, deal.UnsealPrice, deal.PaymentRequested)

if deal.PaymentRequested.GreaterThan(big.Add(retrievalPrice, unsealPrice)) {
return ctx.Trigger(rm.ClientEventBadPaymentRequested, "too much money requested for bytes sent")
}
Expand Down
2 changes: 1 addition & 1 deletion retrievalmarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ const noEvent = rm.ClientEvent(math.MaxUint64)

func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelState) (rm.ClientEvent, []interface{}) {
switch event.Code {
case datatransfer.DataReceived:
case datatransfer.DataReceivedProgress:
return rm.ClientEventBlocksReceived, []interface{}{channelState.Received()}
case datatransfer.FinishTransfer:
return rm.ClientEventAllBlocksReceived, nil
Expand Down
15 changes: 15 additions & 0 deletions retrievalmarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ func (pve *providerValidationEnvironment) BeginTracking(pds retrievalmarket.Prov
return pve.p.stateMachines.Send(pds.Identifier(), retrievalmarket.ProviderEventOpen)
}

func (pve *providerValidationEnvironment) MoveToOngoing(dealID retrievalmarket.ProviderDealIdentifier) error {
return pve.p.stateMachines.SendSync(context.Background(), dealID, retrievalmarket.ProviderEventMoveToOngoing)
}

func (pve *providerValidationEnvironment) UpdateSentBytes(dealID retrievalmarket.ProviderDealIdentifier, totalSent uint64) error {
return pve.p.stateMachines.SendSync(context.TODO(), dealID, retrievalmarket.ProviderEventPaymentRequested, totalSent)
}

// Returns the deal state/proposal if we already have it in the SM.
func (pve *providerValidationEnvironment) GetDealSync(dealID retrievalmarket.ProviderDealIdentifier) (retrievalmarket.ProviderDealState, error) {
var deal retrievalmarket.ProviderDealState
err := pve.p.stateMachines.GetSync(context.TODO(), dealID, &deal)
return deal, err
}

// NextStoreID allocates a store for this deal
func (pve *providerValidationEnvironment) NextStoreID() (multistore.StoreID, error) {
storeID := pve.p.multiStore.Next()
Expand Down
11 changes: 11 additions & 0 deletions retrievalmarket/impl/providerstates/provider_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,19 @@ var ProviderEvents = fsm.Events{
fsm.Event(rm.ProviderEventBlocksCompleted).
FromMany(rm.DealStatusOngoing).To(rm.DealStatusBlocksComplete),

fsm.Event(rm.ProviderEventDataSentOnWire).
FromAny().ToJustRecord().
Action(func(deal *rm.ProviderDealState, totalSentOnWire uint64) error {
deal.TotalSentOnWire = totalSentOnWire
return nil
}),

// request payment
fsm.Event(rm.ProviderEventPaymentRequested).
FromMany(rm.DealStatusOngoing, rm.DealStatusUnsealed).To(rm.DealStatusFundsNeeded).
From(rm.DealStatusBlocksComplete).To(rm.DealStatusFundsNeededLastPayment).
From(rm.DealStatusNew).To(rm.DealStatusFundsNeededUnseal).
From(rm.DealStatusFundsNeeded).ToNoChange().
Action(func(deal *rm.ProviderDealState, totalSent uint64) error {
deal.TotalSent = totalSent
return nil
Expand Down Expand Up @@ -89,6 +97,9 @@ var ProviderEvents = fsm.Events{
return nil
}),

fsm.Event(rm.ProviderEventMoveToOngoing).
From(rm.DealStatusFundsNeeded).To(rm.DealStatusOngoing),

// completing
fsm.Event(rm.ProviderEventComplete).FromMany(rm.DealStatusBlocksComplete, rm.DealStatusFinalizing).To(rm.DealStatusCompleting),
fsm.Event(rm.ProviderEventCleanupComplete).From(rm.DealStatusCompleting).To(rm.DealStatusCompleted),
Expand Down
116 changes: 112 additions & 4 deletions retrievalmarket/impl/requestvalidation/requestvalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"context"
"errors"
"fmt"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -40,16 +42,22 @@ type ValidationEnvironment interface {
BeginTracking(pds retrievalmarket.ProviderDealState) error
// NextStoreID allocates a store for this deal
NextStoreID() (multistore.StoreID, error)
// GetDealSync applies all pending events and returns the deal state if we are already tracking it.
GetDealSync(dealID retrievalmarket.ProviderDealIdentifier) (retrievalmarket.ProviderDealState, error)

UpdateSentBytes(dealID retrievalmarket.ProviderDealIdentifier, totalSent uint64) error
MoveToOngoing(dealID retrievalmarket.ProviderDealIdentifier) error
}

// ProviderRequestValidator validates incoming requests for the Retrieval Provider
type ProviderRequestValidator struct {
mu sync.Mutex
env ValidationEnvironment
}

// NewProviderRequestValidator returns a new instance of the ProviderRequestValidator
func NewProviderRequestValidator(env ValidationEnvironment) *ProviderRequestValidator {
return &ProviderRequestValidator{env}
return &ProviderRequestValidator{env: env}
}

// ValidatePush validates a push request received from the peer that will send data
Expand All @@ -58,7 +66,7 @@ func (rv *ProviderRequestValidator) ValidatePush(sender peer.ID, voucher datatra
}

// ValidatePull validates a pull request received from the peer that will receive data
func (rv *ProviderRequestValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
func (rv *ProviderRequestValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
proposal, ok := voucher.(*retrievalmarket.DealProposal)
var legacyProtocol bool
if !ok {
Expand All @@ -70,7 +78,15 @@ func (rv *ProviderRequestValidator) ValidatePull(receiver peer.ID, voucher datat
proposal = &newProposal
legacyProtocol = true
}
response, err := rv.validatePull(receiver, proposal, legacyProtocol, baseCid, selector)

var response *retrievalmarket.DealResponse
var err error
if isRestart {
response, err = rv.validatePullRestart(receiver, proposal, baseCid, selector)
} else {
response, err = rv.validatePull(receiver, proposal, legacyProtocol, baseCid, selector)
}

if response == nil {
return nil, err
}
Expand All @@ -86,8 +102,100 @@ func (rv *ProviderRequestValidator) ValidatePull(receiver peer.ID, voucher datat
return response, err
}

func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *retrievalmarket.DealProposal, legacyProtocol bool, baseCid cid.Cid, selector ipld.Node) (*retrievalmarket.DealResponse, error) {
func (rv *ProviderRequestValidator) validatePullRestart(receiver peer.ID, proposal *retrievalmarket.DealProposal, baseCid cid.Cid, selector ipld.Node) (*retrievalmarket.DealResponse, error) {
// TODO Striped Locking

rv.mu.Lock()
defer rv.mu.Unlock()

if proposal.PayloadCID != baseCid {
return nil, errors.New("incorrect CID for this proposal")
}

buf := new(bytes.Buffer)
err := dagcbor.Encoder(selector, buf)
if err != nil {
return nil, err
}
bytesCompare := allSelectorBytes
if proposal.SelectorSpecified() {
bytesCompare = proposal.Selector.Raw
}
if !bytes.Equal(buf.Bytes(), bytesCompare) {
return nil, errors.New("incorrect selector for this proposal")
}

dealId := retrievalmarket.ProviderDealIdentifier{
Receiver: receiver,
DealID: proposal.ID,
}

// ensure we already have this deal in the SM.
deal, err := rv.env.GetDealSync(dealId)
if err != nil {
return nil, err
}
fmt.Printf("\n got validate restart req, deal.TotalSent=%d, bytesOnWire=%d", deal.TotalSent, deal.TotalSentOnWire)

switch deal.Status {
case retrievalmarket.DealStatusOngoing:
fmt.Println("\n restarting in DealStatusOngoing\n")
// DealStatusOngoing means that no payment is pending and we shouldn't be pausing the responder here.
return nil, nil

case retrievalmarket.DealStatusFundsNeeded:
fmt.Println("\nrestarting in DealStatusFundsNeeded")
response := retrievalmarket.DealResponse{
ID: proposal.ID,
}
totalPaidFor := big.Div(big.Max(big.Sub(deal.FundsReceived, deal.UnsealPrice), big.Zero()), deal.PricePerByte).Uint64()

// reset the number of bytes sent on the wire
if deal.TotalSentOnWire != 0 {
//fmt.Printf("\n deal.TotalSent=%d, bytesOnWire=%d", deal.TotalSent, bytesSentOnWire)
if err := rv.env.UpdateSentBytes(dealId, deal.TotalSentOnWire); err != nil {
return nil, err
}
deal, err = rv.env.GetDealSync(dealId)
if err != nil {
return nil, err
}
}

if deal.TotalSent-totalPaidFor < deal.CurrentInterval {
// go back to ongoing state and resume transfer
//fmt.Printf("\n moving to ongoing, deal state is now %+v", deal)
if err := rv.env.MoveToOngoing(dealId); err != nil {
fmt.Println("\n failed to move to ongoing state")
return nil, err
}
return nil, nil
}

// ask for the right amount of money
response.Status = retrievalmarket.DealStatusFundsNeeded
response.PaymentOwed = big.Mul(abi.NewTokenAmount(int64(deal.TotalSent-totalPaidFor)), deal.PricePerByte)

fmt.Printf("\n in restarting from DealStatusFundsNeeded, totalPaidFor=%d, PaymentOwed=%d, cuurentInterval=%d, totalSent=%d,", totalPaidFor, response.PaymentOwed,
deal.CurrentInterval, deal.TotalSentOnWire)

return &response, datatransfer.ErrPause

case retrievalmarket.DealStatusFundsNeededLastPayment:
panic(errors.New("panic DealStatusFundsNeededLastPayment"))
fmt.Println("\n failing in DealStatusFundsNeededLastPayment\n")
// we are waiting to receive the last payment
return nil, errors.New("retreival restarts NOT supported for deals in state DealStatusFundsNeededLastPayment")

default:
panic(errors.New("panic default"))
fmt.Println("\n failing in arbitary state\n")
return nil, fmt.Errorf("retreival restarts NOT supported for deals in state %s",
retrievalmarket.DealStatuses[deal.Status])
}
}

func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *retrievalmarket.DealProposal, legacyProtocol bool, baseCid cid.Cid, selector ipld.Node) (*retrievalmarket.DealResponse, error) {
if proposal.PayloadCID != baseCid {
return nil, errors.New("incorrect CID for this proposal")
}
Expand Down
Loading