Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: confirm block import notifier is closed properly #1736

Merged
merged 40 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
0a59cb3
add TODOs to identify where block imported channel is handled
edwardmack Aug 17, 2021
a8cffd8
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Aug 18, 2021
30338f7
added comments for imported channels
edwardmack Aug 18, 2021
6ed9d63
create constructor for listeners
edwardmack Aug 19, 2021
0bc1f97
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Aug 23, 2021
2ae9572
added close channel to defer in listen
edwardmack Aug 23, 2021
0f95d68
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Aug 25, 2021
2b16fa6
move imported chan to block_notify
edwardmack Aug 26, 2021
7f10eec
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Aug 26, 2021
5f1c2a6
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Aug 30, 2021
2b512b2
remove comments, lint
edwardmack Aug 30, 2021
e3bd220
handle lint issues
edwardmack Aug 30, 2021
fd6d7cc
replace imported channel map with sync.Map
edwardmack Aug 30, 2021
125b68a
fix mocks in listeners test
edwardmack Aug 30, 2021
ee0fc7b
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Aug 31, 2021
6ca1a8b
fix mock functions for new imported notification channel
edwardmack Aug 31, 2021
d3eaa06
fix deep source issues
edwardmack Aug 31, 2021
41bb429
add debugging printf
edwardmack Sep 1, 2021
f0f76f1
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Sep 1, 2021
0164305
remove sync.Pool, and sync.Map
edwardmack Sep 2, 2021
b7f658f
handle channel closing
edwardmack Sep 3, 2021
74f094d
add sleep before close
edwardmack Sep 3, 2021
8a49ae7
remove channel close
edwardmack Sep 3, 2021
35180df
run go imported
edwardmack Sep 3, 2021
faf2664
defer importedLock unlock
edwardmack Sep 7, 2021
db397cd
wrap notifier channel in struct
edwardmack Sep 8, 2021
4b7d341
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Sep 9, 2021
d5d0d57
store channel by interface{} key
edwardmack Sep 9, 2021
f72e744
Merge branch 'ed/fix_send_on_closed_channel' of https://github.com/Ch…
edwardmack Sep 9, 2021
a478d5c
update storage key for imported block listeners
edwardmack Sep 9, 2021
d5754d9
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Sep 9, 2021
8b8066c
refacter GetImportedBlockNotifierChannel arugments
edwardmack Sep 10, 2021
1acfba7
GetImportedBlockNotifierChannel doesn't return error, fixed related test
edwardmack Sep 10, 2021
ce706cc
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Sep 20, 2021
515391d
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Sep 21, 2021
8ad895c
remove un-needed comments
edwardmack Sep 21, 2021
6b46fcd
Merge branch 'development' into ed/fix_send_on_closed_channel
edwardmack Sep 23, 2021
6bfb77d
remove close for FinalisedChannel listener
edwardmack Sep 23, 2021
48f6db0
added test for free imported channel
edwardmack Sep 23, 2021
8b30110
add mocks paths to .deepsource exclude_patterns
edwardmack Sep 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dot/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type resume struct {

// NewHandler returns a new Handler
func NewHandler(blockState BlockState, epochState EpochState, grandpaState GrandpaState) (*Handler, error) {
// TODO (ed) imported channel is unregistered and closed in handler.Stop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still a TODO?

//
imported := make(chan *types.Block, 16)
finalised := make(chan *types.FinalisationInfo, 16)
iid, err := blockState.RegisterImportedChannel(imported)
Expand Down
30 changes: 30 additions & 0 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,25 @@ type BlockListener struct {
cancelTimeout time.Duration
}

// NewBlockListener constructor for creating BlockListener
func NewBlockListener(conn *WSConn) *BlockListener {
bl := &BlockListener{
Channel: make(chan *types.Block, DEFAULT_BUFFER_SIZE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be renamed blockCh or something like that?

wsconn: conn,
cancel: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
done: make(chan struct{}, 1),
}
return bl
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.UnregisterImportedChannel(l.ChanID)
close(l.done)
close(l.Channel)
}()

for {
Expand Down Expand Up @@ -180,6 +193,7 @@ func (l *BlockFinalizedListener) Listen() {
defer func() {
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.chanID)
close(l.done)
close(l.channel)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following the @arijitAD comment on PR #1769 (https://github.com/ChainSafe/gossamer/pull/1769/files#r696558481), I think that would be nice if the Unregister... method closes the channel and remove it from the map.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably shouldn't be closed here? it's a reader of the channel, not the writer, so should remove this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or this should be calling FreeImportedBlockNotifierChannel instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edwardmack re-checking on this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noot yes, I removed close(l.channel) from here, I'm going to implement FreeFinalisedChannel in another PR, similar to what we did here with FreeImportedBlockNotifierChannel

}()

for {
Expand Down Expand Up @@ -228,6 +242,20 @@ type ExtrinsicSubmitListener struct {
cancelTimeout time.Duration
}

// NewExtrinsicSubmitListener constructor to build new ExtrinsicSubmitListener
func NewExtrinsicSubmitListener(conn *WSConn, extBytes []byte) *ExtrinsicSubmitListener {
esl := &ExtrinsicSubmitListener{
importedChan: make(chan *types.Block, DEFAULT_BUFFER_SIZE),
wsconn: conn,
extrinsic: types.Extrinsic(extBytes),
finalisedChan: make(chan *types.FinalisationInfo),
cancel: make(chan struct{}, 1),
done: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
}
return esl
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *ExtrinsicSubmitListener) Listen() {
// listen for imported blocks with extrinsic
Expand All @@ -236,6 +264,8 @@ func (l *ExtrinsicSubmitListener) Listen() {
l.wsconn.BlockAPI.UnregisterImportedChannel(l.importedChanID)
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.finalisedChanID)
close(l.done)
close(l.importedChan)
close(l.finalisedChan)
}()

for {
Expand Down
21 changes: 5 additions & 16 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,9 @@ func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (L
}

func (c *WSConn) initBlockListener(reqID float64, _ interface{}) (Listener, error) {
bl := &BlockListener{
Channel: make(chan *types.Block, DEFAULT_BUFFER_SIZE),
wsconn: c,
cancel: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
done: make(chan struct{}, 1),
}
// TODO (ed) here the imported Channel get unregistered and closed in http.Stop
// there doesn't seem to be an un-subscribe for this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does BlockListener.Channel get closed? i don't see it in the BlockListener.Stop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this code to a constructor for BlockListener.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there is logic to close imported channel in dot/http.go line 194-5, which in inside HTTPServer.Stop. There is also within Listener.Stop a call to cancelWithTimeout that gets called when unsubscribe is called. Should we move the imported channel close into Listener.Stop, then call listener.Stop from within HTTPServer.Stop?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we should localise closing channels (managing internal state) to receiver functions of the struct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@timwu20 the BlockListener.Channel channel is closed when l.wsconn.BlockAPI.UnregisterImportedChannel(l.ChanID) is called, we should not close the BlockListener.Channel directly because the BlockAPI is the owner of the channel. Every listener has a Stop method that calls the function cancelWithTimeout which will stop any running listener's goroutines, which in turn will call the defer function to unregister any active channel

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, the implementation of Unregister... function is to just remove the channel from the map without closing it, and that is the problem. @edwardmack I think we could close the imported channel inside the defer function of each Listen method to keep things closer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added imported channel close to defer in Listen methods.

bl := NewBlockListener(c)

if c.BlockAPI == nil {
c.safeSendError(reqID, nil, "error BlockAPI not set")
Expand Down Expand Up @@ -285,16 +281,9 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (Listener
return nil, err
}

// TODO (ed) the importedChan get deleted by unregister imported channel in defer in Listen (listeners.go line 236)
// listen for built blocks
esl := &ExtrinsicSubmitListener{
importedChan: make(chan *types.Block, DEFAULT_BUFFER_SIZE),
wsconn: c,
extrinsic: types.Extrinsic(extBytes),
finalisedChan: make(chan *types.FinalisationInfo),
cancel: make(chan struct{}, 1),
done: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
}
esl := NewExtrinsicSubmitListener(c, extBytes)

if c.BlockAPI == nil {
return nil, fmt.Errorf("error BlockAPI not set")
Expand Down
1 change: 1 addition & 0 deletions lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (s *Service) initiate() error {

func (s *Service) waitForFirstBlock() error {
ch := make(chan *types.Block)
// TODO (ed) import channel is unregistered by defer, doesn't seem to be closed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function has a problem. you can update the defer func to first close(ch) then call Unregister(), I think that should fix the issue at least partially, maybe there is a case where the GC collects ch and closes it before the Unregister is finished

id, err := s.blockState.RegisterImportedChannel(ch)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type tracker struct {

func newTracker(bs BlockState, out chan<- *networkVoteMessage) (*tracker, error) {
in := make(chan *types.Block, 16)
// TODO (ed) import channel is unregistered and closed in tracker.stop()
id, err := bs.RegisterImportedChannel(in)
if err != nil {
return nil, err
Expand Down