Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backup: allow restricting backup coordination by region #95791

Merged
merged 3 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ unreserved_keyword ::=
| 'CONTROLJOB'
| 'CONVERSION'
| 'CONVERT'
| 'COORDINATOR_LOCALITY'
| 'COPY'
| 'COST'
| 'COVERING'
Expand Down Expand Up @@ -2256,6 +2257,7 @@ backup_options ::=
| 'DETACHED' '=' 'FALSE'
| 'KMS' '=' string_or_placeholder_opt_list
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'COORDINATOR_LOCALITY' '=' string_or_placeholder

c_expr ::=
d_expr
Expand Down Expand Up @@ -3409,6 +3411,7 @@ bare_label_keywords ::=
'AS_JSON'
| 'ATOMIC'
| 'CALLED'
| 'COORDINATOR_LOCALITY'
| 'COST'
| 'CHECK_FILES'
| 'DEBUG_IDS'
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/stop",
Expand Down
41 changes: 41 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -450,6 +451,11 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// The span is finished by the registry executing the job.
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)

if err := b.maybeRelocateJobExecution(ctx, p, details.CoordinatorLocation); err != nil {
return err
}

kmsEnv := backupencryption.MakeBackupKMSEnv(
p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig,
Expand Down Expand Up @@ -842,6 +848,41 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
}
}

// maybeRelocateJobExecution hands the job's lease to another SQL instance when
// a coordinator locality filter is set and the instance currently running the
// job does not satisfy it.
//
// It returns nil when the filter is empty or the current instance already
// matches. Otherwise it picks one of the matching instances at random,
// rewrites the job's lease to point at it inside a transaction, and returns
// the sentinel error produced by RelocateLease. Lookup or transaction failures
// are returned as ordinary errors.
func (b *backupResumer) maybeRelocateJobExecution(
	ctx context.Context, p sql.JobExecContext, locality roachpb.Locality,
) error {
	if !locality.NonEmpty() {
		return nil
	}

	current, err := p.DistSQLPlanner().GetSQLInstanceInfo(p.ExecCfg().JobRegistry.ID())
	if err != nil {
		return err
	}

	matched, missedTier := current.Locality.Matches(locality)
	if matched {
		// Already running somewhere acceptable; nothing to do.
		return nil
	}

	log.Infof(ctx,
		"BACKUP job %d initially adopted on instance %d but it does not match locality filter %s, finding a new coordinator",
		b.job.ID(), current.NodeID, missedTier.String(),
	)

	// GetAllInstancesByLocality returns an error rather than an empty slice
	// when nothing matches, so indexing into candidates below is safe.
	candidates, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, locality)
	if err != nil {
		return err
	}
	rng, _ := randutil.NewPseudoRand()
	dest := candidates[rng.Intn(len(candidates))]

	// RelocateLease reports failure and the "lease moved" sentinel separately;
	// only the failure should abort the transaction.
	var sentinel error
	txnErr := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		var failure error
		sentinel, failure = p.ExecCfg().JobRegistry.RelocateLease(
			ctx, txn, b.job.ID(), dest.InstanceID, dest.SessionID,
		)
		return failure
	})
	if txnErr != nil {
		return errors.Wrapf(txnErr, "failed to relocate job coordinator to %d", dest.InstanceID)
	}
	return sentinel
}

