-
Notifications
You must be signed in to change notification settings - Fork 10
/
dir-build-channel-subscription.go
143 lines (123 loc) · 4.7 KB
/
dir-build-channel-subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package pubsub
import (
"context"
"errors"
"time"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/controllerbus/directive"
"github.com/libp2p/go-libp2p/core/crypto"
)
// BuildChannelSubscription is a directive to subscribe to a channel.
//
// In addition to the embedded directive.Directive methods, it exposes the two
// inputs of the subscription: the channel ID and the private key.
type BuildChannelSubscription interface {
// Directive indicates BuildChannelSubscription is a directive.
directive.Directive
// BuildChannelSubscriptionChannelID returns the channel ID constraint.
// Cannot be empty.
BuildChannelSubscriptionChannelID() string
// BuildChannelSubscriptionPrivKey returns the private key to use to subscribe.
// Cannot be empty (nil).
BuildChannelSubscriptionPrivKey() crypto.PrivKey
}
// BuildChannelSubscriptionValue is the result type for BuildChannelSubscription.
// It is declared as an alias of Subscription, so the two names denote the
// same type.
// The value is removed and replaced when necessary.
type BuildChannelSubscriptionValue = Subscription
// buildChannelSubscription implements BuildChannelSubscription.
type buildChannelSubscription struct {
// channelID is the ID of the channel to subscribe to.
// Validate rejects an empty string.
channelID string
// privKey is the private key to use to subscribe.
// Validate rejects a nil value.
privKey crypto.PrivKey
}
// NewBuildChannelSubscription constructs a new BuildChannelSubscription directive.
//
// channelID and privKey are stored as given; nothing is checked at
// construction time.
func NewBuildChannelSubscription(channelID string, privKey crypto.PrivKey) BuildChannelSubscription {
	dir := new(buildChannelSubscription)
	dir.channelID = channelID
	dir.privKey = privKey
	return dir
}
// ExBuildChannelSubscription executes the BuildChannelSubscription directive.
// Waits for the channel subscription to be built.
//
// A directive is built from channelID and privKey and passed to
// bus.ExecWaitValue along with ctx, b, returnIfIdle and valDisposeCallback.
// The last argument of bus.ExecWaitValue is left nil.
//
// The four results (value, directive instance, directive reference, error)
// are returned exactly as bus.ExecWaitValue produced them.
func ExBuildChannelSubscription(
	ctx context.Context,
	b bus.Bus,
	returnIfIdle bool,
	channelID string,
	privKey crypto.PrivKey,
	valDisposeCallback func(),
) (BuildChannelSubscriptionValue, directive.Instance, directive.Reference, error) {
	dir := NewBuildChannelSubscription(channelID, privKey)
	return bus.ExecWaitValue[BuildChannelSubscriptionValue](ctx, b, dir, returnIfIdle, valDisposeCallback, nil)
}
// Validate validates the directive.
// This is a cursory validation to see if the values "look correct."
//
// An error is returned when the channel ID is the empty string or when the
// private key is nil; the channel ID is checked first. Otherwise nil is
// returned.
func (d *buildChannelSubscription) Validate() error {
	switch {
	case d.channelID == "":
		return errors.New("channel id cannot be empty")
	case d.privKey == nil:
		return errors.New("priv key cannot be empty")
	default:
		return nil
	}
}
// GetValueOptions returns options relating to value handling.
//
// Only UnrefDisposeDur is set (to three seconds); every other option keeps
// its zero value.
func (d *buildChannelSubscription) GetValueOptions() directive.ValueOptions {
	var opts directive.ValueOptions
	// UnrefDisposeDur is the duration to wait to dispose a directive after all
	// references have been released.
	opts.UnrefDisposeDur = 3 * time.Second
	return opts
}
// BuildChannelSubscriptionChannelID returns the channel ID constraint.
// This is the channelID the directive was constructed with.
func (d *buildChannelSubscription) BuildChannelSubscriptionChannelID() string {
return d.channelID
}
// BuildChannelSubscriptionPrivKey returns the private key to use to subscribe.
// This is the privKey the directive was constructed with.
func (d *buildChannelSubscription) BuildChannelSubscriptionPrivKey() crypto.PrivKey {
return d.privKey
}
// IsEquivalent checks if the other directive is equivalent. If two
// directives are equivalent, and the new directive does not supersede the
// old, then the new directive will be merged (de-duplicated) into the old.
//
// This implementation always returns false and never inspects other.
func (d *buildChannelSubscription) IsEquivalent(other directive.Directive) bool {
// Note: for ChannelSubscription, we want a unique handle for each directive.
return false
/* If we wanted to de-duplicate (can't due to Release()):
od, ok := other.(BuildChannelSubscription)
if !ok {
return false
}
return d.BuildChannelSubscriptionChannelID() == od.BuildChannelSubscriptionChannelID() &&
od.BuildChannelSubscriptionPrivKey().GetPublic().Equals(d.BuildChannelSubscriptionPrivKey().GetPublic())
*/
}
// Superceeds checks if the directive overrides (supersedes) another.
// The other directive will be canceled if superseded.
//
// This implementation always returns false and never inspects other.
func (d *buildChannelSubscription) Superceeds(other directive.Directive) bool {
return false
}
// GetName returns the directive's type name.
// This is not necessarily unique, and is primarily intended for display.
// The returned name is the constant "BuildChannelSubscription".
func (d *buildChannelSubscription) GetName() string {
return "BuildChannelSubscription"
}
// GetDebugVals returns the directive arguments as debug values.
// This is not necessarily unique, and is primarily intended for display.
//
// The result always contains "channel-id". "peer-id" is added only when a
// private key is set and a non-empty peer ID could be derived from it.
func (d *buildChannelSubscription) GetDebugVals() directive.DebugValues {
	out := directive.DebugValues{
		"channel-id": []string{d.BuildChannelSubscriptionChannelID()},
	}

	privKey := d.BuildChannelSubscriptionPrivKey()
	if privKey == nil {
		return out
	}

	// Best effort: the derivation error is discarded and an empty peer ID is
	// simply left out of the debug values.
	if id, _ := peer.IDFromPrivateKey(privKey); len(id) != 0 {
		out["peer-id"] = []string{id.Pretty()}
	}
	return out
}
// _ is a compile-time assertion that *buildChannelSubscription implements
// BuildChannelSubscription.
var _ BuildChannelSubscription = (*buildChannelSubscription)(nil)