-
Notifications
You must be signed in to change notification settings - Fork 29
/
connection.go
81 lines (62 loc) · 2.07 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package server
import (
"context"
"fmt"
"strings"
"github.com/choria-io/go-choria/srvcache"
)
// initialConnect creates the middleware connector for this server instance
// and stores it in srv.connector.
//
// The connector is not given a fixed broker list. It receives a resolver
// function that looks the brokers up through brokerUrls and logs the result
// every time it is called.
//
// An error is returned when ctx is already done on entry or when the
// framework fails to create the connector; the latter wraps the underlying
// error so callers can inspect it with errors.Is / errors.As.
func (srv *Instance) initialConnect(ctx context.Context) (err error) {
	if ctx.Err() != nil {
		return fmt.Errorf("exiting on shut down")
	}

	// resolve is handed to the connector; it uses its own error variable so
	// the named result of initialConnect is never shadowed.
	resolve := func() (srvcache.Servers, error) {
		found, lookupErr := srv.brokerUrls(ctx)
		if lookupErr != nil {
			return nil, fmt.Errorf("could not find Choria Network Brokers: %w", lookupErr)
		}

		srv.log.Infof("Choria Network Brokers: %#v", strings.Join(found.Strings(), ", "))

		return found, nil
	}

	srv.connector, err = srv.fw.NewConnector(ctx, resolve, srv.fw.Certname(), srv.log)
	if err != nil {
		return fmt.Errorf("could not create connector: %w", err)
	}

	return nil
}
// brokerUrls determines which brokers the server should connect to.
//
// When the framework reports provision mode, the provisioning servers are
// looked up first. If that lookup yields at least one server those servers
// are returned and srv.provisioning is set (under srv.mu) the first time this
// happens. A failed or empty provisioning lookup is logged (on failure) and
// then falls through to the regular middleware servers.
//
// Outside provision mode the result of srv.fw.MiddlewareServers is returned
// unchanged, including its error.
func (srv *Instance) brokerUrls(ctx context.Context) (servers srvcache.Servers, err error) {
	if srv.fw.ProvisionMode() {
		servers, err = srv.fw.ProvisioningServers(ctx)
		if err != nil {
			srv.log.Errorf("Could not determine provisioning broker urls while provisioning is configured: %s", err)
		}

		// provisioning is like a flat network with no federation, so this
		// disables federation while provisioning; after provisioning the
		// reload restores the configured federation setup and everything
		// continues as normal.
		//
		// The nil check guards against a failed lookup handing back no
		// server collection at all, which must be treated like an empty one
		// instead of being dereferenced.
		if servers != nil && servers.Count() > 0 {
			srv.mu.Lock()
			if !srv.provisioning {
				srv.log.Infof("Entering provision mode with servers %v", strings.Join(servers.Strings(), ", "))
				srv.provisioning = true
			}
			srv.mu.Unlock()

			return servers, nil
		}
	}

	return srv.fw.MiddlewareServers()
}
// subscribeNode subscribes this node to its node directed target in every
// collective listed in srv.cfg.Collectives, passing srv.requests to the
// connector as the destination for the subscription.
//
// Each subscription is named "node.<collective>". The first failing
// subscription stops the loop and its error is returned wrapped, so callers
// can inspect the cause with errors.Is / errors.As; subscriptions already
// made in earlier iterations are left in place.
func (srv *Instance) subscribeNode(ctx context.Context) error {
	for _, collective := range srv.cfg.Collectives {
		target := srv.connector.NodeDirectedTarget(collective, srv.cfg.Identity)

		srv.log.Infof("Subscribing node %s to %s", srv.cfg.Identity, target)

		// NOTE(review): the empty fourth argument is presumably the queue
		// group name — confirm against the connector's QueueSubscribe.
		if err := srv.connector.QueueSubscribe(ctx, fmt.Sprintf("node.%s", collective), target, "", srv.requests); err != nil {
			return fmt.Errorf("could not subscribe to node directed targets: %w", err)
		}
	}

	return nil
}