func getBackupDetailAndManifest(
ctx context.Context,
execCfg *sql.ExecutorConfig,
Expand Down
22 changes: 22 additions & 0 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ func backupTypeCheck(
exprutil.Strings{
backupStmt.Subdir,
backupStmt.Options.EncryptionPassphrase,
backupStmt.Options.CoordinatorLocality,
},
exprutil.StringArrays{
tree.Exprs(backupStmt.To),
Expand Down Expand Up @@ -600,6 +601,19 @@ func backupPlanHook(
}
}

var coordinatorLocality roachpb.Locality
if backupStmt.Options.CoordinatorLocality != nil {
s, err := exprEval.String(ctx, backupStmt.Options.CoordinatorLocality)
if err != nil {
return nil, nil, nil, false, err
}
if s != "" {
if err := coordinatorLocality.Set(s); err != nil {
return nil, nil, nil, false, err
}
}
}
dt marked this conversation as resolved.
Show resolved Hide resolved

encryptionParams := jobspb.BackupEncryptionOptions{
Mode: jobspb.EncryptionMode_None,
}
Expand Down Expand Up @@ -712,6 +726,13 @@ func backupPlanHook(
return err
}

// Check that a node will currently be able to run this before we create it.
if coordinatorLocality.NonEmpty() {
if _, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, coordinatorLocality); err != nil {
return err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this error end up looking like? I'm wondering if we should wrap it, mentioning the COORDINATOR_LOCALITY option so the user knows what they might need to change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like pq: no instances found matching locality filter %s. It includes the filter that didn't match, which seems more useful than the field they put that filter in?

}
}

initialDetails := jobspb.BackupDetails{
Destination: jobspb.BackupDetails_Destination{To: to, IncrementalStorage: incrementalStorage},
EndTime: endTime,
Expand All @@ -723,6 +744,7 @@ func backupPlanHook(
AsOfInterval: asOfInterval,
Detached: detached,
ApplicationName: p.SessionData().ApplicationName,
CoordinatorLocation: coordinatorLocality,
}
if backupStmt.CreatedByInfo != nil && backupStmt.CreatedByInfo.Name == jobs.CreatedByScheduledJobs {
initialDetails.ScheduleID = backupStmt.CreatedByInfo.ID
Expand Down
46 changes: 46 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10732,3 +10732,49 @@ $$;
require.NoError(t, err)

}

// localityFromStr parses s into a roachpb.Locality, asserting via require that
// parsing succeeds.
func localityFromStr(t *testing.T, s string) roachpb.Locality {
	var parsed roachpb.Locality
	err := parsed.Set(s)
	require.NoError(t, err)
	return parsed
}

// TestBackupInLocality issues BACKUP ... WITH coordinator_locality against a
// three-node cluster whose nodes carry distinct localities and checks, for
// each (gateway node, filter) pair, that the statement's outcome matches the
// expected error pattern.
func TestBackupInLocality(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 1000

// Disabled to run within tenant as certain MR features are not available to tenants.
// NOTE(review): nothing in the args below visibly disables tenant execution —
// confirm this comment still applies.
args := base.TestClusterArgs{ServerArgsPerNode: map[int]base.TestServerArgs{
0: {Locality: localityFromStr(t, "region=east,dc=1,az=1")},
1: {Locality: localityFromStr(t, "region=east,dc=2,az=2")},
2: {Locality: localityFromStr(t, "region=west,dc=1,az=1")},
}}

cluster, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, 3 /* nodes */, numAccounts, InitManualReplication, args)
defer cleanupFn()

// node is the 1-based number of the server the statement is sent to (it is
// converted to a 0-based index with tc.node - 1 below). filter is the value
// passed as coordinator_locality. err is the pattern handed to ExpectErr;
// NOTE(review): an empty pattern presumably means the statement must succeed
// — confirm against ExpectErr's semantics.
for i, tc := range []struct {
node int
filter, err string
}{
{node: 1, filter: "region=east", err: ""},
{node: 1, filter: "region=east,dc=1", err: ""},
{node: 1, filter: "region=east,dc=6", err: "no instances found"},
{node: 1, filter: "region=central", err: "no instances found"},
{node: 1, filter: "region=east,dc=2", err: "relocated"},
{node: 1, filter: "region=west,dc=1", err: "relocated"},

{node: 2, filter: "region=east", err: ""},
{node: 2, filter: "region=east,az=2", err: ""},
{node: 2, filter: "region=east,dc=1", err: "relocated"},
{node: 2, filter: "region=east,az=1", err: "relocated"},

{node: 3, filter: "region=east", err: "relocated"},
{node: 3, filter: "region=central,dc=1", err: "no instances found"},
} {
db := sqlutils.MakeSQLRunner(cluster.ServerConn(tc.node - 1))
// The case index is embedded in the destination URI so every case writes to
// its own location.
db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter)
}
}
16 changes: 10 additions & 6 deletions pkg/jobs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ func IsPermanentJobError(err error) bool {
return errors.Is(err, errJobPermanentSentinel)
}

