diff --git a/impl/events.go b/impl/events.go index ae2b6cd2..26b039e6 100644 --- a/impl/events.go +++ b/impl/events.go @@ -3,6 +3,7 @@ package impl import ( "context" "errors" + "fmt" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" @@ -75,6 +76,20 @@ func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, si return nil } +func (m *manager) OnDuplicateTraversed(chid datatransfer.ChannelID, link ipld.Link, size uint64) error { + var handled bool + var err error + _ = m.revalidators.Each(func(_ datatransfer.TypeIdentifier, _ encoding.Decoder, processor registry.Processor) error { + revalidator := processor.(datatransfer.Revalidator) + handled, err = revalidator.OnPullDuplicateTraversed(chid, size) + if handled { + return errors.New("stop processing") + } + return nil + }) + return err +} + // OnDataQueued is called when the transport layer reports that it has queued // up some data to be sent to the requester. // It fires an event on the channel, updating the sum of queued data and calls @@ -91,7 +106,10 @@ func (m *manager) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, size // If this block has already been queued on the channel, take no further // action (this can happen when the data-transfer channel is restarted) if !isNew { - return nil, nil + //fmt.Printf("\n recording false at size=%d, chid=%s", size, link) + //return nil, nil + } else { + //fmt.Printf("\n recorded OnDataQueued for size=%d, chid=%s", size, link) } // If this node initiated the data transfer, there's nothing more to do @@ -99,6 +117,12 @@ func (m *manager) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, size return nil, nil } + channel, err := m.channels.GetByID(context.TODO(), chid) + if err != nil { + panic(err) + } + fmt.Printf("\n\n --dt channel.sent=%d, channel.queued=%d, size=%d", channel.Sent(), channel.Queued(), size) + // Check each revalidator to see if they want to pause / resume, or send // a message over the transport. // For example if the data-sender is waiting for the receiver to pay for @@ -254,6 +278,7 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, completeErr er } return m.channels.Complete(chid) } + fmt.Println("\n will emit error on client") return m.channels.Error(chid, err) } @@ -302,6 +327,7 @@ func (m *manager) receiveNewRequest( func (m *manager) restartRequest(chid datatransfer.ChannelID, incoming datatransfer.Request) (datatransfer.VoucherResult, error) { + fmt.Println("\n got restart req") initiator := chid.Initiator if m.peerID == initiator { @@ -317,7 +343,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID, return nil, err } - voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor) + voucher, result, err := m.validateVoucher(true, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor) if err != nil && err != datatransfer.ErrPause { return result, xerrors.Errorf("failed to validate voucher: %w", err) } @@ -356,7 +382,7 @@ func (m *manager) acceptRequest( return nil, err } - voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor) + voucher, result, err := m.validateVoucher(false, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor) if err != nil && err != datatransfer.ErrPause { return result, err } @@ -405,7 +431,7 @@ func (m *manager) acceptRequest( // * reading voucher fails // * deserialization of selector fails // * validation fails -func (m *manager) validateVoucher(sender peer.ID, +func (m *manager) validateVoucher(isRestart bool, sender peer.ID, incoming datatransfer.Request, isPull bool, baseCid cid.Cid, @@ -414,16 +440,16 @@ func (m *manager) validateVoucher(sender peer.ID, if err != nil { return nil, nil, err } - var validatorFunc func(peer.ID, datatransfer.Voucher, cid.Cid, ipld.Node) (datatransfer.VoucherResult, error) processor, _ := m.validatedTypes.Processor(vouch.Type()) validator := processor.(datatransfer.RequestValidator) + + var result datatransfer.VoucherResult if isPull { - validatorFunc = validator.ValidatePull + result, err = validator.ValidatePull(isRestart, sender, vouch, baseCid, stor) } else { - validatorFunc = validator.ValidatePush + result, err = validator.ValidatePush(sender, vouch, baseCid, stor) } - result, err := validatorFunc(sender, vouch, baseCid, stor) return vouch, result, err } diff --git a/impl/impl.go b/impl/impl.go index 8490f11e..1631160f 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -457,6 +457,7 @@ func (m *manager) RegisterTransportConfigurer(voucherType datatransfer.Voucher, // RestartDataTransferChannel restarts data transfer on the channel with the given channelId func (m *manager) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error { + fmt.Println("\n calling restart CHANNEL") log.Infof("restart channel %s", chid) channel, err := m.channels.GetByID(ctx, chid) diff --git a/impl/restart.go b/impl/restart.go index ac0bd12c..5d8ec5c8 100644 --- a/impl/restart.go +++ b/impl/restart.go @@ -73,7 +73,7 @@ func (m *manager) validateRestartVoucher(channel datatransfer.ChannelState, isPu } // revalidate the voucher by reconstructing the request that would have led to the creation of this channel - if _, _, err := m.validateVoucher(channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil { + if _, _, err := m.validateVoucher(false, channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil { return err } diff --git a/manager.go b/manager.go index 7b9c3402..9bf6f47b 100644 --- a/manager.go +++ b/manager.go @@ -19,6 +19,7 @@ type RequestValidator interface { selector ipld.Node) (VoucherResult, error) // ValidatePull validates a pull request received from the peer that will receive data ValidatePull( + isRestart bool, receiver peer.ID, voucher Voucher, baseCid cid.Cid, @@ -39,6 +40,9 @@ type Revalidator interface { // request revalidation or nil to continue uninterrupted, // other errors will terminate the request. OnPullDataSent(chid ChannelID, additionalBytesSent uint64) (bool, VoucherResult, error) + + OnPullDuplicateTraversed(chid ChannelID, additionalBytesSent uint64) (bool, error) + // OnPushDataReceived is called on the responder side when more bytes are received // for a given push request. The first value indicates whether the request was // recognized by this revalidator and should be considered 'handled'. If true, diff --git a/transport.go b/transport.go index e271e7be..ad800dd2 100644 --- a/transport.go +++ b/transport.go @@ -27,6 +27,8 @@ type EventsHandler interface { // - err == ErrPause - pause this request OnDataReceived(chid ChannelID, link ipld.Link, size uint64) error + OnDuplicateTraversed(chid ChannelID, link ipld.Link, size uint64) error + // OnDataQueued is called when data is queued for sending for the given channel ID // return values are: // message = data transfer message along with data diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 194047c5..040fdf16 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -3,6 +3,7 @@ package graphsync import ( "context" "errors" + "fmt" "sync" "github.com/ipfs/go-cid" @@ -104,11 +105,19 @@ func (t *Transport) OpenChannel(ctx context.Context, stor ipld.Node, doNotSendCids []cid.Cid, msg datatransfer.Message) error { + + fmt.Println("\n openChanel with doNotSendCids=") + for _, c := range doNotSendCids { + fmt.Println(c) + } + if t.events == nil { + fmt.Println("\n no handler set") return datatransfer.ErrHandlerNotSet } exts, err := extension.ToExtensionData(msg, t.supportedExtensions) if err != nil { + fmt.Println("\n extension error") return err } @@ -136,6 +145,8 @@ func (t *Transport) OpenChannel(ctx context.Context, Data: bz} exts = append(exts, doNotSendExt) } + + fmt.Println("\n sent gs req") responseChan, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...) go t.executeGsRequest(internalCtx, channelID, responseChan, errChan) @@ -443,19 +454,23 @@ func (t *Transport) gsBlockSentHook(p peer.ID, request graphsync.RequestData, bl } func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData, block graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) { + t.dataLock.RLock() + chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}] + t.dataLock.RUnlock() + if !ok { + return + } + // When a data transfer is restarted, the requester sends a list of CIDs // that it already has. Graphsync calls the outgoing block hook for all // blocks even if they are in the list (meaning, they aren't actually going - // to be sent over the wire). So here we check if the block is actually - // going to be sent over the wire before firing the data queued event. + // to be sent over the wire). if block.BlockSizeOnWire() == 0 { - return - } + // emit even for duplicate block here + if err := t.events.OnDuplicateTraversed(chid, block.Link(), block.BlockSize()); err != nil { + log.Errorf("OnDuplicateTraversed failed: %s", err) + } - t.dataLock.RLock() - chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}] - t.dataLock.RUnlock() - if !ok { return } @@ -576,6 +591,7 @@ func (t *Transport) gsCompletedResponseListener(p peer.ID, request graphsync.Req t.completedResponseListener(chid) } + fmt.Printf("\n will call OnChannelCompleted") err := t.events.OnChannelCompleted(chid, completeErr) if err != nil { log.Error(err)