Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
113934: roachprod: use gcloud CLI instead of net.LookupSRV r=renatolabs a=herkolategan

Previously `net.LookupSRV` with a custom resolver was used to lookup DNS
records. This approach resulted in several flakes and required waiting on DNS
servers to have the records available. The CLI is more stable, but has a greater
call overhead.

This PR also introduces a cache to reduce the cost of the `LookupSRVRecords`
call which could be called frequently depending on the origin of use. The cache
is updated for any CRUD operations on the DNS entries, and a call to the CLI
will not occur if any entry exists for the name the lookup is attempting. The
names are also normalised to remove a trailing dot in order to make matching
against the cache work correctly.

There is a small risk that the cache could go out of sync if any other roachprod
process manipulates the records with a create, update or destroy operation,
while a continuous roachprod process is interacting with the entries. This risk
is relatively small and usually applies to roachtest rather than everyday use of
roachprod.

Fixes #111269

Epic: None
Release Note: None


113996: upgrade: use high priority txn's to update the cluster version r=fqazi a=fqazi

Previously, it was possible for the leasing subsystem to starve out attempts to set the cluster version during upgrades, since the leasing subsystem uses high priority txn for renewals. To address this, this patch makes the logic to set the cluster version high priority so it can't be pushed out by lease renewals.

Fixes: #113908

Release note (bug fix): Addressed a bug that could cause cluster version finalization to get starved out by descriptor lease renewals on larger clusters.

Co-authored-by: Herko Lategan <herko@cockroachlabs.com>
Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
  • Loading branch information
