Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: support WAL failover to an explicit path #120783

Merged
merged 1 commit into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/base/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/uuid",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_davecgh_go_spew//spew",
"@com_github_stretchr_testify//require",
Expand Down
124 changes: 112 additions & 12 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"net"
"net/url"
"os"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -838,8 +839,7 @@ type TempStorageConfig struct {
Settings *cluster.Settings
}

// WALFailoverMode configures a node's stores behavior under high write latency
// to their write-ahead logs.
// WALFailoverMode specifies the mode of WAL failover.
type WALFailoverMode int8

const (
Expand All @@ -858,11 +858,11 @@ const (
// volume, the batch commit latency is insulated from the effects of momentary
// disk stalls.
WALFailoverAmongStores
// WALFailoverExplicitPath enables WAL failover for a single-store node to an
// explicitly specified path.
WALFailoverExplicitPath
)

// Type implements the pflag.Value interface.
func (m *WALFailoverMode) Type() string { return "string" }

// String implements fmt.Stringer.
func (m *WALFailoverMode) String() string {
return redact.StringWithoutMarkers(m)
Expand All @@ -877,25 +877,125 @@ func (m *WALFailoverMode) SafeFormat(p redact.SafePrinter, _ rune) {
p.SafeString("disabled")
case WALFailoverAmongStores:
p.SafeString("among-stores")
case WALFailoverExplicitPath:
p.SafeString("path")
default:
p.Printf("<unknown WALFailoverMode %d>", int8(*m))
}
}

// WALFailoverConfig configures a node's stores behavior under high write
// latency to their write-ahead logs. It is populated from the --wal-failover
// flag value via Set, which accepts "disabled[,prev_path=<prev_path>]",
// "among-stores" or "path=<path>[,prev_path=<prev_path>]".
type WALFailoverConfig struct {
// Mode selects the WAL failover behavior.
Mode WALFailoverMode
// Path is the non-store path to which WALs should be written when failing
// over. It must be nonempty if and only if Mode == WALFailoverExplicitPath.
Path ExternalPath
// PrevPath is the previously used non-store path. It may be set with Mode ==
// WALFailoverExplicitPath (when changing the secondary path) or
// WALFailoverDisabled (when disabling WAL failover after it was previously
// enabled with WALFailoverExplicitPath). It must be empty for other modes.
// If Mode is WALFailoverDisabled and previously WAL failover was enabled
// using WALFailoverAmongStores, then PrevPath must not be set.
PrevPath ExternalPath
}

// ExternalPath describes a filesystem location that is not a store directory,
// together with the encryption-at-rest configuration associated with it.
type ExternalPath struct {
	// Path is the filesystem path. An empty string means that no path was
	// provided (see IsSet).
	Path string
	// EncryptionOptions is a serialized protobuf set by Go CCL code describing
	// the encryption-at-rest configuration. If encryption-at-rest has ever been
	// enabled on the store, this field must be set.
	EncryptionOptions []byte
}

// IsSet reports whether the external path was provided, i.e. whether Path is
// nonempty. EncryptionOptions is not consulted.
func (e ExternalPath) IsSet() bool {
	return len(e.Path) > 0
}

// Type implements the pflag.Value interface. It always returns "string".
func (c *WALFailoverConfig) Type() string { return "string" }

// String implements fmt.Stringer. It returns the result of
// redact.StringWithoutMarkers applied to the receiver.
func (c *WALFailoverConfig) String() string {
return redact.StringWithoutMarkers(c)
}

// SafeFormat implements the redact.SafeFormatter interface.
//
// Depending on Mode it prints nothing (default), "disabled", "among-stores" or
// "path=<path>". The disabled and explicit-path forms are followed by
// ",prev_path=<prev_path>" when PrevPath is set. Unknown modes are printed as
// "<unknown WALFailoverMode N>".
func (c *WALFailoverConfig) SafeFormat(p redact.SafePrinter, _ rune) {
	// writePrevPath appends the optional ",prev_path=<prev_path>" suffix.
	writePrevPath := func() {
		if !c.PrevPath.IsSet() {
			return
		}
		p.SafeString(",prev_path=")
		p.SafeString(redact.SafeString(c.PrevPath.Path))
	}

	switch mode := c.Mode; mode {
	case WALFailoverDefault:
		// Nothing is printed for the default mode.
	case WALFailoverDisabled:
		p.SafeString("disabled")
		writePrevPath()
	case WALFailoverAmongStores:
		p.SafeString("among-stores")
	case WALFailoverExplicitPath:
		p.SafeString("path=")
		p.SafeString(redact.SafeString(c.Path.Path))
		writePrevPath()
	default:
		p.Printf("<unknown WALFailoverMode %d>", int8(mode))
	}
}

// Set implements the pflag.Value interface.
func (m *WALFailoverMode) Set(s string) error {
switch s {
case "disabled":
*m = WALFailoverDisabled
case "among-stores":
*m = WALFailoverAmongStores
func (c *WALFailoverConfig) Set(s string) error {
switch {
case strings.HasPrefix(s, "disabled"):
c.Mode = WALFailoverDisabled
var ok bool
c.Path.Path, c.PrevPath.Path, ok = parseWALFailoverPathFields(strings.TrimPrefix(s, "disabled"))
if !ok || c.Path.IsSet() {
return errors.Newf("invalid disabled --wal-failover setting: %s "+
"expect disabled[,prev_path=<prev_path>]", s)
}
case s == "among-stores":
c.Mode = WALFailoverAmongStores
case strings.HasPrefix(s, "path="):
c.Mode = WALFailoverExplicitPath
var ok bool
c.Path.Path, c.PrevPath.Path, ok = parseWALFailoverPathFields(s)
if !ok || !c.Path.IsSet() {
return errors.Newf("invalid path --wal-failover setting: %s "+
"expect path=<path>[,prev_path=<prev_path>]", s)
}
default:
return errors.Newf("invalid --wal-failover setting: %s "+
"(possible values: disabled, among-stores)", s)
"(possible values: disabled, among-stores, path=<path>)", s)
}
return nil
}

// parseWALFailoverPathFields parses the path fields of a --wal-failover flag
// value. The accepted grammar is
//
//	[path=<path>][,prev_path=<prev_path>]
//
// where <path> extends up to the first comma and <prev_path> is everything
// after ",prev_path=". The empty string is valid and yields two empty paths.
// If the input does not match, ok is false and both paths are empty. The
// function does not require either value to be nonempty; callers enforce that.
func parseWALFailoverPathFields(s string) (path, prevPath string, ok bool) {
	const (
		pathField     = "path="
		prevPathField = ",prev_path="
	)

	if len(s) == 0 {
		return "", "", true
	}

	rest := s
	if strings.HasPrefix(rest, pathField) {
		rest = rest[len(pathField):]
		comma := strings.IndexByte(rest, ',')
		if comma < 0 {
			// Only a path= field was supplied.
			return rest, "", true
		}
		path, rest = rest[:comma], rest[comma:]
	}

	// Whatever is left over must be exactly one prev_path= field.
	if !strings.HasPrefix(rest, prevPathField) {
		return "", "", false
	}
	return path, rest[len(prevPathField):], true
}

// ExternalIODirConfig describes various configuration options pertaining
// to external storage implementations.
// TODO(adityamaru): Rename ExternalIODirConfig to ExternalIOConfig because it
Expand Down
20 changes: 20 additions & 0 deletions pkg/base/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
package base_test

import (
"bytes"
"fmt"
"math"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/datadriven"
"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -154,3 +157,20 @@ func TestRaftMaxInflightBytes(t *testing.T) {
})
}
}

