Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(selector): add node scheme #1932

Merged
merged 3 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 9 additions & 2 deletions selector/default_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@ import (

// DefaultNode is selector node
type DefaultNode struct {
scheme string
addr string
weight *int64
version string
name string
metadata map[string]string
}

// Scheme is node scheme
func (n *DefaultNode) Scheme() string {
return n.scheme
}

// Address is node address
func (n *DefaultNode) Address() string {
return n.addr
Expand All @@ -41,9 +47,10 @@ func (n *DefaultNode) Metadata() map[string]string {
}

// NewNode new node
func NewNode(addr string, ins *registry.ServiceInstance) Node {
func NewNode(scheme, addr string, ins *registry.ServiceInstance) Node {
n := &DefaultNode{
addr: addr,
scheme: scheme,
addr: addr,
}
if ins != nil {
n.name = ins.Name
Expand Down
2 changes: 2 additions & 0 deletions selector/filter/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func TestVersion(t *testing.T) {
f := Version("v2.0.0")
var nodes []selector.Node
nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand All @@ -22,6 +23,7 @@ func TestVersion(t *testing.T) {
}))

nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.2:9090",
&registry.ServiceInstance{
ID: "127.0.0.2:9090",
Expand Down
2 changes: 2 additions & 0 deletions selector/node/direct/direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
func TestDirect(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down Expand Up @@ -42,6 +43,7 @@ func TestDirect(t *testing.T) {
func TestDirectDefaultWeight(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down
3 changes: 3 additions & 0 deletions selector/node/ewma/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
func TestDirect(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down Expand Up @@ -53,6 +54,7 @@ func TestDirect(t *testing.T) {
func TestDirectError(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down Expand Up @@ -89,6 +91,7 @@ func TestDirectErrorHandler(t *testing.T) {
},
}
wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down
2 changes: 2 additions & 0 deletions selector/p2c/p2c_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestWrr3(t *testing.T) {
for i := 0; i < 3; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode(
"http",
addr,
&registry.ServiceInstance{
ID: addr,
Expand Down Expand Up @@ -96,6 +97,7 @@ func TestOne(t *testing.T) {
for i := 0; i < 1; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode(
"http",
addr,
&registry.ServiceInstance{
ID: addr,
Expand Down
2 changes: 2 additions & 0 deletions selector/random/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ func TestWrr(t *testing.T) {
random := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node
nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:8080",
&registry.ServiceInstance{
ID: "127.0.0.1:8080",
Version: "v2.0.0",
Metadata: map[string]string{"weight": "10"},
}))
nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down
3 changes: 3 additions & 0 deletions selector/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type Builder interface {

// Node is node interface.
type Node interface {
// Scheme is service node scheme
Scheme() string

// Address is the unique address under the same service
Address() string

Expand Down
2 changes: 2 additions & 0 deletions selector/selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestDefault(t *testing.T) {
selector := builder.Build()
var nodes []Node
nodes = append(nodes, NewNode(
"http",
"127.0.0.1:8080",
&registry.ServiceInstance{
ID: "127.0.0.1:8080",
Expand All @@ -98,6 +99,7 @@ func TestDefault(t *testing.T) {
Metadata: map[string]string{"weight": "10"},
}))
nodes = append(nodes, NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down
2 changes: 2 additions & 0 deletions selector/wrr/wrr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ func TestWrr(t *testing.T) {
wrr := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node
nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:8080",
&registry.ServiceInstance{
ID: "127.0.0.1:8080",
Version: "v2.0.0",
Metadata: map[string]string{"weight": "10"},
}))
nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Expand Down
2 changes: 1 addition & 1 deletion transport/grpc/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
for conn, info := range info.ReadySCs {
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
nodes = append(nodes, &grpcNode{
Node: selector.NewNode(info.Address.Addr, ins),
Node: selector.NewNode("grpc", info.Address.Addr, ins),
subConn: conn,
})
}
Expand Down
2 changes: 1 addition & 1 deletion transport/http/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (r *resolver) update(services []*registry.ServiceInstance) bool {
if ept == "" {
continue
}
nodes = append(nodes, selector.NewNode(ept, ins))
nodes = append(nodes, selector.NewNode("http", ept, ins))
}
if len(nodes) == 0 {
r.logger.Warnf("[http resolver]Zero endpoint found,refused to write,set: %s ins: %v", r.target.Endpoint, nodes)
Expand Down