Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl
import (
"context"
"errors"
"fmt"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -75,6 +76,20 @@ func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, si
return nil
}

func (m *manager) OnDuplicateTraversed(chid datatransfer.ChannelID, link ipld.Link, size uint64) error {
var handled bool
var err error
_ = m.revalidators.Each(func(_ datatransfer.TypeIdentifier, _ encoding.Decoder, processor registry.Processor) error {
revalidator := processor.(datatransfer.Revalidator)
handled, err = revalidator.OnPullDuplicateTraversed(chid, size)
if handled {
return errors.New("stop processing")
}
return nil
})
return err
}

// OnDataQueued is called when the transport layer reports that it has queued
// up some data to be sent to the requester.
// It fires an event on the channel, updating the sum of queued data and calls
Expand All @@ -91,14 +106,23 @@ func (m *manager) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, size
// If this block has already been queued on the channel, take no further
// action (this can happen when the data-transfer channel is restarted)
if !isNew {
return nil, nil
//fmt.Printf("\n recording false at size=%d, chid=%s", size, link)
//return nil, nil
} else {
//fmt.Printf("\n recorded OnDataQueued for size=%d, chid=%s", size, link)
}

// If this node initiated the data transfer, there's nothing more to do
if chid.Initiator == m.peerID {
return nil, nil
}

channel, err := m.channels.GetByID(context.TODO(), chid)
if err != nil {
panic(err)
}
fmt.Printf("\n\n --dt channel.sent=%d, channel.queued=%d, size=%d", channel.Sent(), channel.Queued(), size)

// Check each revalidator to see if they want to pause / resume, or send
// a message over the transport.
// For example if the data-sender is waiting for the receiver to pay for
Expand Down Expand Up @@ -254,6 +278,7 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, completeErr er
}
return m.channels.Complete(chid)
}
fmt.Println("\n will emit error on client")
return m.channels.Error(chid, err)
}

