-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdiscovery.go
117 lines (108 loc) · 2.88 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package xk8s
import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/coder2z/g-saber/xlog"
	"github.com/coder2z/g-server/xregistry"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
// k8sDiscovery resolves service instances by watching Kubernetes Endpoints
// objects inside a single namespace.
type k8sDiscovery struct {
	// namespace is the Kubernetes namespace whose Endpoints are queried.
	namespace string
	// clients is the API client used for the Endpoints Watch and List calls.
	clients *kubernetes.Clientset

	// closeCh is closed by Close to tell the watch goroutines to exit.
	closeCh chan struct{}
	// closeOnce ensures closeCh is closed only once, however often Close is called.
	closeOnce sync.Once
}
// newDiscovery builds a discovery client for the given namespace using the
// in-cluster Kubernetes configuration. An empty namespace selects "default".
//
// It returns the error from loading the in-cluster configuration or from
// constructing the clientset, unmodified.
func newDiscovery(namespace string) (xregistry.Discovery, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	discovery := &k8sDiscovery{
		namespace: namespace,
		clients:   clientset,
		closeCh:   make(chan struct{}),
	}
	if discovery.namespace == "" {
		discovery.namespace = "default"
	}
	return discovery, nil
}
// Discover starts watching the Endpoints of the service named in target and
// returns an unbuffered channel on which instance lists are delivered.
//
// target has the form "service-name:port" or "service-name:port-name".
//
// It returns an error, and a nil channel, when target does not split into a
// non-empty service and port or when the watch cannot be started.
func (d *k8sDiscovery) Discover(target string) (<-chan []xregistry.Instance, error) {
	service, port := parse(target)
	if service == "" || port == "" {
		return nil, fmt.Errorf("target not valid: %s", target)
	}

	ch := make(chan []xregistry.Instance)
	// Never hand back a channel alongside an error: nothing would ever send on
	// it, so a caller that used it would simply block.
	if err := d.watch(ch, service, port); err != nil {
		return nil, err
	}
	return ch, nil
}
// parse splits a "service:port" target at its colon.
//
// Both results are empty unless target contains exactly one ':'. Either side
// of that colon may itself be empty; validating that is left to the caller.
func parse(target string) (service, port string) {
	if strings.Count(target, ":") != 1 {
		return "", ""
	}
	colon := strings.IndexByte(target, ':')
	return target[:colon], target[colon+1:]
}
// watch follows the Endpoints object named service in d.namespace and sends a
// freshly listed instance set on ch each time the watch reports an event.
//
// port is either a literal port or the name of an endpoint port; see
// listInstances for how it is resolved.
//
// An error is returned only when the initial watch cannot be opened. After
// that a background goroutine runs until Close is called. ch is never closed.
func (d *k8sDiscovery) watch(ch chan<- []xregistry.Instance, service, port string) error {
	// Pause between attempts to re-open a watch that has ended.
	const rewatchDelay = time.Second

	opts := metaV1.ListOptions{FieldSelector: fmt.Sprintf("%s=%s", "metadata.name", service)}
	watcher, err := d.clients.CoreV1().Endpoints(d.namespace).Watch(context.Background(), opts)
	if err != nil {
		return err
	}

	go func() {
		// watcher is reassigned when the watch is re-opened, so stop whichever
		// one is current when the goroutine exits.
		defer func() { watcher.Stop() }()

		for {
			select {
			case <-d.closeCh:
				return
			case _, open := <-watcher.ResultChan():
				// A closed result channel yields immediately on every receive.
				// Without re-opening the watch this loop would spin, issuing a
				// List call per iteration.
				for !open {
					next, err := d.clients.CoreV1().Endpoints(d.namespace).Watch(context.Background(), opts)
					if err != nil {
						xlog.Info("Application Running",
							xlog.FieldComponentName("XRegistry"),
							xlog.FieldMethod("XRegistry.XK8S.watch"),
							xlog.FieldDescription("K8S discovery service : re-establishing watch failed, will retry"),
							xlog.Any("service name", service),
							xlog.Any("error", err),
						)
						select {
						case <-d.closeCh:
							return
						case <-time.After(rewatchDelay):
						}
						continue
					}
					// Release the ended watcher before replacing it.
					watcher.Stop()
					watcher, open = next, true
				}

				instances, err := d.listInstances(opts, port)
				if err != nil {
					// Keep the previously published list and wait for the next event.
					xlog.Info("Application Running",
						xlog.FieldComponentName("XRegistry"),
						xlog.FieldMethod("XRegistry.XK8S.watch"),
						xlog.FieldDescription("K8S discovery service : list endpoints failed"),
						xlog.Any("service name", service),
						xlog.Any("error", err),
					)
					continue
				}
				xlog.Info("Application Running",
					xlog.FieldComponentName("XRegistry"),
					xlog.FieldMethod("XRegistry.XK8S.watch"),
					xlog.FieldDescription("K8S discovery service : update server list success"),
					xlog.Any("service name", service),
					xlog.FieldValueAny(instances),
				)

				// ch is unbuffered from Discover; do not let a consumer that
				// stopped reading keep this goroutine alive after Close.
				select {
				case ch <- instances:
				case <-d.closeCh:
					return
				}
			}
		}
	}()
	return nil
}

// listInstances lists the Endpoints selected by opts and returns one
// "ip:port" instance per entry in each subset's Addresses.
//
// Within a subset, port is replaced by the numeric port of the endpoint port
// whose name equals it; if no name matches, port is used verbatim.
func (d *k8sDiscovery) listInstances(opts metaV1.ListOptions, port string) ([]xregistry.Instance, error) {
	endpoints, err := d.clients.CoreV1().Endpoints(d.namespace).List(context.Background(), opts)
	if err != nil {
		return nil, err
	}

	var instances []xregistry.Instance
	for _, endpoint := range endpoints.Items {
		for _, subset := range endpoint.Subsets {
			realPort := port
			for _, p := range subset.Ports {
				if p.Name == port {
					realPort = fmt.Sprint(p.Port)
					break
				}
			}
			for _, addr := range subset.Addresses {
				instances = append(instances, xregistry.Instance{Address: fmt.Sprintf("%s:%s", addr.IP, realPort)})
			}
		}
	}
	return instances, nil
}
// Close logs the shutdown and closes closeCh, which signals the watch
// goroutines to exit. Calls after the first are no-ops.
func (d *k8sDiscovery) Close() {
	shutdown := func() {
		xlog.Info(
			"Application Stopping",
			xlog.FieldComponentName("XRegistry"),
			xlog.FieldMethod("XRegistry.XK8S.Close"),
			xlog.FieldDescription("Service stopping,Registration cancellation"),
		)
		close(d.closeCh)
	}
	d.closeOnce.Do(shutdown)
}