From 101d6b7fb0934312723c1aceb627fc3a657ff323 Mon Sep 17 00:00:00 2001 From: Christian Mouchet Date: Mon, 22 Apr 2024 17:54:25 +0200 Subject: [PATCH] fix blocking share rcv at server --- protocols/executor.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/protocols/executor.go b/protocols/executor.go index be7c387..c55b64c 100644 --- a/protocols/executor.go +++ b/protocols/executor.go @@ -211,13 +211,14 @@ func (s *Executor) Run(ctx context.Context, trans Transport) error { // TODO: ca s.runningProtoMu.RLock() proto, protoExists := s.runningProtos[incShare.ProtocolID] - s.runningProtoMu.RUnlock() if !protoExists { err := fmt.Errorf("invalide incoming share from sender %s: protocol %s is not running", incShare.From, incShare.ProtocolID) s.Logf("error recieving share: %s", err) + s.runningProtoMu.RUnlock() continue } proto.incoming <- incShare + s.runningProtoMu.RUnlock() case <-doneWithShareTransport: s.Logf("is done with share transport") return nil @@ -334,7 +335,7 @@ func (s *Executor) runAsAggregator(ctx context.Context, sess *sessions.Session, var disconnected chan sessions.NodeID s.runningProtoMu.Lock() s.connectedNodesMu.RLock() - incoming := make(chan Share) + incoming := make(chan Share, len(pd.Participants)) disconnected = make(chan sessions.NodeID, len(pd.Participants)) s.runningProtos[pid] = struct { pd Descriptor @@ -420,8 +421,8 @@ func (s *Executor) runAsAggregator(ctx context.Context, sess *sessions.Session, aggOut.Error = fmt.Errorf("node %s disconnected before providing its share", participantID) } } - cancelAgg() clearProtocol() + cancelAgg() if aggOut.Error != nil { s.upstream.Outgoing <- Event{EventType: Failed, Descriptor: pd}