Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,31 @@ func (t *Transport) OpenChannel(ctx context.Context,

// Read from the graphsync response and error channels until they are closed,
// and return the last error on the error channel
func (t *Transport) consumeResponses(responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) error {
func (t *Transport) consumeResponses(req *gsReq) error {
var lastError error
for range responseChan {
for range req.responseChan {
}
for err := range errChan {
log.Infof("channel %s: finished consuming graphsync response channel", req.channelID)

for err := range req.errChan {
lastError = err
}
log.Infof("channel %s: finished consuming graphsync error channel", req.channelID)

return lastError
}

// Read from the graphsync response and error channels until they are closed
// or there is an error, then call the channel completed callback
func (t *Transport) executeGsRequest(req *gsReq) {
// Make sure to call the onComplete callback before returning
defer func() {
log.Infow("gs request complete for channel", "chid", req.channelID)
req.onComplete()
}()

lastError := t.consumeResponses(req.responseChan, req.errChan)
// Consume the response and error channels for the graphsync request
lastError := t.consumeResponses(req)

// Request cancelled by client
if _, ok := lastError.(graphsync.RequestClientCancelledErr); ok {
Expand All @@ -198,19 +204,20 @@ func (t *Transport) executeGsRequest(req *gsReq) {

// Request cancelled by responder
if _, ok := lastError.(graphsync.RequestCancelledErr); ok {
log.Infof("channel %s: graphsync request cancelled by responder", req.channelID)
// TODO Should we do anything for RequestCancelledErr ?
return
}

if lastError != nil {
log.Warnf("graphsync error on channel %s: %s", req.channelID, lastError)
log.Warnf("channel %s: graphsync error: %s", req.channelID, lastError)
}

log.Debugf("finished executing graphsync request for channel %s", req.channelID)
log.Debugf("channel %s: finished executing graphsync request", req.channelID)

var completeErr error
if lastError != nil {
completeErr = xerrors.Errorf("graphsync request failed to complete: %w", lastError)
completeErr = xerrors.Errorf("channel %s: graphsync request failed to complete: %w", req.channelID, lastError)
}

// Used by the tests to listen for when a request completes
Expand All @@ -220,7 +227,7 @@ func (t *Transport) executeGsRequest(req *gsReq) {

err := t.events.OnChannelCompleted(req.channelID, completeErr)
if err != nil {
log.Errorf("processing OnChannelCompleted %s: %s", req.channelID, err)
log.Errorf("channel %s: processing OnChannelCompleted: %s", req.channelID, err)
}
}

Expand Down