// TestWALFailoverConfigRoundtrip runs the "wal-failover-config" datadriven
// file. Every input line is passed to WALFailoverConfig.Set on a fresh config;
// the output records either "err: <error>" when parsing fails or the String()
// rendering of the parsed config when it succeeds.
func TestWALFailoverConfigRoundtrip(t *testing.T) {
	defer leaktest.AfterTest(t)()

	path := datapathutils.TestDataPath(t, "wal-failover-config")
	datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
		var out bytes.Buffer
		for _, line := range strings.Split(d.Input, "\n") {
			// Use a fresh config per line so results do not leak between inputs.
			var cfg base.WALFailoverConfig
			err := cfg.Set(line)
			if err == nil {
				fmt.Fprintln(&out, cfg.String())
			} else {
				fmt.Fprintf(&out, "err: %s\n", err)
			}
		}
		return out.String()
	})
}
4 changes: 2 additions & 2 deletions pkg/base/store_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import (
// hard coded to 640MiB.
const MinimumStoreSize = 10 * 64 << 20

// GetAbsoluteStorePath takes a (possibly relative) and returns the absolute path.
// GetAbsoluteFSPath takes a (possibly relative) path and returns the absolute path.
// Returns an error if the path begins with '~' or Abs fails.
// 'fieldName' is used in error strings.
func GetAbsoluteStorePath(fieldName string, p string) (string, error) {
func GetAbsoluteFSPath(fieldName string, p string) (string, error) {
if p[0] == '~' {
return "", fmt.Errorf("%s cannot start with '~': %s", fieldName, p)
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/base/testdata/wal-failover-config
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
parse
among-stores
----
among-stores

parse
disabled
disabled,prev_path=foo
----
disabled
disabled,prev_path=foo

parse
path=/foo
path=/foo,prev_path=/bar
----
path=/foo
path=/foo,prev_path=/bar

parse
disabled,path=foo
among-stores,path=foo
among-stores,prev_path=foo
garbage
----
err: invalid disabled --wal-failover setting: disabled,path=foo expect disabled[,prev_path=<prev_path>]
err: invalid --wal-failover setting: among-stores,path=foo (possible values: disabled, among-stores, path=<path>)
err: invalid --wal-failover setting: among-stores,prev_path=foo (possible values: disabled, among-stores, path=<path>)
err: invalid --wal-failover setting: garbage (possible values: disabled, among-stores, path=<path>)
60 changes: 40 additions & 20 deletions pkg/ccl/baseccl/encryption_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewStoreEncryptionSpec(value string) (StoreEncryptionSpec, error) {
switch field {
case pathField:
var err error
es.Path, err = base.GetAbsoluteStorePath(pathField, value)
es.Path, err = base.GetAbsoluteFSPath(pathField, value)
if err != nil {
return StoreEncryptionSpec{}, err
}
Expand All @@ -101,7 +101,7 @@ func NewStoreEncryptionSpec(value string) (StoreEncryptionSpec, error) {
es.KeyPath = plaintextFieldValue
} else {
var err error
es.KeyPath, err = base.GetAbsoluteStorePath("key", value)
es.KeyPath, err = base.GetAbsoluteFSPath("key", value)
if err != nil {
return StoreEncryptionSpec{}, err
}
Expand All @@ -111,7 +111,7 @@ func NewStoreEncryptionSpec(value string) (StoreEncryptionSpec, error) {
es.OldKeyPath = plaintextFieldValue
} else {
var err error
es.OldKeyPath, err = base.GetAbsoluteStorePath("old-key", value)
es.OldKeyPath, err = base.GetAbsoluteFSPath("old-key", value)
if err != nil {
return StoreEncryptionSpec{}, err
}
Expand Down Expand Up @@ -141,17 +141,17 @@ func NewStoreEncryptionSpec(value string) (StoreEncryptionSpec, error) {
return es, nil
}

// StoreEncryptionSpecList contains a slice of StoreEncryptionSpecs that implements pflag's value
// EncryptionSpecList contains a slice of StoreEncryptionSpecs that implements pflag's value
// interface.
type StoreEncryptionSpecList struct {
type EncryptionSpecList struct {
Specs []StoreEncryptionSpec
}

var _ pflag.Value = &StoreEncryptionSpecList{}
var _ pflag.Value = &EncryptionSpecList{}

// String returns a string representation of all the StoreEncryptionSpecs. This is part
// of pflag's value interface.
func (encl StoreEncryptionSpecList) String() string {
func (encl EncryptionSpecList) String() string {
var buffer bytes.Buffer
for _, ss := range encl.Specs {
fmt.Fprintf(&buffer, "--%s=%s ", cliflagsccl.EnterpriseEncryption.Name, ss)
Expand All @@ -165,13 +165,13 @@ func (encl StoreEncryptionSpecList) String() string {

// Type returns the underlying type in string form. This is part of pflag's
// value interface.
func (encl *StoreEncryptionSpecList) Type() string {
return "StoreEncryptionSpec"
func (encl *EncryptionSpecList) Type() string {
return "EncryptionSpec"
}

// Set adds a new value to the EncryptionSpecList. It is the important part of
// pflag's value interface.
func (encl *StoreEncryptionSpecList) Set(value string) error {
func (encl *EncryptionSpecList) Set(value string) error {
spec, err := NewStoreEncryptionSpec(value)
if err != nil {
return err
Expand All @@ -184,12 +184,13 @@ func (encl *StoreEncryptionSpecList) Set(value string) error {
return nil
}

// PopulateStoreSpecWithEncryption iterates through the StoreEncryptionSpecList and looks
// for matching paths in the StoreSpecList.
// Any unmatched StoreEncryptionSpec causes an error.
// Matching stores have a few encryption-related fields set.
func PopulateStoreSpecWithEncryption(
storeSpecs base.StoreSpecList, encryptionSpecs StoreEncryptionSpecList,
// PopulateWithEncryptionOpts iterates through the EncryptionSpecList and looks
// for matching paths in the StoreSpecList and WAL failover config. Any
// unmatched EncryptionSpec causes an error.
func PopulateWithEncryptionOpts(
storeSpecs base.StoreSpecList,
walFailoverConfig *base.WALFailoverConfig,
encryptionSpecs EncryptionSpecList,
) error {
for _, es := range encryptionSpecs.Specs {
var found bool
Expand All @@ -212,18 +213,37 @@ func PopulateStoreSpecWithEncryption(
found = true
break
}

for _, externalPath := range [2]*base.ExternalPath{&walFailoverConfig.Path, &walFailoverConfig.PrevPath} {
if !externalPath.IsSet() || externalPath.Path != es.Path {
continue
}
// NB: The external paths WALFailoverConfig.Path and
// WALFailoverConfig.PrevPath are only ever set in single-store
// configurations. In multi-store with among-stores failover mode, these
// will be empty (so we won't encounter the same path twice).
if len(externalPath.EncryptionOptions) > 0 {
return fmt.Errorf("WAL failover path %s already has an encryption setting",
externalPath.Path)
}
opts, err := es.ToEncryptionOptions()
if err != nil {
return err
}
externalPath.EncryptionOptions = opts
found = true
}

if !found {
return fmt.Errorf("no store with path %s found for encryption setting: %v", es.Path, es)
return fmt.Errorf("no usage of path %s found for encryption setting: %v", es.Path, es)
}
}
return nil
}

// EncryptionOptionsForStore takes a store directory and returns its EncryptionOptions
// if a matching entry is found in the EncryptionSpecList.
func EncryptionOptionsForStore(
dir string, encryptionSpecs StoreEncryptionSpecList,
) ([]byte, error) {
func EncryptionOptionsForStore(dir string, encryptionSpecs EncryptionSpecList) ([]byte, error) {
// We need an absolute path, but the input may have come in relative.
path, err := filepath.Abs(dir)
if err != nil {
Expand Down