Expand Down Expand Up @@ -302,6 +327,7 @@ func (m *manager) receiveNewRequest(

func (m *manager) restartRequest(chid datatransfer.ChannelID,
incoming datatransfer.Request) (datatransfer.VoucherResult, error) {
fmt.Println("\n got restart req")

initiator := chid.Initiator
if m.peerID == initiator {
Expand All @@ -317,7 +343,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID,
return nil, err
}

voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
voucher, result, err := m.validateVoucher(true, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
if err != nil && err != datatransfer.ErrPause {
return result, xerrors.Errorf("failed to validate voucher: %w", err)
}
Expand Down Expand Up @@ -356,7 +382,7 @@ func (m *manager) acceptRequest(
return nil, err
}

voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
voucher, result, err := m.validateVoucher(false, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
if err != nil && err != datatransfer.ErrPause {
return result, err
}
Expand Down Expand Up @@ -405,7 +431,7 @@ func (m *manager) acceptRequest(
// * reading voucher fails
// * deserialization of selector fails
// * validation fails
func (m *manager) validateVoucher(sender peer.ID,
func (m *manager) validateVoucher(isRestart bool, sender peer.ID,
incoming datatransfer.Request,
isPull bool,
baseCid cid.Cid,
Expand All @@ -414,16 +440,16 @@ func (m *manager) validateVoucher(sender peer.ID,
if err != nil {
return nil, nil, err
}
var validatorFunc func(peer.ID, datatransfer.Voucher, cid.Cid, ipld.Node) (datatransfer.VoucherResult, error)
processor, _ := m.validatedTypes.Processor(vouch.Type())
validator := processor.(datatransfer.RequestValidator)

var result datatransfer.VoucherResult
if isPull {
validatorFunc = validator.ValidatePull
result, err = validator.ValidatePull(isRestart, sender, vouch, baseCid, stor)
} else {
validatorFunc = validator.ValidatePush
result, err = validator.ValidatePush(sender, vouch, baseCid, stor)
}

result, err := validatorFunc(sender, vouch, baseCid, stor)
return vouch, result, err
}

Expand Down
1 change: 1 addition & 0 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ func (m *manager) RegisterTransportConfigurer(voucherType datatransfer.Voucher,

// RestartDataTransferChannel restarts data transfer on the channel with the given channelId
func (m *manager) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
fmt.Println("\n calling restart CHANNEL")
log.Infof("restart channel %s", chid)

channel, err := m.channels.GetByID(ctx, chid)
Expand Down
2 changes: 1 addition & 1 deletion impl/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *manager) validateRestartVoucher(channel datatransfer.ChannelState, isPu
}

// revalidate the voucher by reconstructing the request that would have led to the creation of this channel
if _, _, err := m.validateVoucher(channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil {
if _, _, err := m.validateVoucher(false, channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type RequestValidator interface {
selector ipld.Node) (VoucherResult, error)
// ValidatePull validates a pull request received from the peer that will receive data
ValidatePull(
isRestart bool,
receiver peer.ID,
voucher Voucher,
baseCid cid.Cid,
Expand All @@ -39,6 +40,9 @@ type Revalidator interface {
// request revalidation or nil to continue uninterrupted,
// other errors will terminate the request.
OnPullDataSent(chid ChannelID, additionalBytesSent uint64) (bool, VoucherResult, error)

OnPullDuplicateTraversed(chid ChannelID, additionalBytesSent uint64) (bool, error)

// OnPushDataReceived is called on the responder side when more bytes are received
// for a given push request. The first value indicates whether the request was
// recognized by this revalidator and should be considered 'handled'. If true,
Expand Down
2 changes: 2 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type EventsHandler interface {
// - err == ErrPause - pause this request
OnDataReceived(chid ChannelID, link ipld.Link, size uint64) error

OnDuplicateTraversed(chid ChannelID, link ipld.Link, size uint64) error

// OnDataQueued is called when data is queued for sending for the given channel ID
// return values are:
// message = data transfer message along with data
Expand Down
32 changes: 24 additions & 8 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package graphsync
import (
"context"
"errors"
"fmt"
"sync"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -104,11 +105,19 @@ func (t *Transport) OpenChannel(ctx context.Context,
stor ipld.Node,
doNotSendCids []cid.Cid,
msg datatransfer.Message) error {

fmt.Println("\n openChanel with doNotSendCids=")
for _, c := range doNotSendCids {
fmt.Println(c)
}

if t.events == nil {
fmt.Println("\n no handler set")
return datatransfer.ErrHandlerNotSet
}
exts, err := extension.ToExtensionData(msg, t.supportedExtensions)
if err != nil {
fmt.Println("\n extension error")
return err
}

Expand Down Expand Up @@ -136,6 +145,8 @@ func (t *Transport) OpenChannel(ctx context.Context,
Data: bz}
exts = append(exts, doNotSendExt)
}

fmt.Println("\n sent gs req")
responseChan, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...)

go t.executeGsRequest(internalCtx, channelID, responseChan, errChan)
Expand Down Expand Up @@ -443,19 +454,23 @@ func (t *Transport) gsBlockSentHook(p peer.ID, request graphsync.RequestData, bl
}

func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData, block graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
t.dataLock.RLock()
chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
t.dataLock.RUnlock()
if !ok {
return
}

// When a data transfer is restarted, the requester sends a list of CIDs
// that it already has. Graphsync calls the outgoing block hook for all
// blocks even if they are in the list (meaning, they aren't actually going
// to be sent over the wire). So here we check if the block is actually
// going to be sent over the wire before firing the data queued event.
// to be sent over the wire).
if block.BlockSizeOnWire() == 0 {
return
}
// emit even for duplicate block here
if err := t.events.OnDuplicateTraversed(chid, block.Link(), block.BlockSize()); err != nil {
log.Errorf("OnDuplicateTraversed failed: %s", err)
}

t.dataLock.RLock()
chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
t.dataLock.RUnlock()
if !ok {
return
}

Expand Down Expand Up @@ -576,6 +591,7 @@ func (t *Transport) gsCompletedResponseListener(p peer.ID, request graphsync.Req
t.completedResponseListener(chid)
}

fmt.Printf("\n will call OnChannelCompleted")
err := t.events.OnChannelCompleted(chid, completeErr)
if err != nil {
log.Error(err)
Expand Down