3 people committed Nov 17, 2023
3 parents 7f00287 + de5d867 + 07f3628 commit 8d026f9
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 119 deletions.
3 changes: 1 addition & 2 deletions pkg/roachprod/install/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ func (c *SyncedCluster) DiscoverServices(
mu := syncutil.Mutex{}
records := make([]vm.DNSRecord, 0)
err := vm.FanOutDNS(c.VMs, func(dnsProvider vm.DNSProvider, _ vm.List) error {
service := fmt.Sprintf("%s-%s", virtualClusterName, string(serviceType))
r, lookupErr := dnsProvider.LookupSRVRecords(ctx, service, "tcp", c.Name)
r, lookupErr := dnsProvider.LookupSRVRecords(ctx, serviceDNSName(dnsProvider, virtualClusterName, serviceType, c.Name))
if lookupErr != nil {
return lookupErr
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachprod/vm/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type DNSProvider interface {
// subdomain. The protocol is usually "tcp" and the subdomain is usually the
// cluster name. The service is a combination of the virtual cluster name and
// type of service.
LookupSRVRecords(ctx context.Context, service, proto, subdomain string) ([]DNSRecord, error)
LookupSRVRecords(ctx context.Context, name string) ([]DNSRecord, error)
// ListRecords lists all DNS records managed for the zone.
ListRecords(ctx context.Context) ([]DNSRecord, error)
// DeleteRecordsBySubdomain deletes all DNS records with the given subdomain.
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachprod/vm/gce/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ go_library(
"//pkg/roachprod/logger",
"//pkg/roachprod/vm",
"//pkg/roachprod/vm/flagstub",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_spf13_pflag//:pflag",
Expand Down
146 changes: 58 additions & 88 deletions pkg/roachprod/vm/gce/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"os/exec"
"sort"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
Expand All @@ -31,7 +29,6 @@ import (
const (
dnsManagedZone = "roachprod-managed"
dnsDomain = "roachprod-managed.crdb.io"
dnsServer = "ns-cloud-a1.googledomains.com"
dnsMaxResults = 1000
dnsMaxConcurrentRequests = 4
)
Expand All @@ -42,34 +39,35 @@ var _ vm.DNSProvider = &dnsProvider{}

// dnsProvider implements the vm.DNSProvider interface.
type dnsProvider struct {
resolver *net.Resolver
recordsCache struct {
mu syncutil.Mutex
records map[string][]vm.DNSRecord
}
}

func NewDNSProvider() vm.DNSProvider {
resolver := new(net.Resolver)
resolver.StrictErrors = true
resolver.Dial = func(ctx context.Context, network, address string) (net.Conn, error) {
dialer := net.Dialer{}
// Prefer TCP over UDP. This is necessary because the DNS server
// will return a truncated response if the response is too large
// for a UDP packet, resulting in a "server misbehaving" error.
return dialer.DialContext(ctx, "tcp", dnsServer+":53")
return &dnsProvider{
recordsCache: struct {
mu syncutil.Mutex
records map[string][]vm.DNSRecord
}{records: make(map[string][]vm.DNSRecord)},
}
return &dnsProvider{resolver: resolver}
}

// CreateRecords implements the vm.DNSProvider interface.
func (n *dnsProvider) CreateRecords(ctx context.Context, records ...vm.DNSRecord) error {
recordsByName := make(map[string][]vm.DNSRecord)
for _, record := range records {
// Ensure we use the normalised name for grouping records.
record.Name = n.normaliseName(record.Name)
recordsByName[record.Name] = append(recordsByName[record.Name], record)
}

for name, recordGroup := range recordsByName {
// No need to break the name down into components as the lookup command
// accepts a fully qualified name as the last parameter if the service and
// proto parameters are empty strings.
existingRecords, err := n.lookupSRVRecords(ctx, "", "", name)
existingRecords, err := n.lookupSRVRecords(ctx, name)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,20 +101,14 @@ func (n *dnsProvider) CreateRecords(ctx context.Context, records ...vm.DNSRecord
if err != nil {
return markDNSOperationError(errors.Wrapf(err, "output: %s", out))
}
n.updateCache(name, recordGroup)
}
// The DNS records are not immediately available after creation. We wait until
// they are available before returning. This is necessary because the records
// are required for starting servers. The waiting period should usually be
// short (less than 30 seconds).
return n.waitForRecordsAvailable(ctx, records...)
return nil
}

// LookupSRVRecords implements the vm.DNSProvider interface.
func (n *dnsProvider) LookupSRVRecords(
ctx context.Context, service, proto, subdomain string,
) ([]vm.DNSRecord, error) {
name := fmt.Sprintf(`%s.%s`, subdomain, n.Domain())
return n.lookupSRVRecords(ctx, service, proto, name)
func (n *dnsProvider) LookupSRVRecords(ctx context.Context, name string) ([]vm.DNSRecord, error) {
return n.lookupSRVRecords(ctx, name)
}

// ListRecords implements the vm.DNSProvider interface.
Expand All @@ -141,6 +133,7 @@ func (n *dnsProvider) DeleteRecordsByName(ctx context.Context, names ...string)
if err != nil {
return markDNSOperationError(errors.Wrapf(err, "output: %s", out))
}
n.clearCacheEntry(name)
return nil
})
}
Expand Down Expand Up @@ -181,34 +174,27 @@ func (n *dnsProvider) Domain() string {
// network problems. For lookups, we prefer this to using the gcloud command as
// it is faster, and preferable when service information is being queried
// regularly.
func (n *dnsProvider) lookupSRVRecords(
ctx context.Context, service, proto, name string,
) ([]vm.DNSRecord, error) {
var err error
var cName string
var srvRecords []*net.SRV
err = retry.WithMaxAttempts(ctx, retry.Options{}, 10, func() error {
cName, srvRecords, err = n.resolver.LookupSRV(ctx, service, proto, name)
if dnsError := (*net.DNSError)(nil); errors.As(err, &dnsError) {
// We ignore some errors here as they are likely due to the record name not
// existing. The net.LookupSRV function tends to return "server misbehaving"
// and "no such host" errors when no record entries are found. Hence, making
// the errors ambiguous and not useful. The errors are not exported, so we
// have to check the error message.
if dnsError.Err != "server misbehaving" && dnsError.Err != "no such host" && !dnsError.IsNotFound {
return markDNSOperationError(dnsError)
}
}
return nil
})
func (n *dnsProvider) lookupSRVRecords(ctx context.Context, name string) ([]vm.DNSRecord, error) {
// Check the cache first.
if cachedRecords, ok := n.getCache(name); ok {
return cachedRecords, nil
}
// Lookup the records, if no records are found in the cache.
records, err := n.listSRVRecords(ctx, name, dnsMaxResults)
filteredRecords := make([]vm.DNSRecord, 0, len(records))
if err != nil {
return nil, err
}
records := make([]vm.DNSRecord, len(srvRecords))
for i, srvRecord := range srvRecords {
records[i] = vm.CreateSRVRecord(cName, *srvRecord)
for _, record := range records {
// Filter out records that do not match the full normalised target name.
// This is necessary because the gcloud command does partial matching.
if n.normaliseName(record.Name) != n.normaliseName(name) {
continue
}
filteredRecords = append(filteredRecords, record)
}
return records, nil
n.updateCache(name, filteredRecords)
return filteredRecords, nil
}

// listSRVRecords returns all SRV records that match the given filter from Google Cloud DNS.
Expand Down Expand Up @@ -259,46 +245,30 @@ func (n *dnsProvider) listSRVRecords(
return records, nil
}

// waitForRecordsAvailable waits for the DNS records to become available on the
// DNS server through a standard net tools lookup.
func (n *dnsProvider) waitForRecordsAvailable(ctx context.Context, records ...vm.DNSRecord) error {
type recordKey struct {
name string
data string
}
trimName := func(name string) string {
return strings.TrimSuffix(name, ".")
}
notAvailable := make(map[recordKey]struct{})
for _, record := range records {
notAvailable[recordKey{
name: trimName(record.Name),
data: record.Data,
}] = struct{}{}
}
func (n *dnsProvider) updateCache(name string, records []vm.DNSRecord) {
n.recordsCache.mu.Lock()
defer n.recordsCache.mu.Unlock()
n.recordsCache.records[n.normaliseName(name)] = records
}

for attempts := 0; attempts < 30; attempts++ {
for key := range notAvailable {
foundRecords, err := n.lookupSRVRecords(ctx, "", "", key.name)
if err != nil {
return err
}
for _, foundRecord := range foundRecords {
delete(notAvailable, recordKey{
name: trimName(foundRecord.Name),
data: foundRecord.Data,
})
}
}
if len(notAvailable) == 0 {
return nil
}
time.Sleep(2 * time.Second)
}
return markDNSOperationError(
errors.Newf("waiting for DNS records to become available: %d out of %d records not available",
len(notAvailable), len(records)),
)
func (n *dnsProvider) getCache(name string) ([]vm.DNSRecord, bool) {
n.recordsCache.mu.Lock()
defer n.recordsCache.mu.Unlock()
records, ok := n.recordsCache.records[n.normaliseName(name)]
return records, ok
}

func (n *dnsProvider) clearCacheEntry(name string) {
n.recordsCache.mu.Lock()
defer n.recordsCache.mu.Unlock()
delete(n.recordsCache.records, n.normaliseName(name))
}

// normaliseName removes the trailing dot from a DNS name if it exists.
// This is necessary because depending on where the name originates from, it
// may or may not have a trailing dot.
func (n *dnsProvider) normaliseName(name string) string {
return strings.TrimSuffix(name, ".")
}

// markDNSOperationError should be used to mark any external DNS API or Google
Expand Down
5 changes: 1 addition & 4 deletions pkg/roachprod/vm/local/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,11 @@ func (n *dnsProvider) CreateRecords(_ context.Context, records ...vm.DNSRecord)
}

// LookupSRVRecords is part of the vm.DNSProvider interface.
func (n *dnsProvider) LookupSRVRecords(
_ context.Context, service, proto, subdomain string,
) ([]vm.DNSRecord, error) {
func (n *dnsProvider) LookupSRVRecords(_ context.Context, name string) ([]vm.DNSRecord, error) {
records, err := n.loadRecords()
if err != nil {
return nil, err
}
name := fmt.Sprintf("_%s._%s.%s.%s", service, proto, subdomain, n.Domain())
var matchingRecords []vm.DNSRecord
for _, record := range records {
if record.Name == name && record.Type == vm.SRV {
Expand Down
4 changes: 2 additions & 2 deletions pkg/roachprod/vm/local/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestLookupRecords(t *testing.T) {
}...)

t.Run("lookup system", func(t *testing.T) {
records, err := p.LookupSRVRecords(ctx, "system-sql", "tcp", "local")
records, err := p.LookupSRVRecords(ctx, "_system-sql._tcp.local.local-zone")
require.NoError(t, err)
require.Equal(t, 3, len(records))
for _, r := range records {
Expand All @@ -63,7 +63,7 @@ func TestLookupRecords(t *testing.T) {
})

t.Run("parse SRV data", func(t *testing.T) {
records, err := p.LookupSRVRecords(ctx, "tenant-1-sql", "tcp", "local")
records, err := p.LookupSRVRecords(ctx, "_tenant-1-sql._tcp.local.local-zone")
require.NoError(t, err)
require.Equal(t, 1, len(records))
data, err := records[0].ParseSRVRecord()
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/settingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"overrides.go",
"row_decoder.go",
"setting_encoder.go",
"settings_watcher.go",
"version_guard.go",
],
Expand All @@ -22,6 +23,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
Expand All @@ -34,6 +36,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -43,6 +46,7 @@ go_test(
srcs = [
"main_test.go",
"row_decoder_external_test.go",
"setting_encoder_test.go",
"settings_watcher_external_test.go",
"version_guard_test.go",
],
Expand Down Expand Up @@ -74,6 +78,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
60 changes: 60 additions & 0 deletions pkg/server/settingswatcher/setting_encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package settingswatcher

import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// EncodeSettingKey encodes a key for the system.settings table, which
// can be used for direct KV operations.
func EncodeSettingKey(codec keys.SQLCodec, setting string) []byte {
indexPrefix := codec.IndexPrefix(keys.SettingsTableID, uint32(1))
return encoding.EncodeUvarintAscending(encoding.EncodeStringAscending(indexPrefix, setting), uint64(0))
}

// EncodeSettingValue encodes a value for the system.settings table, which
// can be used for direct KV operations.
func EncodeSettingValue(rawValue []byte, valueType string) ([]byte, error) {
// Encode the setting value to write out the updated version.
var tuple []byte
var err error
if tuple, err = valueside.Encode(tuple,
valueside.MakeColumnIDDelta(descpb.ColumnID(encoding.NoColumnID),
systemschema.SettingsTable.PublicColumns()[1].GetID()),
tree.NewDString(string(rawValue)),
nil); err != nil {
return nil, err
}
if tuple, err = valueside.Encode(tuple,
valueside.MakeColumnIDDelta(systemschema.SettingsTable.PublicColumns()[1].GetID(),
systemschema.SettingsTable.PublicColumns()[2].GetID()),
tree.MustMakeDTimestamp(timeutil.Now(), time.Microsecond),
nil); err != nil {
return nil, err
}
if tuple, err = valueside.Encode(tuple,
valueside.MakeColumnIDDelta(systemschema.SettingsTable.PublicColumns()[2].GetID(),
systemschema.SettingsTable.PublicColumns()[3].GetID()),
tree.NewDString(valueType),
nil); err != nil {
return nil, err
}
return tuple, nil
}

0 comments on commit 8d026f9

Please sign in to comment.