diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 9eb6d4de..cc833d18 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -166,25 +166,31 @@ func (t *Transport) OpenChannel(ctx context.Context, // Read from the graphsync response and error channels until they are closed, // and return the last error on the error channel -func (t *Transport) consumeResponses(responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) error { +func (t *Transport) consumeResponses(req *gsReq) error { var lastError error - for range responseChan { + for range req.responseChan { } - for err := range errChan { + log.Infof("channel %s: finished consuming graphsync response channel", req.channelID) + + for err := range req.errChan { lastError = err } + log.Infof("channel %s: finished consuming graphsync error channel", req.channelID) + return lastError } // Read from the graphsync response and error channels until they are closed // or there is an error, then call the channel completed callback func (t *Transport) executeGsRequest(req *gsReq) { + // Make sure to call the onComplete callback before returning defer func() { log.Infow("gs request complete for channel", "chid", req.channelID) req.onComplete() }() - lastError := t.consumeResponses(req.responseChan, req.errChan) + // Consume the response and error channels for the graphsync request + lastError := t.consumeResponses(req) // Request cancelled by client if _, ok := lastError.(graphsync.RequestClientCancelledErr); ok { @@ -198,19 +204,20 @@ func (t *Transport) executeGsRequest(req *gsReq) { // Request cancelled by responder if _, ok := lastError.(graphsync.RequestCancelledErr); ok { + log.Infof("channel %s: graphsync request cancelled by responder", req.channelID) // TODO Should we do anything for RequestCancelledErr ? return } if lastError != nil { - log.Warnf("graphsync error on channel %s: %s", req.channelID, lastError) + log.Warnf("channel %s: graphsync error: %s", req.channelID, lastError) } - log.Debugf("finished executing graphsync request for channel %s", req.channelID) + log.Debugf("channel %s: finished executing graphsync request", req.channelID) var completeErr error if lastError != nil { - completeErr = xerrors.Errorf("graphsync request failed to complete: %w", lastError) + completeErr = xerrors.Errorf("channel %s: graphsync request failed to complete: %w", req.channelID, lastError) } // Used by the tests to listen for when a request completes @@ -220,7 +227,7 @@ func (t *Transport) executeGsRequest(req *gsReq) { err := t.events.OnChannelCompleted(req.channelID, completeErr) if err != nil { - log.Errorf("processing OnChannelCompleted %s: %s", req.channelID, err) + log.Errorf("channel %s: processing OnChannelCompleted: %s", req.channelID, err) } }