Skip to content

Commit

Permalink
changed stream closing sequence and added some debug output for regis…
Browse files Browse the repository at this point in the history
…tering
  • Loading branch information
ChristianMct committed Apr 22, 2024
1 parent 101d6b7 commit 7014baa
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 9 deletions.
2 changes: 0 additions & 2 deletions protocols/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,7 @@ func (s *Executor) Unregister(peer sessions.NodeID) error {
if !has {
panic("unregistering an unregistered node")
}

s.DisconnectedNode(peer)

delete(s.connectedNodes, peer)
s.connectedNodesMu.Unlock()

Expand Down
9 changes: 2 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (hsv *HeliumServer) Run(ctx context.Context, app node.App, ip compute.Input
func (hsv *HeliumServer) AppendEventToLog(event node.Event) {
hsv.mu.Lock()
hsv.events = append(hsv.events, event)

for nodeID, node := range hsv.nodes {
if node.sendQueue != nil {
select {
Expand Down Expand Up @@ -240,16 +239,10 @@ func (hsv *HeliumServer) Register(_ *pb.Void, stream pb.Helium_RegisterServer) e
// stream was terminated by the node or the server
case <-cancelled:
done = true
hsv.mu.Lock()
close(peer.sendQueue)
hsv.mu.Unlock()
hsv.Logf("stream context done for %s, err = %s", nodeID, stream.Context().Err())

// the transport is closing
case <-hsv.closing:
hsv.mu.Lock()
close(peer.sendQueue)
hsv.mu.Unlock()
hsv.Logf("transport closing, closing queue for %s", nodeID)
}
}
Expand All @@ -258,10 +251,12 @@ func (hsv *HeliumServer) Register(_ *pb.Void, stream pb.Helium_RegisterServer) e
peer.sendQueue = nil
hsv.mu.Unlock()

hsv.Logf("unregistering %s...", nodeID)
err = hsv.helperNode.Unregister(nodeID)
if err != nil {
panic(err)
}
hsv.Logf("unregistered %s", nodeID)

return nil
}
Expand Down

0 comments on commit 7014baa

Please sign in to comment.