Skip to content

Commit

Permalink
migration: migrate into span configs infrastructure
Browse files Browse the repository at this point in the history
We need a version gate before switching over to the span configs
infrastructure by default (#73876, future commit). In a mixed-version
cluster we need to wait for the host tenant to have fully populated
system.span_configurations (read: reconciled) at least once before it's
safe to use it as a view for all split/config decisions.

    clusterversion.EnsureSpanConfigReconciliation

We also want to ensure that the view over all configs maintained
per-store is at least as up-to-date as some full reconciliation
timestamp.

    clusterversion.EnsureSpanConfigSubscription

Without a version gate, it would be possible for a replica on a
new-binary-server to apply static fallback configs (assuming no entries
in `system.span_configurations`), in violation of explicit configs
directly set by the user. Though unlikely, it's also possible for us to
merge all ranges into a single one (with no entries in
system.span_configurations, the infrastructure can erroneously conclude
that there are zero split points).

To get past all this, we author a three-step migration: two cluster
versions with attached migrations, one for each of the steps above (in
order), followed by a final cluster version gate.

    clusterversion.EnableSpanConfigStore

Release note: None
  • Loading branch information
irfansharif committed Jan 13, 2022
1 parent 5d7f4d3 commit aeb410a
Show file tree
Hide file tree
Showing 15 changed files with 503 additions and 11 deletions.
22 changes: 22 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,16 @@ const (
// system.protected_ts_records table that describes what is protected by the
// record.
AlterSystemProtectedTimestampAddColumn
// EnsureSpanConfigReconciliation ensures that the host tenant has run its
// reconciliation process at least once.
EnsureSpanConfigReconciliation
// EnsureSpanConfigSubscription ensures that all KV nodes are subscribed to
// the global span configuration state, observing the entries installed as
// part of EnsureSpanConfigReconciliation.
EnsureSpanConfigSubscription
// EnableSpanConfigStore enables the use of the span configs infrastructure
// in KV.
EnableSpanConfigStore

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -337,6 +347,18 @@ var versionsSingleton = keyedVersions{
Key: AlterSystemProtectedTimestampAddColumn,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 36},
},
{
Key: EnsureSpanConfigReconciliation,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 38},
},
{
Key: EnsureSpanConfigSubscription,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 40},
},
{
Key: EnableSpanConfigStore,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 42},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
7 changes: 5 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ message AutoSpanConfigReconciliationDetails {
// AutoSpanConfigReconciliationProgress is the persisted progress for the span
// config reconciliation job.
message AutoSpanConfigReconciliationProgress {
// checkpoint is a timestamp persisted as part of the job's progress. The
// ensureSpanConfigReconciliation migration treats an empty checkpoint as
// "reconciliation has not completed yet" and keeps waiting until it is set.
// NOTE(review): presumably this is the timestamp of the most recent
// completed reconciliation pass -- confirm against the reconciliation job.
util.hlc.Timestamp checkpoint = 1 [(gogoproto.nullable) = false];
}

message ResumeSpanList {
Expand Down
50 changes: 47 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2114,11 +2114,36 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro
return nil, errSysCfgUnavailable
}

if s.cfg.SpanConfigsEnabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) {
return s.cfg.SpanConfigSubscriber, nil
// We need a version gate here before switching over to the span configs
// infrastructure. In a mixed-version cluster we need to wait for
// the host tenant to have fully populated `system.span_configurations`
// (read: reconciled) at least once before using it as a view for all
// split/config decisions.
_ = clusterversion.EnsureSpanConfigReconciliation
//
// We also want to ensure that the KVSubscriber on each store is at least as
// up-to-date as some full reconciliation timestamp.
_ = clusterversion.EnsureSpanConfigSubscription
//
// Without a version gate, it would be possible for a replica on a
// new-binary-server to apply the static fallback config (assuming no
// entries in `system.span_configurations`), in violation of explicit
// configs directly set by the user. Though unlikely, it's also possible for
// us to merge all ranges into a single one -- with no entries in
// system.span_configurations, the infrastructure can erroneously conclude
// that there are zero split points.
//
// We achieve all this through a three-step migration process, culminating
// in the following cluster version gate:
_ = clusterversion.EnableSpanConfigStore

if !s.cfg.SpanConfigsEnabled ||
!spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) ||
!s.cfg.Settings.Version.IsActive(ctx, clusterversion.EnableSpanConfigStore) {
return sysCfg, nil
}

return sysCfg, nil
return s.cfg.SpanConfigSubscriber, nil
}

// startLeaseRenewer runs an infinite loop in a goroutine which regularly
Expand Down Expand Up @@ -3353,6 +3378,25 @@ func (s *Store) PurgeOutdatedReplicas(ctx context.Context, version roachpb.Versi
return g.Wait()
}

