/
gclient_discovery.go
98 lines (89 loc) · 2.6 KB
/
gclient_discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gclient
import (
"context"
"net/http"
"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/internal/intlog"
"github.com/gogf/gf/v2/net/gsel"
"github.com/gogf/gf/v2/net/gsvc"
)
type discoveryNode struct {
service gsvc.Service
address string
}
// Service is the client discovery service.
func (n *discoveryNode) Service() gsvc.Service {
return n.service
}
// Address returns the address of the node.
func (n *discoveryNode) Address() string {
return n.address
}
var clientSelectorMap = gmap.New(true)
// internalMiddlewareDiscovery is a client middleware that enables service discovery feature for client.
func internalMiddlewareDiscovery(c *Client, r *http.Request) (response *Response, err error) {
if c.discovery == nil {
return c.Next(r)
}
var (
ctx = r.Context()
service gsvc.Service
)
service, err = gsvc.GetAndWatchWithDiscovery(ctx, c.discovery, r.URL.Host, func(service gsvc.Service) {
intlog.Printf(ctx, `http client watching service "%s" changed`, service.GetPrefix())
if v := clientSelectorMap.Get(service.GetPrefix()); v != nil {
if err = updateSelectorNodesByService(ctx, v.(gsel.Selector), service); err != nil {
intlog.Errorf(ctx, `%+v`, err)
}
}
})
if err != nil {
return nil, err
}
if service == nil {
return c.Next(r)
}
// Balancer.
var (
selectorMapKey = service.GetPrefix()
selectorMapValue = clientSelectorMap.GetOrSetFuncLock(selectorMapKey, func() interface{} {
intlog.Printf(ctx, `http client create selector for service "%s"`, selectorMapKey)
selector := c.builder.Build()
// Update selector nodes.
if err = updateSelectorNodesByService(ctx, selector, service); err != nil {
return nil
}
return selector
})
)
if err != nil {
return nil, err
}
selector := selectorMapValue.(gsel.Selector)
// Pick one node from multiple addresses.
node, done, err := selector.Pick(ctx)
if err != nil {
return nil, err
}
if done != nil {
defer done(ctx, gsel.DoneInfo{})
}
r.Host = node.Address()
r.URL.Host = node.Address()
return c.Next(r)
}
func updateSelectorNodesByService(ctx context.Context, selector gsel.Selector, service gsvc.Service) error {
nodes := make(gsel.Nodes, 0)
for _, endpoint := range service.GetEndpoints() {
nodes = append(nodes, &discoveryNode{
service: service,
address: endpoint.String(),
})
}
return selector.Update(ctx, nodes)
}