// IsPauseSelfError reports whether errors.Is matches err against
// errPauseSelfSentinel, the marker that MarkPauseRequestError attaches to
// pause-request errors.
func IsPauseSelfError(err error) bool {
return errors.Is(err, errPauseSelfSentinel)
}

// errPauseSelfSentinel exists so the errors returned from PauseRequestErr can
// be marked with it.
var errPauseSelfSentinel = errors.New("job requested it be paused")
Expand All @@ -65,9 +59,19 @@ func MarkPauseRequestError(reason error) error {
return errors.Mark(reason, errPauseSelfSentinel)
}

// IsPauseSelfError reports whether errors.Is matches err against
// errPauseSelfSentinel, the marker that MarkPauseRequestError attaches to
// pause-request errors.
func IsPauseSelfError(err error) bool {
return errors.Is(err, errPauseSelfSentinel)
}

// PauseRequestExplained is the message used to wrap and explain a pause-request error.
const PauseRequestExplained = "pausing due to error; use RESUME JOB to try to proceed once the issue is resolved, or CANCEL JOB to rollback"

// errJobLeaseNotHeld is a marker error for returning from a job execution if it
// knows or finds out it no longer has a job lease.
var errJobLeaseNotHeld = errors.New("job lease not held")

// InvalidStatusError is the error returned when the desired operation is
// invalid given the job's current status.
type InvalidStatusError struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ message BackupDetails {
// ApplicationName is the application name in the session where the backup was
// invoked.
string application_name = 23;

roachpb.Locality coordinator_location = 24 [(gogoproto.nullable) = false];

// NEXT ID: 25;
}

