Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3: PS: Replace balancer with upstream grpc solution #12706

Merged
merged 1 commit into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The minimum recommended etcd versions to run in **production** are 3.2.28+, 3.3.
<hr>


## v3.5.0 (2020 TBD)
## v3.5.0 (2021 TBD)

See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_5.md) for any breaking changes.

Expand Down Expand Up @@ -63,6 +63,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
- Changed `pkg/flags` function signature to [support structured logger](https://github.com/etcd-io/etcd/pull/11616).
- Previously, `SetFlagsFromEnv(prefix string, fs *flag.FlagSet) error`, now `SetFlagsFromEnv(lg *zap.Logger, prefix string, fs *flag.FlagSet) error`.
- Previously, `SetPflagsFromEnv(prefix string, fs *pflag.FlagSet) error`, now `SetPflagsFromEnv(lg *zap.Logger, prefix string, fs *pflag.FlagSet) error`.
- ClientV3 supports [grpc resolver API](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go).
- Endpoints can be managed using [endpoints.Manager](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/endpoints/endpoints.go)
- Previously supported [GRPCResolver was decomissioned](https://github.com/etcd-io/etcd/pull/12675). Use [resolver](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go) instead.


### `etcdctl`

Expand Down Expand Up @@ -174,6 +178,8 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Make sure [save snapshot downloads checksum for integrity checks](https://github.com/etcd-io/etcd/pull/11896).
- Fix [auth token invalid after watch reconnects](https://github.com/etcd-io/etcd/pull/12264). Get AuthToken automatically when clientConn is ready.
- Improve [clientv3:get AuthToken gracefully without extra connection](https://github.com/etcd-io/etcd/pull/12165).
- Changed [clientv3 dialing code](https://github.com/etcd-io/etcd/pull/12671) to use grpc resolver API instead of custom balancer.
- Endpoints self identify now as `etcd-endpoints://{id}/#initially={list of endpoints}` e.g. `etcd-endpoints://0xc0009d8540/#initially=[localhost:2079]`

### Package `lease`

Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ test:
$(TEST_OPTS) ./test.sh 2>&1 | tee test-$(TEST_SUFFIX).log
! egrep "(--- FAIL:|DATA RACE|panic: test timed out|appears to have leaked)" -B50 -A10 test-$(TEST_SUFFIX).log

test-small:
$(info log-file: test-$(TEST_SUFFIX).log)
PASSES="fmt build unit" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log

test-full:
$(info log-file: test-$(TEST_SUFFIX).log)
PASSES="fmt build unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log

docker-test:
$(info GO_VERSION: $(GO_VERSION))
$(info ETCD_VERSION: $(ETCD_VERSION))
Expand Down
37 changes: 21 additions & 16 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,10 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
} else {
opts = append(opts, grpc.WithInsecure())
}
grpc.WithDisableRetry()

// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
// once it is available.
// TODO: Replace all of clientv3/retry.go with RetryPolicy:
// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
Expand Down Expand Up @@ -252,8 +250,8 @@ func (c *Client) getToken(ctx context.Context) error {

// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
creds := c.credentialsForEndpoint(ep)
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
creds := c.credentialsForEndpoint(c.Endpoints()[0])
opts := append(dopts, grpc.WithResolvers(c.resolver))
return c.dial(creds, opts...)
}
Expand All @@ -278,21 +276,30 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}

conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
return conn, nil
}

func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
if c.creds != nil {
r := endpoint.RequiresCredentials(ep)
switch r {
case endpoint.CREDS_DROP:
return nil
case endpoint.CREDS_OPTIONAL:
return c.creds
}
if endpoint.RequiresCredentials(ep) {
case endpoint.CREDS_REQUIRE:
if c.creds != nil {
return c.creds
}
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
panic(fmt.Errorf("Unsupported CredsRequirement: %v", r))
}
ptabor marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func newClient(cfg *Config) (*Client, error) {
Expand Down Expand Up @@ -360,18 +367,15 @@ func newClient(cfg *Config) (*Client, error) {
client.cancel()
return nil, fmt.Errorf("at least one Endpoint is required in client config")
}
dialEndpoint := cfg.Endpoints[0]

// Use a provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dialWithBalancer(dialEndpoint)
conn, err := client.dialWithBalancer()
if err != nil {
client.cancel()
client.resolver.Close()
// TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
client.conn = conn

client.Cluster = NewCluster(client)
Expand All @@ -390,6 +394,7 @@ func newClient(cfg *Config) (*Client, error) {
if err != nil {
client.Close()
cancel()
//TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
return nil, err
}
cancel()
Expand Down
6 changes: 4 additions & 2 deletions client/v3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func TestDialCancel(t *testing.T) {
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)

wantError := context.DeadlineExceeded

// grpc.WithBlock to block until connection up or timeout
testCfgs := []Config{
{
Expand Down Expand Up @@ -121,8 +123,8 @@ func TestDialTimeout(t *testing.T) {
case <-time.After(5 * time.Second):
t.Errorf("#%d: failed to timeout dial on time", i)
case err := <-donec:
if err != context.DeadlineExceeded {
t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded)
if err.Error() != wantError.Error() {
t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
}
}
}
Expand Down
117 changes: 93 additions & 24 deletions client/v3/internal/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,114 @@
package endpoint

import (
"fmt"
"net"
"net/url"
"regexp"
"path"
"strings"
)

var (
STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
type CredsRequirement int

const (
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
CREDS_REQUIRE CredsRequirement = iota
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
CREDS_DROP
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
CREDS_OPTIONAL
)

func stripPort(ep string) string {
return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
func extractHostFromHostPort(ep string) string {
host, _, err := net.SplitHostPort(ep)
if err != nil {
return ep
}
return host
}

func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
url, err := url.Parse(ep)
if err != nil {
return ep, stripPort(ep), false
func extractHostFromPath(pathStr string) string {
return extractHostFromHostPort(path.Base(pathStr))
}

//mustSplit2 returns the values from strings.SplitN(s, sep, 2).
//If sep is not found, it returns ("", "", false) instead.
func mustSplit2(s, sep string) (string, string) {
spl := strings.SplitN(s, sep, 2)
if len(spl) < 2 {
panic(fmt.Errorf("Token '%v' expected to have separator sep: `%v`", s, sep))
}
switch url.Scheme {
case "http", "https":
return url.Host, url.Hostname(), url.Scheme == "https"
case "unix", "unixs":
requireCreds = url.Scheme == "unixs"
if url.Opaque != "" {
return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
} else if url.Path != "" {
return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
} else {
return "unix:" + url.Host, url.Hostname(), requireCreds
}
return spl[0], spl[1]
}

func schemeToCredsRequirement(schema string) CredsRequirement {
switch schema {
case "https", "unixs":
return CREDS_REQUIRE
case "http":
return CREDS_DROP
case "unix":
// Preserving previous behavior from:
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
// that likely was a bug due to missing 'fallthrough'.
// At the same time it seems legit to let the users decide whether they
// want credential control or not (and 'unixs' schema is not a standard thing).
return CREDS_OPTIONAL
case "":
return url.Host + url.Path, url.Host + url.Path, false
return CREDS_OPTIONAL
default:
return ep, stripPort(ep), false
return CREDS_OPTIONAL
}
}

// This function translates endpoints names supported by etcd server into
// endpoints as supported by grpc with additional information
// (server_name for cert validation, requireCreds - whether certs are needed).
// The main differences:
// - etcd supports unixs & https names as opposed to unix & http to
// distinguish need to configure certificates.
// - etcd support http(s) names as opposed to tcp supported by grpc/dial method.
// - etcd supports unix(s)://local-file naming schema
// (as opposed to unix:local-file canonical name used by grpc for current dir files).
// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon)
// is considered serverName - to allow local testing of cert-protected communication.
// See more:
// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47
// - https://golang.org/pkg/net/#Dial
// - https://github.com/grpc/grpc/blob/master/doc/naming.md
func translateEndpoint(ep string) (addr string, serverName string, requireCreds CredsRequirement) {
if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
// absolute path case
schema, absolutePath := mustSplit2(ep, "://")
return "unix://" + absolutePath, extractHostFromPath(absolutePath), schemeToCredsRequirement(schema)
}
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
// legacy etcd local path
schema, localPath := mustSplit2(ep, "://")
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
}
schema, localPath := mustSplit2(ep, ":")
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
}

if strings.Contains(ep, "://") {
url, err := url.Parse(ep)
if err != nil {
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
}
if url.Scheme == "http" || url.Scheme == "https" {
return url.Host, url.Hostname(), schemeToCredsRequirement(url.Scheme)
}
return ep, url.Hostname(), schemeToCredsRequirement(url.Scheme)
}
// Handles plain addresses like 10.0.0.44:437.
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
}

// RequiresCredentials returns whether given endpoint requires
// credentials/certificates for connection.
func RequiresCredentials(ep string) bool {
func RequiresCredentials(ep string) CredsRequirement {
_, _, requireCreds := translateEndpoint(ep)
return requireCreds
}
Expand Down
Loading