Skip to content

Commit

Permalink
fix blocking share rcv at server
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristianMct committed Apr 22, 2024
1 parent 2a90505 commit 101d6b7
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions protocols/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,14 @@ func (s *Executor) Run(ctx context.Context, trans Transport) error { // TODO: ca

s.runningProtoMu.RLock()
proto, protoExists := s.runningProtos[incShare.ProtocolID]
s.runningProtoMu.RUnlock()
if !protoExists {
err := fmt.Errorf("invalide incoming share from sender %s: protocol %s is not running", incShare.From, incShare.ProtocolID)
s.Logf("error recieving share: %s", err)
s.runningProtoMu.RUnlock()
continue
}
proto.incoming <- incShare
s.runningProtoMu.RUnlock()
case <-doneWithShareTransport:
s.Logf("is done with share transport")
return nil
Expand Down Expand Up @@ -334,7 +335,7 @@ func (s *Executor) runAsAggregator(ctx context.Context, sess *sessions.Session,
var disconnected chan sessions.NodeID
s.runningProtoMu.Lock()
s.connectedNodesMu.RLock()
incoming := make(chan Share)
incoming := make(chan Share, len(pd.Participants))
disconnected = make(chan sessions.NodeID, len(pd.Participants))
s.runningProtos[pid] = struct {
pd Descriptor
Expand Down Expand Up @@ -420,8 +421,8 @@ func (s *Executor) runAsAggregator(ctx context.Context, sess *sessions.Session,
aggOut.Error = fmt.Errorf("node %s disconnected before providing its share", participantID)
}
}
cancelAgg()
clearProtocol()
cancelAgg()

if aggOut.Error != nil {
s.upstream.Outgoing <- Event{EventType: Failed, Descriptor: pd}
Expand Down

0 comments on commit 101d6b7

Please sign in to comment.