Skip to content

Commit

Permalink
[3.4] backport #12706 clientv3: PS: Replace balancer with upstream gr…
Browse files Browse the repository at this point in the history
…pc solution

Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Oct 30, 2023
1 parent 6e6753c commit 69a2c6f
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 68 deletions.
36 changes: 21 additions & 15 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,10 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
} else {
opts = append(opts, grpc.WithInsecure())
}
grpc.WithDisableRetry()

// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
// once it is available.
// TODO: Replace all of clientv3/retry.go with RetryPolicy:
// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
Expand Down Expand Up @@ -255,8 +253,8 @@ func (c *Client) getToken(ctx context.Context) error {

// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
creds := c.credentialsForEndpoint(ep)
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
creds := c.credentialsForEndpoint(c.Endpoints()[0])
opts := append(dopts, grpc.WithResolvers(c.resolver))
return c.dial(creds, opts...)
}
Expand All @@ -281,21 +279,30 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}

conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
return conn, nil
}

func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
if c.creds != nil {
r := endpoint.RequiresCredentials(ep)
switch r {
case endpoint.CREDS_DROP:
return nil
case endpoint.CREDS_OPTIONAL:
return c.creds
}
if endpoint.RequiresCredentials(ep) {
case endpoint.CREDS_REQUIRE:
if c.creds != nil {
return c.creds
}
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
panic(fmt.Errorf("Unsupported CredsRequirement: %v", r))
}
return nil
}

