-
Notifications
You must be signed in to change notification settings - Fork 10
/
forwarding.go
120 lines (107 loc) · 3.18 KB
/
forwarding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package stream_forwarding
import (
"context"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/controllerbus/controller"
"github.com/aperturerobotics/controllerbus/directive"
ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
)
// Controller implements the forwarding controller. It handles
// HandleMountedStream directives by resolving them with a dial resolver for
// a configured target multiaddress.
type Controller struct {
// le is the logger; it is used for debug output and handed to the dial
// resolver.
le *logrus.Entry
// bus is the controller bus; it is handed to the dial resolver.
bus bus.Bus
// conf is the controller configuration.
conf *Config
// dialMa is the target multiaddress to dial, parsed from conf at
// construction time.
dialMa ma.Multiaddr
// localPeerID is the peer ID filter parsed from conf. When it is non-empty,
// only streams whose local peer ID equals it are handled.
localPeerID peer.ID
}
// NewController builds a forwarding controller from the given logger, bus
// and configuration.
//
// The target multiaddress and the peer ID are parsed from conf up front. An
// error from conf.ParseTargetMultiaddr or conf.ParsePeerID is returned
// unchanged, together with a nil controller.
func NewController(
	le *logrus.Entry,
	bus bus.Bus,
	conf *Config,
) (*Controller, error) {
	targetAddr, err := conf.ParseTargetMultiaddr()
	if err != nil {
		return nil, err
	}

	filterPeerID, err := conf.ParsePeerID()
	if err != nil {
		return nil, err
	}

	ctrl := &Controller{le: le, bus: bus, conf: conf}
	ctrl.dialMa = targetAddr
	ctrl.localPeerID = filterPeerID
	return ctrl, nil
}
// GetControllerInfo describes this controller: its ID, its version and a
// short human-readable description.
func (c *Controller) GetControllerInfo() *controller.Info {
	const description = "forwarding controller"
	return controller.NewInfo(ControllerID, Version, description)
}
// Execute is the controller's main routine.
// A nil return ends execution; an error return triggers a retry with backoff.
// This controller always returns nil immediately.
func (c *Controller) Execute(ctx context.Context) error {
	// Nothing to run in the background: all work happens in HandleDirective.
	return nil
}
// HandleDirective reports whether this controller can resolve the directive.
// It returns resolvers when it can and nil when it cannot.
// Unexpected errors are returned so the caller can log them.
// It is safe to add a reference to the directive during this call.
func (c *Controller) HandleDirective(ctx context.Context, di directive.Instance) ([]directive.Resolver, error) {
	// Only HandleMountedStream directives are of interest here.
	switch d := di.GetDirective().(type) {
	case link.HandleMountedStream:
		return c.resolveHandleMountedStream(ctx, di, d)
	}

	return nil, nil
}
// resolveHandleMountedStream answers a HandleMountedStream directive with a
// dial resolver for the configured target multiaddress.
//
// The directive is declined (nil, nil) when a protocol ID is configured and
// differs from the stream's protocol ID, or when a local peer ID filter is
// set and differs from the stream's local peer ID. An error from
// NewDialResolver is returned unchanged.
func (c *Controller) resolveHandleMountedStream(
	ctx context.Context,
	di directive.Instance,
	dir link.HandleMountedStream,
) ([]directive.Resolver, error) {
	// An empty configured protocol ID accepts every protocol.
	if wantProto := c.conf.GetProtocolId(); wantProto != "" {
		if gotProto := string(dir.HandleMountedStreamProtocolID()); gotProto != wantProto {
			return nil, nil
		}
	}

	// An empty peer ID filter accepts streams for any local peer.
	if wantPeer := c.localPeerID; wantPeer != peer.ID("") {
		gotPeer := dir.HandleMountedStreamLocalPeerID()
		if gotPeer != wantPeer {
			c.le.Debugf("incoming stream %s != filtered %s", gotPeer.String(), wantPeer.String())
			return nil, nil
		}
	}

	resolver, err := NewDialResolver(c.le, c.bus, c.dialMa)
	if err != nil {
		return nil, err
	}
	return directive.Resolvers(resolver), nil
}
// Close is called to release the controller's resources.
// It performs no work and always returns nil.
func (c *Controller) Close() error {
	return nil
}
// Compile-time check that *Controller satisfies controller.Controller.
var _ controller.Controller = (*Controller)(nil)