Skip to content

Commit

Permalink
not sending pd to completed circuits
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristianMct committed Apr 22, 2024
1 parent 22182db commit cc9c8b1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
10 changes: 10 additions & 0 deletions circuits/circuits.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,13 @@ func (t EventType) String() string {
func (u Event) String() string {
return fmt.Sprintf("%s: %s", u.EventType, u.Descriptor)
}

// IDFromProtocolDescriptor returns the circuit ID from a protocol descriptor. // TODO cleaner way than op label ?
func IDFromProtocolDescriptor(pd protocols.Descriptor) sessions.CircuitID {
opls, has := pd.Signature.Args["op"]
if !has {
panic("no op argument in circuit protocol event")
}
opl := OperandLabel(opls)
return opl.CircuitID()
}
27 changes: 18 additions & 9 deletions services/compute/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ChristianMct/helium/protocols"
"github.com/ChristianMct/helium/services"
"github.com/ChristianMct/helium/sessions"
"github.com/ChristianMct/helium/utils"
"github.com/tuneinsight/lattigo/v5/core/rlwe"
"github.com/tuneinsight/lattigo/v5/schemes/bgv"
"github.com/tuneinsight/lattigo/v5/schemes/ckks"
Expand Down Expand Up @@ -299,7 +300,9 @@ func (s *Service) init(ctx context.Context, upstreamInc <-chan Event, present in

// stacks the completed circuit in a queue for processing by Run
s.completedCircuits = make(chan circuits.Descriptor, len(complCd))
completed := utils.NewEmptySet[sessions.CircuitID]()
for _, ccd := range complCd {
completed.Add(ccd.CircuitID)
s.completedCircuits <- ccd
}

Expand All @@ -311,13 +314,18 @@ func (s *Service) init(ctx context.Context, upstreamInc <-chan Event, present in
s.queuedCircuits <- rcd
}

// sends the completed
// sends the completed pd to the running circuits
for _, cpd := range complPd {

if !cpd.Signature.Type.IsCompute() {
continue
}

cid := circuits.IDFromProtocolDescriptor(cpd)
if completed.Contains(cid) {
continue // TODO: this would not work for an offline reciever reconnecting: it needs the pd for finalizing dec.
}

if err := s.sendCompletedPdToCircuit(cpd); err != nil {
return err
}
Expand Down Expand Up @@ -519,20 +527,21 @@ func (s *Service) Run(ctx context.Context, ip InputProvider, or OutputReceiver,
return nil
}

func (s *Service) sendCompletedPdToCircuit(pd protocols.Descriptor) error {
opls, has := pd.Signature.Args["op"]
if !has {
panic("no op argument in circuit protocol event")
}
type CircuitNotRunningError struct { // TODO: use more generally
CircuitID sessions.CircuitID
}

opl := circuits.OperandLabel(opls)
cid := opl.CircuitID()
func (e CircuitNotRunningError) Error() string {
return fmt.Sprintf("circuit %s is not running", e.CircuitID)
}

func (s *Service) sendCompletedPdToCircuit(pd protocols.Descriptor) error {
cid := circuits.IDFromProtocolDescriptor(pd)
s.runningCircuitsMu.RLock()
c, has := s.runningCircuits[cid]
s.runningCircuitsMu.RUnlock()
if !has {
panic(fmt.Errorf("circuit is not running: %s", cid))
return &CircuitNotRunningError{CircuitID: cid}
}

return c.CompletedProtocol(pd)
Expand Down

0 comments on commit cc9c8b1

Please sign in to comment.