/
res-signal-peer.go
89 lines (76 loc) · 2.36 KB
/
res-signal-peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package signaling_rpc_client
import (
"context"
"github.com/aperturerobotics/bifrost/signaling"
"github.com/aperturerobotics/controllerbus/directive"
)
// signalPeerResolver resolves SignalPeer.
type signalPeerResolver struct {
c *Controller
dir signaling.SignalPeer
}
// resolveSignalPeer resolves the SignalPeer directive.
func (c *Controller) resolveSignalPeer(
ctx context.Context,
di directive.Instance,
dir signaling.SignalPeer,
) ([]directive.Resolver, error) {
// Check if the directive matches this client.
if signalingID := dir.SignalingID(); signalingID != "" {
if c.conf.GetSignalingId() != signalingID {
return nil, nil
}
}
if localPeerID := dir.SignalLocalPeerID(); localPeerID != "" {
// Check if we know the local peer id yet.
// We might not yet know if client is nil.
// Return a resolver anyway in this case.
if client := c.client.GetValue(); client != nil {
if client.peerID.String() != localPeerID.String() {
return nil, nil
}
}
}
// Return the resolver.
return directive.R(&signalPeerResolver{
c: c,
dir: dir,
}, nil)
}
// Resolve resolves the values, emitting them to the handler.
// The resolver may be canceled and restarted multiple times.
// Any fatal error resolving the value is returned.
// The resolver will not be retried after returning an error.
// Values will be maintained from the previous call.
func (r *signalPeerResolver) Resolve(ctx context.Context, handler directive.ResolverHandler) error {
// Wait for the client to be ready.
handler.ClearValues()
client, err := r.c.client.WaitValue(ctx, nil)
if err != nil {
return err
}
if localPeerID := r.dir.SignalLocalPeerID(); localPeerID != "" {
if client.peerID.String() != localPeerID.String() {
// Peer ID mismatch
return nil
}
}
// Add reference
remotePeerIDStr := r.dir.SignalRemotePeerID().String()
peerRef := client.AddPeerRef(remotePeerIDStr)
// Emit value
var val signaling.SignalPeerValue = NewSessionWithRef(peerRef)
vid, accepted := handler.AddValue(val)
if !accepted {
// We must already have a signaling channel via some other approach.
// Drop this one.
peerRef.Release()
return nil
}
// Release the ref when the value is removed or directive is released.
handler.AddValueRemovedCallback(vid, peerRef.Release)
// Done
return nil
}
// _ is a type assertion
var _ directive.Resolver = ((*signalPeerResolver)(nil))