-
Notifications
You must be signed in to change notification settings - Fork 14
Fix pause on restart #199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix pause on restart #199
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,6 +78,7 @@ type Transport struct { | |
| contextCancelMap map[datatransfer.ChannelID]cancelRequest | ||
| pending map[datatransfer.ChannelID]chan struct{} | ||
| requestorCancelledMap map[datatransfer.ChannelID]struct{} | ||
| channelXferStarted map[datatransfer.ChannelID]bool | ||
| pendingExtensions map[datatransfer.ChannelID][]graphsync.ExtensionData | ||
| stores map[datatransfer.ChannelID]struct{} | ||
| supportedExtensions []graphsync.ExtensionName | ||
|
|
@@ -98,6 +99,7 @@ func NewTransport(peerID peer.ID, gs graphsync.GraphExchange, options ...Option) | |
| pendingExtensions: make(map[datatransfer.ChannelID][]graphsync.ExtensionData), | ||
| channelIDMap: make(map[datatransfer.ChannelID]graphsyncKey), | ||
| pending: make(map[datatransfer.ChannelID]chan struct{}), | ||
| channelXferStarted: make(map[datatransfer.ChannelID]bool), | ||
| stores: make(map[datatransfer.ChannelID]struct{}), | ||
| supportedExtensions: defaultSupportedExtensions, | ||
| } | ||
|
|
@@ -149,15 +151,22 @@ func (t *Transport) OpenChannel(ctx context.Context, | |
| // Relock now that request has been cancelled | ||
| t.dataLock.Lock() | ||
| } | ||
| // Set up the request listeners | ||
|
|
||
| // Keep track of "pending" channels. | ||
| // The channel is in the "pending" state when we've made a call to | ||
| // Graphsync to open a request, but Graphsync hasn't yet called the | ||
| // outgoing request hook. | ||
| t.pending[channelID] = make(chan struct{}) | ||
|
|
||
| // Create a cancellable context for the channel so that the graphsync | ||
| // request can be cancelled | ||
| internalCtx, internalCancel := context.WithCancel(ctx) | ||
| cancelRQ := cancelRequest{ | ||
| cancel: internalCancel, | ||
| completed: make(chan struct{}), | ||
| } | ||
| t.contextCancelMap[channelID] = cancelRQ | ||
|
|
||
| t.dataLock.Unlock() | ||
|
|
||
| // If this is a restart request, the client can send a list of CIDs of | ||
|
|
@@ -348,10 +357,10 @@ func (t *Transport) ResumeChannel(ctx context.Context, | |
| defer t.dataLock.Unlock() | ||
|
|
||
| if _, ok := t.requestorCancelledMap[chid]; ok { | ||
|
|
||
| t.pendingExtensions[chid] = append(t.pendingExtensions[chid], extensions...) | ||
| return nil | ||
| } | ||
| t.channelXferStarted[chid] = true | ||
| return t.gs.UnpauseResponse(gsKey.p, gsKey.requestID, extensions...) | ||
| } | ||
|
|
||
|
|
@@ -375,10 +384,11 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI | |
| return nil | ||
| } | ||
| t.dataLock.Lock() | ||
| if _, ok := t.requestorCancelledMap[chid]; ok { | ||
| _, ok := t.requestorCancelledMap[chid] | ||
| t.dataLock.Unlock() | ||
| if ok { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Woah. Great catch man ! This could have been causing hairy deadlock issues ...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I think it actually has caused some deadlock issues, I suspect once we release we'll see a bunch of weird problems go away. |
||
| return nil | ||
| } | ||
| t.dataLock.Unlock() | ||
| return t.gs.CancelResponse(gsKey.p, gsKey.requestID) | ||
| } | ||
|
|
||
|
|
@@ -606,11 +616,26 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook | |
| return | ||
| } | ||
|
|
||
| // Check if the callback indicated that the channel should be paused | ||
| // immediately | ||
| paused := false | ||
| if err == datatransfer.ErrPause { | ||
| paused = true | ||
| hookActions.PauseResponse() | ||
| } | ||
|
|
||
| t.dataLock.Lock() | ||
|
|
||
| // If this is a restart request, and the data transfer still hasn't got | ||
| // out of the paused state (eg because we're still unsealing), start this | ||
| // graphsync response in the paused state. | ||
| hasXferStarted, isRestart := t.channelXferStarted[chid] | ||
| if isRestart && !hasXferStarted && !paused { | ||
| paused = true | ||
| hookActions.PauseResponse() | ||
| } | ||
| t.channelXferStarted[chid] = !paused | ||
|
|
||
| gsKey := graphsyncKey{request.ID(), p} | ||
| if _, ok := t.requestorCancelledMap[chid]; ok { | ||
| delete(t.requestorCancelledMap, chid) | ||
|
|
@@ -626,7 +651,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook | |
| if ok { | ||
| hookActions.UsePersistenceOption("data-transfer-" + chid.String()) | ||
| } | ||
|
|
||
| t.dataLock.Unlock() | ||
|
|
||
| hookActions.ValidateRequest() | ||
| } | ||
|
|
||
|
|
@@ -695,6 +722,7 @@ func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncK | |
| delete(t.graphsyncRequestMap, gsKey) | ||
| delete(t.pendingExtensions, chid) | ||
| delete(t.requestorCancelledMap, chid) | ||
| delete(t.channelXferStarted, chid) | ||
| _, ok := t.stores[chid] | ||
| if ok { | ||
| opt := "data-transfer-" + chid.String() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see this getting configured/injected anywhere. What's the purpose of this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just used by the tests