func newClient(cfg *Config) (*Client, error) {
Expand Down Expand Up @@ -362,18 +369,16 @@ func newClient(cfg *Config) (*Client, error) {
if len(cfg.Endpoints) < 1 {
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
}
dialEndpoint := cfg.Endpoints[0]

// Use a provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dialWithBalancer(dialEndpoint)
conn, err := client.dialWithBalancer()
if err != nil {
client.cancel()
client.resolver.Close()
// TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
client.conn = conn

client.Cluster = NewCluster(client)
Expand All @@ -392,6 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
if err != nil {
client.Close()
cancel()
//TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
return nil, err
}
cancel()
Expand Down
6 changes: 4 additions & 2 deletions clientv3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func TestDialCancel(t *testing.T) {
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)

wantError := context.DeadlineExceeded

// grpc.WithBlock to block until connection up or timeout
testCfgs := []Config{
{
Expand Down Expand Up @@ -122,8 +124,8 @@ func TestDialTimeout(t *testing.T) {
case <-time.After(5 * time.Second):
t.Errorf("#%d: failed to timeout dial on time", i)
case err := <-donec:
if err != context.DeadlineExceeded {
t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded)
if err.Error() != wantError.Error() {
t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
}
}
}
Expand Down
118 changes: 94 additions & 24 deletions clientv3/internal/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,115 @@
package endpoint

import (
"fmt"
"net"
"net/url"
"regexp"
"path"
"strings"
)

var (
STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
type CredsRequirement int

const (
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
CREDS_REQUIRE CredsRequirement = iota
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
CREDS_DROP
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
CREDS_OPTIONAL
)

func stripPort(ep string) string {
return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
func extractHostFromHostPort(ep string) string {
host, _, err := net.SplitHostPort(ep)
if err != nil {
return ep
}
return host
}

func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
url, err := url.Parse(ep)
if err != nil {
return ep, stripPort(ep), false
func extractHostFromPath(pathStr string) string {
return extractHostFromHostPort(path.Base(pathStr))
}

// mustSplit2 returns the values from strings.SplitN(s, sep, 2).
// If sep is not found, it returns ("", "", false) instead.
func mustSplit2(s, sep string) (string, string) {
spl := strings.SplitN(s, sep, 2)
if len(spl) < 2 {
panic(fmt.Errorf("Token '%v' expected to have separator sep: `%v`", s, sep))
}
switch url.Scheme {
case "http", "https":
return url.Host, url.Hostname(), url.Scheme == "https"
case "unix", "unixs":
requireCreds = url.Scheme == "unixs"
if url.Opaque != "" {
return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
} else if url.Path != "" {
return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
} else {
return "unix:" + url.Host, url.Hostname(), requireCreds
}
return spl[0], spl[1]
}

func schemeToCredsRequirement(schema string) CredsRequirement {
switch schema {
case "https", "unixs":
return CREDS_REQUIRE
case "http":
return CREDS_DROP
case "unix":
// Preserving previous behavior from:
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
// that likely was a bug due to missing 'fallthrough'.
// At the same time it seems legit to let the users decide whether they
// want credential control or not (and 'unixs' schema is not a standard thing).
return CREDS_OPTIONAL
case "":
return url.Host + url.Path, url.Host + url.Path, false
return CREDS_OPTIONAL
default:
return ep, stripPort(ep), false
return CREDS_OPTIONAL
}
}

// This function translates endpoints names supported by etcd server into
// endpoints as supported by grpc with additional information
// (server_name for cert validation, requireCreds - whether certs are needed).
// The main differences:
// - etcd supports unixs & https names as opposed to unix & http to
// distinguish need to configure certificates.
// - etcd support http(s) names as opposed to tcp supported by grpc/dial method.
// - etcd supports unix(s)://local-file naming schema
// (as opposed to unix:local-file canonical name used by grpc for current dir files).
// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon)
// is considered serverName - to allow local testing of cert-protected communication.
//
// See more:
// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47
// - https://golang.org/pkg/net/#Dial
// - https://github.com/grpc/grpc/blob/master/doc/naming.md
func translateEndpoint(ep string) (addr string, serverName string, requireCreds CredsRequirement) {
if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
// absolute path case
schema, absolutePath := mustSplit2(ep, "://")
return "unix://" + absolutePath, extractHostFromPath(absolutePath), schemeToCredsRequirement(schema)
}
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
// legacy etcd local path
schema, localPath := mustSplit2(ep, "://")
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
}
schema, localPath := mustSplit2(ep, ":")
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
}

if strings.Contains(ep, "://") {
url, err := url.Parse(ep)
if err != nil {
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
}
if url.Scheme == "http" || url.Scheme == "https" {
return url.Host, url.Hostname(), schemeToCredsRequirement(url.Scheme)
}
return ep, url.Hostname(), schemeToCredsRequirement(url.Scheme)
}
// Handles plain addresses like 10.0.0.44:437.
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
}

// RequiresCredentials returns whether given endpoint requires
// credentials/certificates for connection.
func RequiresCredentials(ep string) bool {
func RequiresCredentials(ep string) CredsRequirement {
_, _, requireCreds := translateEndpoint(ep)
return requireCreds
}
Expand Down
86 changes: 60 additions & 26 deletions clientv3/internal/endpoint/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,48 @@ import (
"testing"
)

func TestInterpret(t *testing.T) {
func Test_interpret(t *testing.T) {
tests := []struct {
endpoint string
wantAddress string
wantServerName string
endpoint string
wantAddress string
wantServerName string
wantRequiresCreds CredsRequirement
}{
{"127.0.0.1", "127.0.0.1", "127.0.0.1"},
{"localhost", "localhost", "localhost"},
{"localhost:8080", "localhost:8080", "localhost"},
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
{"localhost", "localhost", "localhost", CREDS_OPTIONAL},
{"localhost:8080", "localhost:8080", "localhost", CREDS_OPTIONAL},

{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},

{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},

{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},

{"http://127.0.0.1", "127.0.0.1", "127.0.0.1"},
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1"},
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
{"https://localhost:20000", "localhost:20000", "localhost"},
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_DROP},
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_DROP},
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
{"https://localhost:20000", "localhost:20000", "localhost", CREDS_REQUIRE},

{"unix:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
{"unixs:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
{"etcd.io", "etcd.io", "etcd.io"},
{"http://etcd.io/abc", "etcd.io", "etcd.io"},
{"dns://something-other", "dns://something-other", "dns://something-other"},
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_OPTIONAL},
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_REQUIRE},
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_OPTIONAL},
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_REQUIRE},
{"etcd.io", "etcd.io", "etcd.io", CREDS_OPTIONAL},
{"http://etcd.io/abc", "etcd.io", "etcd.io", CREDS_DROP},
{"dns://something-other", "dns://something-other", "something-other", CREDS_OPTIONAL},

{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_DROP},
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_OPTIONAL},
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CREDS_OPTIONAL},
}
for _, tt := range tests {
t.Run(tt.endpoint, func(t *testing.T) {
t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
gotAddress, gotServerName := Interpret(tt.endpoint)
if gotAddress != tt.wantAddress {
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
Expand All @@ -61,5 +68,32 @@ func TestInterpret(t *testing.T) {
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
}
})
t.Run("RequiresCredentials_"+tt.endpoint, func(t *testing.T) {
requiresCreds := RequiresCredentials(tt.endpoint)
if requiresCreds != tt.wantRequiresCreds {
t.Errorf("RequiresCredentials() got = %v, want %v", requiresCreds, tt.wantRequiresCreds)
}
})
}
}

func Test_extractHostFromHostPort(t *testing.T) {
tests := []struct {
ep string
want string
}{
{ep: "localhost", want: "localhost"},
{ep: "localhost:8080", want: "localhost"},
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
{ep: "[2001:db8:1f70::999:de8:7648:6e8]", want: "[2001:db8:1f70::999:de8:7648:6e8]"},
{ep: "[2001:db8:1f70::999:de8:7648:6e8]:100", want: "2001:db8:1f70::999:de8:7648:6e8"},
}
for _, tt := range tests {
t.Run(tt.ep, func(t *testing.T) {
if got := extractHostFromHostPort(tt.ep); got != tt.want {
t.Errorf("extractHostFromHostPort() = %v, want %v", got, tt.want)
}
})
}
}
6 changes: 5 additions & 1 deletion clientv3/internal/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"google.golang.org/grpc/serviceconfig"
)

const (
Schema = "etcd-endpoints"
)

// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
// using SetEndpoints.
type EtcdManualResolver struct {
Expand All @@ -31,7 +35,7 @@ type EtcdManualResolver struct {
}

func New(endpoints ...string) *EtcdManualResolver {
r := manual.NewBuilderWithScheme("etcd-endpoints")
r := manual.NewBuilderWithScheme(Schema)
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
}

Expand Down

0 comments on commit 69a2c6f

Please sign in to comment.