From 7014baa964ad5b7c92007a560a4ce945c6a52bd5 Mon Sep 17 00:00:00 2001 From: Christian Mouchet Date: Mon, 22 Apr 2024 18:36:59 +0200 Subject: [PATCH] changed stream closing sequence and added some debug output for registering --- protocols/executor.go | 2 -- server.go | 9 ++------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/protocols/executor.go b/protocols/executor.go index c55b64c..2c8e0a9 100644 --- a/protocols/executor.go +++ b/protocols/executor.go @@ -590,9 +590,7 @@ func (s *Executor) Unregister(peer sessions.NodeID) error { if !has { panic("unregistering an unregistered node") } - s.DisconnectedNode(peer) - delete(s.connectedNodes, peer) s.connectedNodesMu.Unlock() diff --git a/server.go b/server.go index 5efa84b..48c7222 100644 --- a/server.go +++ b/server.go @@ -146,7 +146,6 @@ func (hsv *HeliumServer) Run(ctx context.Context, app node.App, ip compute.Input func (hsv *HeliumServer) AppendEventToLog(event node.Event) { hsv.mu.Lock() hsv.events = append(hsv.events, event) - for nodeID, node := range hsv.nodes { if node.sendQueue != nil { select { @@ -240,16 +239,10 @@ func (hsv *HeliumServer) Register(_ *pb.Void, stream pb.Helium_RegisterServer) e // stream was terminated by the node or the server case <-cancelled: done = true - hsv.mu.Lock() - close(peer.sendQueue) - hsv.mu.Unlock() hsv.Logf("stream context done for %s, err = %s", nodeID, stream.Context().Err()) // the transport is closing case <-hsv.closing: - hsv.mu.Lock() - close(peer.sendQueue) - hsv.mu.Unlock() hsv.Logf("transport closing, closing queue for %s", nodeID) } } @@ -258,10 +251,12 @@ func (hsv *HeliumServer) Register(_ *pb.Void, stream pb.Helium_RegisterServer) e peer.sendQueue = nil hsv.mu.Unlock() + hsv.Logf("unregistering %s...", nodeID) err = hsv.helperNode.Unregister(nodeID) if err != nil { panic(err) } + hsv.Logf("unregistered %s", nodeID) return nil }