message BackupProgress {
Expand Down
20 changes: 20 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,9 @@ func (r *Registry) stepThroughStateMachine(
jm.ResumeRetryError.Inc(1)
return errors.Errorf("job %d: node liveness error: restarting in background", job.ID())
}
if errors.Is(err, errJobLeaseNotHeld) {
return err
}

if errors.Is(err, errPauseSelfSentinel) {
if err := r.PauseRequested(ctx, nil, job.ID(), err.Error()); err != nil {
Expand Down Expand Up @@ -1820,6 +1823,23 @@ func (r *Registry) CheckPausepoint(name string) error {
return nil
}

// RelocateLease reassigns the claim on job id to the SQL instance destID and
// session destSession by updating the job's row in system.jobs using txn.
//
// It returns two errors. On success, sentinel is a non-nil error marked with
// errJobLeaseNotHeld and failure is nil; the sentinel is meant to be returned
// by the caller from its job execution so that the registry sees the lease is
// no longer held. On failure, sentinel is nil and failure wraps the underlying
// error.
func (r *Registry) RelocateLease(
ctx context.Context,
txn isql.Txn,
id jobspb.JobID,
destID base.SQLInstanceID,
destSession sqlliveness.SessionID,
Comment on lines +1830 to +1831
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this function take a sqlinstance.InstanceInfo (or some other type that bundles the instance id and session ID)?

Copy link
Member Author

@dt dt Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InstanceInfo has a lot more going on than I need/have. I don't know if we have anything that is just the pair of id and session. I'm not sure I feel like they need to be paired, conceptually? I guess I read ID as the destination, and session as a way to make sure it doesn't get dropped in transit but they're not obviously more linked than the other individual args?

) (sentinel error, failure error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷 I can't decide if a function that returns two errors is more or less weird than a function that always returns an error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, right? I went back and forth on this and decided I liked this best: I want to indicate failure vs success separately since e.g. callers are encouraged to Wrapf() their failures with contextual info or log them or clean up differently. I thought about making the first have a different type that just happened to implement Error() but I eventually decided error made it most obvious that you return this thing from Resume() and don't go make your own.

// Point the job's claim columns at the destination instance and session.
if _, err := r.db.Executor().Exec(ctx, "job-relocate-coordinator", txn.KV(),
"UPDATE system.jobs SET claim_instance_id = $2, claim_session_id = $3 WHERE id = $1",
id, destID, destSession.UnsafeBytes(),
dt marked this conversation as resolved.
Show resolved Hide resolved
); err != nil {
return nil, errors.Wrapf(err, "failed to relocate job coordinator to %d", destID)
}

return errors.Mark(errors.Newf("execution of job %d relocated to %d", id, destID), errJobLeaseNotHeld), nil
}

// TestingIsJobIdle returns true if the job is adopted and currently idle.
func (r *Registry) TestingIsJobIdle(jobID jobspb.JobID) bool {
r.mu.Lock()
Expand Down
17 changes: 17 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,23 @@ func (l Locality) String() string {
return strings.Join(tiers, ",")
}

// NonEmpty reports whether the locality has at least one tier.
func (l Locality) NonEmpty() bool {
	return len(l.Tiers) != 0
}

// Matches reports whether this locality satisfies filter, i.e. whether for
// every tier in filter this locality has a tier with the same key and the same
// value. When it does not, the second return value is the first tier of filter
// that did not match; when it does, the second return value is the zero Tier.
// An empty filter matches every locality.
func (l Locality) Matches(filter Locality) (bool, Tier) {
	for i := range filter.Tiers {
		want := filter.Tiers[i]
		got, found := l.Find(want.Key)
		if found && got == want.Value {
			continue
		}
		return false, want
	}
	return true, Tier{}
}

// Type returns the underlying type in string form. This is part of pflag's
// value interface.
func (Locality) Type() string {
Expand Down
35 changes: 35 additions & 0 deletions pkg/roachpb/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,41 @@ func TestLocalityConversions(t *testing.T) {
}
}

// TestLocalityMatches checks Locality.Matches for a table of filters, both
// against a fixed locality ("a=b,c=d,e=f") and against an empty locality.
func TestLocalityMatches(t *testing.T) {
dt marked this conversation as resolved.
Show resolved Hide resolved
var empty Locality
var l Locality
require.NoError(t, l.Set("a=b,c=d,e=f"))
// filter is the locality string to match against l ("" means an empty
// filter). miss is the expected String() of the tier reported as unmatched,
// or "" when the filter is expected to match.
for _, tc := range []struct {
filter string
miss string
}{
{filter: "", miss: ""},
{filter: "a=b", miss: ""},
{filter: "a=b,c=d,e=f", miss: ""},
{filter: "c=d,e=f,a=b", miss: ""},
{filter: "a=z", miss: "a=z"},
{filter: "a=b,c=x,e=f", miss: "c=x"},
{filter: "a=b,x=y", miss: "x=y"},
} {
t.Run(fmt.Sprintf("%s-miss-%s", tc.filter, tc.miss), func(t *testing.T) {
var filter Locality
// The empty filter is left as the zero value rather than parsed with Set.
if tc.filter != "" {
require.NoError(t, filter.Set(tc.filter))
}
matches, miss := l.Matches(filter)
if tc.miss == "" {
require.True(t, matches)
} else {
require.False(t, matches)
require.Equal(t, tc.miss, miss.String())
}

// An empty locality is expected to match only the empty filter.
emptyMatches, _ := empty.Matches(filter)
require.Equal(t, tc.filter == "", emptyMatches)
})
}
}

func TestDiversityScore(t *testing.T) {
// Keys are not considered for score, just the order, so we don't need to
// specify them.
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,28 @@ func NewDistSQLPlanner(
return dsp
}

// GetAllInstancesByLocality returns every instance reported by the SQL address
// resolver whose locality matches filter. It returns an error, rather than an
// empty slice, when no instance matches.
func (dsp *DistSQLPlanner) GetAllInstancesByLocality(
	ctx context.Context, filter roachpb.Locality,
) ([]sqlinstance.InstanceInfo, error) {
	instances, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
	if err != nil {
		return nil, err
	}

	// Filter in place: matching shares the backing array of instances, and the
	// write position never runs ahead of the read position.
	matching := instances[:0]
	for _, inst := range instances {
		if ok, _ := inst.Locality.Matches(filter); ok {
			matching = append(matching, inst)
		}
	}

	if len(matching) == 0 {
		return nil, errors.Newf("no instances found matching locality filter %s", filter.String())
	}
	return matching, nil
}

// GetSQLInstanceInfo gets a node descriptor by node ID.
func (dsp *DistSQLPlanner) GetSQLInstanceInfo(
sqlInstanceID base.SQLInstanceID,
Expand Down
Loading