// WaitForSpanConfigSubscription blocks until this store's span config
// subscriber reports a non-empty last-updated timestamp, i.e. until the store
// has observed the global span configuration state at least once. When span
// configs are disabled for the store it returns immediately. An error is
// returned if the retry loop terminates before the subscription is observed.
func (s *Store) WaitForSpanConfigSubscription(ctx context.Context) error {
	if !s.cfg.SpanConfigsEnabled {
		// Span configs are switched off for this store; there is nothing to
		// wait for.
		return nil
	}

	attempt := retry.StartWithCtx(ctx, base.DefaultRetryOptions())
	for attempt.Next() {
		lastUpdated := s.cfg.SpanConfigSubscriber.LastUpdated()
		if lastUpdated.IsEmpty() {
			// The subscriber has not received any update yet; try again after
			// the retry backoff.
			log.Warningf(ctx, "waiting for span config subscription...")
			continue
		}
		return nil
	}

	return errors.Newf("unable to subscribe to span configs")
}

// registerLeaseholder registers the provided replica as a leaseholder in the
// node's closed timestamp side transport.
func (s *Store) registerLeaseholder(
Expand Down
7 changes: 7 additions & 0 deletions pkg/migration/migrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"alter_table_statistics_avg_size.go",
"ensure_no_draining_names.go",
"insert_missing_public_schema_namespace_entry.go",
"migrate_span_configs.go",
"migrations.go",
"public_schema_migration.go",
"schema_changes.go",
Expand All @@ -16,6 +17,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrations",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
Expand All @@ -24,6 +26,7 @@ go_library(
"//pkg/migration",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand Down Expand Up @@ -56,6 +59,7 @@ go_test(
"ensure_no_draining_names_external_test.go",
"helpers_test.go",
"main_test.go",
"migrate_span_configs_test.go",
"public_schema_migration_external_test.go",
],
data = glob(["testdata/**"]),
Expand All @@ -67,10 +71,12 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkv",
Expand All @@ -82,6 +88,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
76 changes: 76 additions & 0 deletions pkg/migration/migrations/migrate_span_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations

import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
)

// ensureSpanConfigReconciliation blocks until the automatic span config
// reconciliation job has persisted a non-empty checkpoint in its progress. It
// returns nil immediately unless d.Codec reports the system tenant.
//
// The job's progress is read from system.jobs and polled using the default
// retry options. A missing job row or an empty checkpoint causes another
// attempt; query and unmarshalling errors are returned right away. A progress
// payload of an unexpected type is reported as an assertion failure instead of
// terminating the process, so that the migration fails gracefully. If the
// retry loop ends without observing a checkpoint, an error is returned.
func ensureSpanConfigReconciliation(
	ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, j *jobs.Job,
) error {
	if !d.Codec.ForSystemTenant() {
		return nil
	}

	for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
		row, err := d.InternalExecutor.QueryRowEx(ctx, "get-spanconfig-progress", nil,
			sessiondata.NodeUserSessionDataOverride,
			`SELECT progress FROM system.jobs WHERE id = (SELECT job_id FROM [SHOW AUTOMATIC JOBS] WHERE job_type = 'AUTO SPAN CONFIG RECONCILIATION')`)
		if err != nil {
			return err
		}
		if row == nil {
			// The reconciliation job has not been created yet; retry.
			log.Warningf(ctx, "reconciliation job not found")
			continue
		}
		progress, err := jobs.UnmarshalProgress(row[0])
		if err != nil {
			return err
		}
		sp, ok := progress.GetDetails().(*jobspb.Progress_AutoSpanConfigReconciliation)
		if !ok {
			// Surface the broken invariant as an error rather than crashing
			// the server from within a migration.
			return errors.AssertionFailedf(
				"unexpected job progress type %T", progress.GetDetails())
		}
		if sp.AutoSpanConfigReconciliation.Checkpoint.IsEmpty() {
			// The job exists but has not checkpointed yet; retry.
			log.Warningf(ctx, "waiting for span config reconciliation...")
			continue
		}

		return nil
	}

	return errors.Newf("unable to reconcile span configs")
}

// ensureSpanConfigSubscription issues a WaitForSpanConfigSubscription RPC to
// every node in the cluster, repeating the fan-out through
// deps.Cluster.UntilClusterStable. The first error from either layer is
// returned to the caller.
func ensureSpanConfigSubscription(
	ctx context.Context, _ clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job,
) error {
	// waitOnNode asks a single node to wait for its span config subscription.
	waitOnNode := func(ctx context.Context, client serverpb.MigrationClient) error {
		_, err := client.WaitForSpanConfigSubscription(
			ctx, &serverpb.WaitForSpanConfigSubscriptionRequest{})
		return err
	}
	// waitOnAllNodes fans the request out to every node in the cluster.
	waitOnAllNodes := func() error {
		return deps.Cluster.ForEveryNode(ctx, "ensure-span-config-subscription", waitOnNode)
	}
	return deps.Cluster.UntilClusterStable(ctx, waitOnAllNodes)
}
Loading

0 comments on commit aeb410a

Please sign in to comment.