-
Notifications
You must be signed in to change notification settings - Fork 724
/
control.go
128 lines (109 loc) · 3.96 KB
/
control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package connection
import (
"context"
"io"
"net"
"time"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/management"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
// RPCClientFunc derives a named tunnel rpc client that can then be used to register and unregister connections.
type RPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
type controlStream struct {
observer *Observer
connectedFuse ConnectedFuse
namedTunnelProperties *NamedTunnelProperties
connIndex uint8
edgeAddress net.IP
protocol Protocol
newRPCClientFunc RPCClientFunc
gracefulShutdownC <-chan struct{}
gracePeriod time.Duration
stoppedGracefully bool
}
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
type ControlStreamHandler interface {
// ServeControlStream handles the control plane of the transport in the current goroutine calling this
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error
// IsStopped tells whether the method above has finished
IsStopped() bool
}
type TunnelConfigJSONGetter interface {
GetConfigJSON() ([]byte, error)
}
// NewControlStream returns a new instance of ControlStreamHandler
func NewControlStream(
observer *Observer,
connectedFuse ConnectedFuse,
namedTunnelConfig *NamedTunnelProperties,
connIndex uint8,
edgeAddress net.IP,
newRPCClientFunc RPCClientFunc,
gracefulShutdownC <-chan struct{},
gracePeriod time.Duration,
protocol Protocol,
) ControlStreamHandler {
if newRPCClientFunc == nil {
newRPCClientFunc = newRegistrationRPCClient
}
return &controlStream{
observer: observer,
connectedFuse: connectedFuse,
namedTunnelProperties: namedTunnelConfig,
newRPCClientFunc: newRPCClientFunc,
connIndex: connIndex,
edgeAddress: edgeAddress,
gracefulShutdownC: gracefulShutdownC,
gracePeriod: gracePeriod,
protocol: protocol,
}
}
func (c *controlStream) ServeControlStream(
ctx context.Context,
rw io.ReadWriteCloser,
connOptions *tunnelpogs.ConnectionOptions,
tunnelConfigGetter TunnelConfigJSONGetter,
) error {
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
registrationDetails, err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.edgeAddress, c.observer)
if err != nil {
rpcClient.Close()
return err
}
c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol)
c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location)
c.connectedFuse.Connected()
// if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration
if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged {
if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil {
if err := rpcClient.SendLocalConfiguration(ctx, tunnelConfig, c.observer); err != nil {
c.observer.log.Err(err).Msg("unable to send local configuration")
}
} else {
c.observer.log.Err(err).Msg("failed to obtain current configuration")
}
}
c.waitForUnregister(ctx, rpcClient)
return nil
}
func (c *controlStream) waitForUnregister(ctx context.Context, rpcClient NamedTunnelRPCClient) {
// wait for connection termination or start of graceful shutdown
defer rpcClient.Close()
select {
case <-ctx.Done():
break
case <-c.gracefulShutdownC:
c.stoppedGracefully = true
}
c.observer.sendUnregisteringEvent(c.connIndex)
rpcClient.GracefulShutdown(ctx, c.gracePeriod)
c.observer.log.Info().
Int(management.EventTypeKey, int(management.Cloudflared)).
Uint8(LogFieldConnIndex, c.connIndex).
IPAddr(LogFieldIPAddress, c.edgeAddress).
Msg("Unregistered tunnel connection")
}
func (c *controlStream) IsStopped() bool {
return c.stoppedGracefully
}