forked from tencentyun/tsf-go
/
multi.go
64 lines (57 loc) · 1.85 KB
/
multi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package multi
import (
"context"
"fmt"
"sync"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/http/balancer"
"github.com/openzipkin/zipkin-go"
tBalancer "github.com/bjxujiang/tsf-go/balancer"
"github.com/bjxujiang/tsf-go/log"
"github.com/bjxujiang/tsf-go/naming"
"github.com/bjxujiang/tsf-go/pkg/meta"
"github.com/bjxujiang/tsf-go/route"
)
// Balancer picks a service instance in two stages: the router first narrows
// the known nodes, then the wrapped balancer selects one of the survivors.
type Balancer struct {
	// r narrows the candidate nodes (routing & lane rules).
	r route.Router
	// b makes the final choice among the nodes that r kept.
	b tBalancer.Balancer
	// lock guards nodes.
	lock sync.RWMutex
	// nodes is the current instance list; Update replaces it and Pick reads it.
	nodes []naming.Instance
}
// New returns a Balancer that filters nodes through router and then delegates
// the final selection to b. The node list starts empty until Update is called.
func New(router route.Router, b tBalancer.Balancer) *Balancer {
	multi := new(Balancer)
	multi.r = router
	multi.b = b
	return multi
}
// Pick chooses one instance for the outgoing request described by ctx.
//
// It snapshots the node list stored by Update, narrows it with the router's
// Select, and hands the remaining candidates to the wrapped balancer. When a
// zipkin span is present in ctx, the chosen instance is recorded as the span's
// remote endpoint.
//
// The returned done callback is a no-op. An error is returned when the
// destination service name or namespace is missing from ctx (or is not a
// string), when no nodes are known, or when the router filters out every node.
func (b *Balancer) Pick(ctx context.Context) (node *registry.ServiceInstance, done func(context.Context, balancer.DoneInfo), err error) {
	// Take a snapshot under the read lock so routing runs without holding it.
	b.lock.RLock()
	nodes := b.nodes
	b.lock.RUnlock()

	// Comma-ok assertions: a missing or non-string value must surface as an
	// error to the caller instead of panicking on the request path.
	name, okName := meta.Sys(ctx, meta.DestKey(meta.ServiceName)).(string)
	namespace, okNamespace := meta.Sys(ctx, meta.DestKey(meta.ServiceNamespace)).(string)
	if !okName || !okNamespace {
		log.DefaultLog.Errorf("picker: destination service name or namespace missing from context")
		return nil, nil, fmt.Errorf("destination service name or namespace missing from context")
	}
	// NOTE(review): arguments are passed as (service name, namespace); confirm
	// this matches naming.NewService's parameter order.
	svc := naming.NewService(name, namespace)

	if len(nodes) == 0 {
		log.DefaultLog.Errorf("picker: ErrNoSubConnAvailable! %s", svc.Name)
		return nil, nil, fmt.Errorf("no instances avaiable")
	}
	log.DefaultLog.Debugw("msg", "picker pick", "svc", svc, "nodes", nodes)

	filters := b.r.Select(ctx, *svc, nodes)
	if len(filters) == 0 {
		log.DefaultLog.Errorf("picker: ErrNoSubConnAvailable after route filter! %s", svc.Name)
		return nil, nil, fmt.Errorf("no instances avaiable")
	}

	// NOTE(review): the second result of the wrapped balancer's Pick is
	// discarded; if it is a completion callback it is never invoked — confirm
	// this is intended.
	ins, _ := b.b.Pick(ctx, filters)

	span := zipkin.SpanFromContext(ctx)
	if span != nil {
		// Tracing is best-effort: an endpoint construction error is ignored so
		// that it cannot fail the pick itself.
		ep, _ := zipkin.NewEndpoint(ins.Service.Name, ins.Addr())
		span.SetRemoteEndpoint(ep)
	}
	return ins.ToKratosInstance(), func(context.Context, balancer.DoneInfo) {}, nil
}
// Update replaces the node list that Pick selects from.
//
// Each registry entry is converted with naming.FromKratosInstance and only the
// first converted instance is kept. Nil entries, and entries whose conversion
// yields nothing, are skipped instead of causing an index-out-of-range or nil
// dereference panic.
func (b *Balancer) Update(nodes []*registry.ServiceInstance) {
	// Convert before taking the write lock so concurrent Pick calls are only
	// blocked for the duration of the slice swap.
	inss := make([]naming.Instance, 0, len(nodes))
	for _, node := range nodes {
		if node == nil {
			continue
		}
		converted := naming.FromKratosInstance(node)
		if len(converted) == 0 || converted[0] == nil {
			continue
		}
		inss = append(inss, *converted[0])
	}

	b.lock.Lock()
	b.nodes = inss
	b.lock.Unlock()
}