diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 3ffbd876b97b..2ee90c6b9529 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -27,6 +27,7 @@ changefeed.sink_io_workers integer 0 the number of workers used by changefeeds w cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload application cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage application cloudstorage.timeout duration 10m0s the timeout for import/export storage operations application +cluster.auto_upgrade.enabled boolean true disable automatic cluster version upgrade until reset application cluster.organization string organization name system-visible cluster.preserve_downgrade_option string disable (automatic or manual) cluster version upgrade from the specified version until reset application diagnostics.forced_sql_stat_reset.interval duration 2h0m0s interval after which the reported SQL Stats are reset even if not collected by telemetry reporter. It has a max value of 24H. application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 3f554088e63c..b8305a447daa 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -32,6 +32,7 @@
cloudstorage.azure.concurrent_upload_buffers
integer1controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an uploadServerless/Dedicated/Self-Hosted
cloudstorage.http.custom_ca
stringcustom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storageServerless/Dedicated/Self-Hosted
cloudstorage.timeout
duration10m0sthe timeout for import/export storage operationsServerless/Dedicated/Self-Hosted +
cluster.auto_upgrade.enabled
booleantruedisable automatic cluster version upgrade until resetServerless/Dedicated/Self-Hosted
cluster.organization
stringorganization nameServerless/Dedicated/Self-Hosted (read-only)
cluster.preserve_downgrade_option
stringdisable (automatic or manual) cluster version upgrade from the specified version until resetServerless/Dedicated/Self-Hosted
diagnostics.active_query_dumps.enabled
booleantrueexperimental: enable dumping of anonymized active queries to disk when node is under memory pressureDedicated/Self-Hosted diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel index 27edb6006912..a9be9097cdfc 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel @@ -7,6 +7,7 @@ go_test( "tenant_upgrade_test.go", ], args = ["-test.timeout=295s"], + shard_count = 4, tags = ["ccl_test"], deps = [ "//pkg/base", @@ -19,12 +20,15 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/sql/sem/eval", "//pkg/sql/sqlinstance/instancestorage", "//pkg/sql/sqlliveness/slinstance", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/upgrade", "//pkg/upgrade/upgradebase", + "//pkg/util", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go index 610502b6e770..f85860b96ba5 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go @@ -11,6 +11,7 @@ package upgradeccl_test import ( "context" gosql "database/sql" + "fmt" "testing" "time" @@ -21,18 +22,233 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/upgrade" 
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/stretchr/testify/require" ) +func TestTenantAutoUpgradeRespectsAutoUpgradeEnabledSetting(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStressRace(t) + + // v0 is hard-coded because at clusterversion.TestingBinaryMinSupportedVersion is `v22.2` at the + // time of typing and it does not support shared process tenants. We should update v0 to be + // clusterversion.TestingBinaryMinSupportedVersion when it is bumped to `v23.1`. + v0 := clusterversion.V23_1 + ctx := context.Background() + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.ByKey(v0), + false, // initializeVersion + ) + // Initialize the version to v0. + require.NoError(t, clusterversion.Initialize(ctx, + clusterversion.ByKey(v0), &settings.SV)) + + ts := serverutils.StartServerOnly(t, base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey(v0), + BootstrapVersionKeyOverride: v0, + }, + SQLEvalContext: &eval.TestingKnobs{ + // When the host binary version is not equal to its cluster version, tenant logical version is set + // to the host's minimum supported binary version. We need this override to ensure that the tenant is + // created at v0. 
+ TenantLogicalVersionKeyOverride: v0, + }, + }, + }) + defer ts.Stopper().Stop(ctx) + sysDB := sqlutils.MakeSQLRunner(ts.SQLConn(t, "")) + + expectedInitialTenantVersion := clusterversion.ByKey(v0) + + tenantSettings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.ByKey(v0), + false, // initializeVersion + ) + require.NoError(t, clusterversion.Initialize(ctx, + expectedInitialTenantVersion, &tenantSettings.SV)) + + upgradeInfoCh := make(chan struct { + Status int + UpgradeTo roachpb.Version + }, 1) + mkTenant := func(t *testing.T, name string) (tenantDB *gosql.DB) { + tenantArgs := base.TestSharedProcessTenantArgs{ + TenantName: roachpb.TenantName(name), + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + TenantAutoUpgradeInfo: upgradeInfoCh, + BootstrapVersionKeyOverride: v0, + BinaryVersionOverride: clusterversion.ByKey(v0), + }, + }, + } + _, tenantDB, err := ts.TenantController().StartSharedProcessTenant(ctx, tenantArgs) + require.NoError(t, err) + return tenantDB + } + + // Create a shared process tenant and its SQL server. + const tenantName = "marhaba-crdb" + tenantDB := mkTenant(t, tenantName) + tenantRunner := sqlutils.MakeSQLRunner(tenantDB) + + // Ensure that the tenant works. + tenantRunner.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + tenantRunner.Exec(t, "INSERT INTO t VALUES (1), (2)") + + // Disable cluster.auto_upgrade.enabled setting for the tenant to prevent auto upgrade. + tenantRunner.Exec(t, fmt.Sprintf("SET CLUSTER SETTING %s = false", clusterversion.AutoUpgradeEnabled.Name())) + + // Upgrade the host cluster. + sysDB.Exec(t, + "SET CLUSTER SETTING version = $1", + clusterversion.TestingBinaryVersion.String()) + + // Ensure that the tenant still works. + tenantRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"1"}, {"2"}}) + + // Wait for auto upgrade status to be received by the testing knob. 
+ succeedsSoon := 20 * time.Second + for { + select { + case upgradeInfo := <-upgradeInfoCh: + if int(server.UpgradeDisabledByConfiguration) == upgradeInfo.Status { + return + } + case <-time.After(succeedsSoon): + t.Fatalf("failed to receive the right auto upgrade status after %d seconds", int(succeedsSoon.Seconds())) + } + } +} + +func TestTenantAutoUpgrade(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStressRace(t) + + // v0 is hard-coded because at clusterversion.TestingBinaryMinSupportedVersion is `v22.2` at the + // time of typing and it does not support shared process tenants. We should update v0 to be + // clusterversion.TestingBinaryMinSupportedVersion when it is bumped to `v23.1`. + v0 := clusterversion.V23_1 + ctx := context.Background() + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.ByKey(v0), + false, // initializeVersion + ) + // Initialize the version to v0. + require.NoError(t, clusterversion.Initialize(ctx, + clusterversion.ByKey(v0), &settings.SV)) + + ts := serverutils.StartServerOnly(t, base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey(v0), + BootstrapVersionKeyOverride: v0, + }, + SQLEvalContext: &eval.TestingKnobs{ + // When the host binary version is not equal to its cluster version, tenant logical version is set + // to the host's minimum supported binary version. We need this override to ensure that the tenant is + // created at v0. 
+ TenantLogicalVersionKeyOverride: v0, + }, + }, + }) + defer ts.Stopper().Stop(ctx) + sysDB := sqlutils.MakeSQLRunner(ts.SQLConn(t, "")) + + expectedInitialTenantVersion := clusterversion.ByKey(v0) + expectedFinalTenantVersion := clusterversion.TestingBinaryVersion + + tenantSettings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.ByKey(v0), + false, // initializeVersion + ) + require.NoError(t, clusterversion.Initialize(ctx, + expectedInitialTenantVersion, &tenantSettings.SV)) + + upgradeInfoCh := make(chan struct { + Status int + UpgradeTo roachpb.Version + }, 1) + mkTenant := func(t *testing.T, name string) (tenantDB *gosql.DB) { + tenantArgs := base.TestSharedProcessTenantArgs{ + TenantName: roachpb.TenantName(name), + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + TenantAutoUpgradeInfo: upgradeInfoCh, + AllowTenantAutoUpgradeOnInternalVersionChanges: true, + BootstrapVersionKeyOverride: v0, + BinaryVersionOverride: clusterversion.ByKey(v0), + }, + }, + } + _, tenantDB, err := ts.TenantController().StartSharedProcessTenant(ctx, tenantArgs) + require.NoError(t, err) + return tenantDB + } + + // Create a shared process tenant and its SQL server. + const tenantName = "hola-crdb" + tenantDB := mkTenant(t, tenantName) + tenantRunner := sqlutils.MakeSQLRunner(tenantDB) + + // Ensure that the tenant works. + tenantRunner.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + tenantRunner.Exec(t, "INSERT INTO t VALUES (1), (2)") + + // Upgrade the host cluster. + sysDB.Exec(t, + "SET CLUSTER SETTING version = $1", + expectedFinalTenantVersion.String()) + + // Ensure that the tenant still works. 
+ tenantRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"1"}, {"2"}}) + + var upgradeInfo struct { + Status int + UpgradeTo roachpb.Version + } + succeedsSoon := 20 * time.Second + if util.RaceEnabled { + succeedsSoon = 60 * time.Second + } + // Wait for auto upgrade status to be received by the testing knob. + for { + select { + case upgradeInfo = <-upgradeInfoCh: + if upgradeInfo.UpgradeTo == expectedFinalTenantVersion && upgradeInfo.Status == int(server.UpgradeAllowed) { + return + } + case <-time.After(succeedsSoon): + t.Fatalf("failed to receive the right auto upgrade status after %d seconds", int(succeedsSoon.Seconds())) + } + } +} + // TestTenantUpgrade exercises the case where a system tenant is in a // non-finalized version state and creates a tenant. The test ensures // that the newly created tenant begins in that same version. @@ -48,6 +264,7 @@ import ( func TestTenantUpgrade(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderStressRace(t) ctx := context.Background() v1 := clusterversion.TestingBinaryMinSupportedVersion @@ -93,6 +310,9 @@ func TestTenantUpgrade(t *testing.T) { TestingKnobs: base.TestingKnobs{ // Make the upgrade faster by accelerating jobs. 
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, }, Settings: settings, } @@ -129,6 +349,11 @@ func TestTenantUpgrade(t *testing.T) { t.Log("restart the tenant") tenantServer.AppStopper().Stop(ctx) tenantServer, err := ts.TenantController().StartTenant(ctx, base.TestTenantArgs{ + TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, TenantID: roachpb.MustMakeTenantID(initialTenantID), }) require.NoError(t, err) @@ -246,6 +471,9 @@ func TestTenantUpgradeFailure(t *testing.T) { SpanConfig: &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, }, + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, UpgradeManager: &upgradebase.TestingKnobs{ DontUseJobs: true, RegistryOverride: func(v roachpb.Version) (upgradebase.Upgrade, bool) { diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go index 6c28684fa747..dd20eb451099 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go @@ -150,6 +150,9 @@ func runTest(t *testing.T, variant sharedtestutil.TestVariant, test sharedtestut tenantArgs := base.TestTenantArgs{ TenantID: id, TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), UpgradeManager: &upgradebase.TestingKnobs{ InterlockPausePoint: test.PausePoint, @@ -283,6 +286,11 @@ func runTest(t *testing.T, variant sharedtestutil.TestVariant, test sharedtestut Stopper: otherServerStopper, TenantID: tenantID, Settings: otherServerSettings, + TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + 
DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, }) var otherTenantRunner *sqlutils.SQLRunner diff --git a/pkg/clusterversion/setting.go b/pkg/clusterversion/setting.go index 0a9415adb534..88ff5fa95175 100644 --- a/pkg/clusterversion/setting.go +++ b/pkg/clusterversion/setting.go @@ -306,3 +306,13 @@ func MakeMetricsAndRegisterOnVersionChangeCallback(sv *settings.Values) Metrics PreserveDowngradeLastUpdated: gauge, } } + +// AutoUpgradeEnabled is used to enable and disable automatic upgrade. +var AutoUpgradeEnabled = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "cluster.auto_upgrade.enabled", + "disable automatic cluster version upgrade until reset", + true, + settings.WithReportable(true), + settings.WithPublic, +) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index df91e7b16cd6..deea9344a946 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -76,6 +76,7 @@ go_library( "stop_trigger.go", "tcp_keepalive_manager.go", "tenant.go", + "tenant_auto_upgrade.go", "tenant_migration.go", "testing_knobs.go", "testserver.go", diff --git a/pkg/server/auto_upgrade.go b/pkg/server/auto_upgrade.go index 16d884b77150..e485fea48774 100644 --- a/pkg/server/auto_upgrade.go +++ b/pkg/server/auto_upgrade.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -61,23 +62,30 @@ func (s *topLevelServer) startAttemptUpgrade(ctx context.Context) error { // status, or stop attempting upgrade. 
status, err := s.upgradeStatus(ctx, clusterVersion) switch status { - case upgradeBlockedDueToError: + case UpgradeBlockedDueToError: log.Errorf(ctx, "failed attempt to upgrade cluster version, error: %v", err) continue - case upgradeBlockedDueToMixedVersions: + case UpgradeBlockedDueToMixedVersions: log.Infof(ctx, "failed attempt to upgrade cluster version: %v", err) continue - case upgradeDisabledByConfiguration: + case UpgradeDisabledByConfigurationToPreserveDowngrade: log.Infof(ctx, "auto upgrade is disabled for current version (preserve_downgrade_option): %s", redact.Safe(clusterVersion)) // Note: we do 'continue' here (and not 'return') so that the // auto-upgrade gets a chance to continue/complete if the // operator resets `preserve_downgrade_option` after the node // has started up already. continue - case upgradeAlreadyCompleted: + case UpgradeDisabledByConfiguration: + log.Infof(ctx, "auto upgrade is disabled by (cluster.auto_upgrade.enabled)") + // Note: we do 'continue' here (and not 'return') so that the + // auto-upgrade gets a chance to continue/complete if the + // operator resets `auto_upgrade.enabled` after the node + // has started up already. + continue + case UpgradeAlreadyCompleted: log.Info(ctx, "no need to upgrade, cluster already at the newest version") return - case upgradeAllowed: + case UpgradeAllowed: // Fall out of the select below. 
default: panic(errors.AssertionFailedf("unhandled case: %d", status)) @@ -111,13 +119,32 @@ func (s *topLevelServer) startAttemptUpgrade(ctx context.Context) error { type upgradeStatus int8 const ( - upgradeAllowed upgradeStatus = iota - upgradeAlreadyCompleted - upgradeDisabledByConfiguration - upgradeBlockedDueToError - upgradeBlockedDueToMixedVersions + UpgradeAllowed upgradeStatus = iota + UpgradeAlreadyCompleted + UpgradeDisabledByConfiguration + UpgradeDisabledByConfigurationToPreserveDowngrade + UpgradeBlockedDueToError + UpgradeBlockedDueToMixedVersions + UpgradeBlockedDueToLowStorageClusterVersion ) +// isAutoUpgradeEnabled consults `cluster.auto_upgrade.enabled` and +// `cluster.preserve_downgrade_option` settings to decide if automatic +// upgrade is enabled. The latter setting will be retired in a future +// release. +func (s *topLevelServer) isAutoUpgradeEnabled(currentClusterVersion string) upgradeStatus { + if autoUpgradeEnabled := clusterversion.AutoUpgradeEnabled.Get(&s.ClusterSettings().SV); !autoUpgradeEnabled { + // Automatic upgrade is not enabled. + return UpgradeDisabledByConfiguration + } + if downgradeVersion := clusterversion.PreserveDowngradeVersion.Get(&s.ClusterSettings().SV); downgradeVersion != "" { + if currentClusterVersion == downgradeVersion { + return UpgradeDisabledByConfigurationToPreserveDowngrade + } + } + return UpgradeAllowed +} + // upgradeStatus lets the main checking loop know if we should do upgrade, // keep checking upgrade status, or stop attempting upgrade.
func (s *topLevelServer) upgradeStatus( @@ -125,11 +152,11 @@ func (s *topLevelServer) upgradeStatus( ) (st upgradeStatus, err error) { nodes, err := s.status.ListNodesInternal(ctx, nil) if err != nil { - return upgradeBlockedDueToError, err + return UpgradeBlockedDueToError, err } vitalities, err := s.nodeLiveness.ScanNodeVitalityFromKV(ctx) if err != nil { - return upgradeBlockedDueToError, err + return UpgradeBlockedDueToError, err } var newVersion string @@ -160,46 +187,26 @@ func (s *topLevelServer) upgradeStatus( if newVersion == "" { newVersion = version } else if version != newVersion { - return upgradeBlockedDueToMixedVersions, errors.Newf( + return UpgradeBlockedDueToMixedVersions, errors.Newf( "not all nodes are running the latest version yet (saw %s and %s)", redact.Safe(newVersion), redact.Safe(version)) } } if newVersion == "" { - return upgradeBlockedDueToError, errors.Errorf("no live nodes found") + return UpgradeBlockedDueToError, errors.Errorf("no live nodes found") } // Check if we really need to upgrade cluster version. if newVersion == clusterVersion { - return upgradeAlreadyCompleted, nil + return UpgradeAlreadyCompleted, nil } if notRunningErr != nil { - return upgradeBlockedDueToError, notRunningErr - } - - // Check if auto upgrade is enabled at current version. This is read from - // the KV store so that it's in effect on all nodes immediately following a - // SET CLUSTER SETTING. 
- row, err := s.sqlServer.internalExecutor.QueryRowEx( - ctx, "read-downgrade", nil, /* txn */ - sessiondata.RootUserSessionDataOverride, - "SELECT value FROM system.settings WHERE name = 'cluster.preserve_downgrade_option';", - ) - if err != nil { - return upgradeBlockedDueToError, err - } - - if row != nil { - downgradeVersion := string(tree.MustBeDString(row[0])) - - if clusterVersion == downgradeVersion { - return upgradeDisabledByConfiguration, nil - } + return UpgradeBlockedDueToError, notRunningErr } - return upgradeAllowed, nil + return s.isAutoUpgradeEnabled(clusterVersion), nil } // clusterVersion returns the current cluster version from the SQL subsystem diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6820e2dfb248..e567db52e431 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1740,6 +1740,12 @@ func (s *SQLServer) preStart( } })) + if !s.execCfg.Codec.ForSystemTenant() && (s.serviceMode != mtinfopb.ServiceModeExternal) { + if err := s.startTenantAutoUpgradeLoop(ctx); err != nil { + return errors.Wrap(err, "cannot start tenant auto upgrade checker task") + } + } + return nil } diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 6ee82759da3a..aebea079e168 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -646,3 +646,11 @@ func (s *SettingsWatcher) getSettingAndValue(key settings.InternalKey) (bool, kv }} return true, payload } + +func (s *SettingsWatcher) GetPreserveDowngradeVersionSettingValue() string { + return clusterversion.PreserveDowngradeVersion.Get(&s.settings.SV) +} + +func (s *SettingsWatcher) GetAutoUpgradeEnabledSettingValue() bool { + return clusterversion.AutoUpgradeEnabled.Get(&s.settings.SV) +} diff --git a/pkg/server/tenant_auto_upgrade.go b/pkg/server/tenant_auto_upgrade.go new file mode 100644 index 000000000000..f626694330ee --- /dev/null +++ 
b/pkg/server/tenant_auto_upgrade.go @@ -0,0 +1,224 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// startTenantAutoUpgradeLoop checks for changes in storage cluster version +// every 10 seconds and triggers an upgrade attempt if needed. Other than +// that, it also starts an upgrade attempt 10 seconds after a new sql server +// starts. This is to cover cases where upgrade becomes possible due to +// an upgrade to the tenant binary version. 
+func (s *SQLServer) startTenantAutoUpgradeLoop(ctx context.Context) error {
+	// Storage cluster version that later reads are compared against. It is
+	// only refreshed when an upgrade attempt is made.
+	storageClusterVersion := s.settingsWatcher.GetStorageClusterActiveVersion().Version
+	return s.stopper.RunAsyncTask(ctx, "tenant-auto-upgrade-checker", func(ctx context.Context) {
+		firstAttempt := true
+		var allowUpgradeOnInternalVersionChanges bool
+		if k := s.cfg.TestingKnobs.Server; k != nil {
+			allowUpgradeOnInternalVersionChanges = k.(*TestingKnobs).AllowTenantAutoUpgradeOnInternalVersionChanges
+		}
+		for {
+			select {
+			case <-s.stopper.ShouldQuiesce():
+				return
+			// Check for changes every 10 seconds to avoid triggering an upgrade
+			// on every change to the internal version of storage cluster version
+			// within a short time period.
+			case <-time.After(time.Second * 10):
+				latestStorageClusterVersion := s.settingsWatcher.GetStorageClusterActiveVersion().Version
+				// Only run an upgrade attempt if this is the first attempt (i.e. on
+				// server startup) or if the storage cluster version changed and is
+				// now at an Internal version of 0, which implies that storage is at
+				// the "final" version for some release. The first case ensures that
+				// an upgrade made possible by a change in a sql instance binary
+				// version happens. The second case ensures that an upgrade made
+				// possible by a change in the storage cluster version happens.
+				// We may also run an attempt when the change is only to the Internal
+				// version if a testing knob is passed.
+				//
+				// The Internal check must look at the newly observed version: the
+				// remembered version is only refreshed when an attempt runs, so
+				// checking it instead would stop reacting to changes as soon as an
+				// intermediate (Internal != 0) version had been recorded.
+				storageClusterVersionChanged := storageClusterVersion != latestStorageClusterVersion
+				storageAtFinalVersion := latestStorageClusterVersion.Internal == 0
+				if firstAttempt ||
+					(storageClusterVersionChanged && (storageAtFinalVersion || allowUpgradeOnInternalVersionChanges)) {
+					firstAttempt = false
+					storageClusterVersion = latestStorageClusterVersion
+					if err := s.startAttemptTenantUpgrade(ctx, allowUpgradeOnInternalVersionChanges); err != nil {
+						log.Errorf(ctx, "failed to start an upgrade attempt: %v", err)
+					}
+				}
+			}
+		}
+	})
+}
+
+// startAttemptTenantUpgrade makes a single attempt to upgrade the tenant
+// cluster version. It reads the tenant's current cluster version from
+// storage, asks tenantUpgradeStatus whether and to which version to upgrade
+// and, if the upgrade is allowed, issues `SET CLUSTER SETTING version`.
+//
+// A nil error is returned when no upgrade is needed or permitted, when the
+// upgrade succeeds, or when the server starts quiescing while waiting on a
+// testing knob. Any other failure is returned to the caller.
+func (s *SQLServer) startAttemptTenantUpgrade(
+	ctx context.Context, allowUpgradeOnInternalVersionChanges bool,
+) error {
+	ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
+	defer cancel()
+
+	// Check if auto upgrade is disabled for test purposes.
+	if k := s.cfg.TestingKnobs.Server; k != nil {
+		upgradeTestingKnobs := k.(*TestingKnobs)
+		if disableCh := upgradeTestingKnobs.DisableAutomaticVersionUpgrade; disableCh != nil {
+			log.Infof(ctx, "auto upgrade disabled by testing")
+			select {
+			case <-disableCh:
+				log.Infof(ctx, "auto upgrade no longer disabled by testing")
+			case <-s.stopper.ShouldQuiesce():
+				return nil
+			}
+		}
+	}
+
+	var tenantAutoUpgradeInfoCh chan struct {
+		Status    int
+		UpgradeTo roachpb.Version
+	}
+	// Get testing knobs if set.
+	if k := s.cfg.TestingKnobs.Server; k != nil {
+		upgradeTestingKnobs := k.(*TestingKnobs)
+		tenantAutoUpgradeInfoCh = upgradeTestingKnobs.TenantAutoUpgradeInfo
+	}
+
+	var tenantClusterVersion clusterversion.ClusterVersion
+	if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
+		tenantClusterVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn)
+		return err
+	}); err != nil {
+		return errors.Wrap(err, "unable to retrieve tenant cluster version")
+	}
+
+	// Check if we should upgrade cluster version.
+	status, upgradeToVersion, err := s.tenantUpgradeStatus(ctx, tenantClusterVersion.Version, allowUpgradeOnInternalVersionChanges)
+
+	// Let test code know the status of an upgrade if needed. The send must not
+	// outlive the server: if the test has stopped draining the channel, give up
+	// once the stopper quiesces instead of blocking this task forever.
+	if tenantAutoUpgradeInfoCh != nil {
+		select {
+		case tenantAutoUpgradeInfoCh <- struct {
+			Status    int
+			UpgradeTo roachpb.Version
+		}{int(status), upgradeToVersion}:
+		case <-s.stopper.ShouldQuiesce():
+			return nil
+		}
+	}
+
+	switch status {
+	case UpgradeBlockedDueToError:
+		return err
+	case UpgradeDisabledByConfiguration:
+		// tenantUpgradeStatus only reports this status when
+		// cluster.auto_upgrade.enabled is false.
+		log.Infof(ctx, "auto upgrade is disabled by (cluster.auto_upgrade.enabled), staying at current version: %s", redact.Safe(tenantClusterVersion.Version))
+		return nil
+	case UpgradeAlreadyCompleted:
+		log.Info(ctx, "no need to upgrade, instance already at the newest version")
+		return nil
+	case UpgradeBlockedDueToLowStorageClusterVersion:
+		log.Info(ctx, "upgrade blocked because storage binary version doesn't support upgrading to minimum tenant binary version")
+		return nil
+	case UpgradeAllowed:
+		// Fall out of the switch below.
+	default:
+		return errors.AssertionFailedf("unhandled case: %d", status)
+	}
+
+	upgradeRetryOpts := retry.Options{
+		InitialBackoff: 5 * time.Second,
+		MaxBackoff:     10 * time.Second,
+		Multiplier:     2,
+		Closer:         s.stopper.ShouldQuiesce(),
+	}
+
+	// Run the set cluster setting version statement. Both outcomes return from
+	// the first iteration, so a failure is reported to the caller rather than
+	// retried here; the loop body is skipped entirely only if the retry is
+	// already closed (context canceled or stopper quiescing).
+	for ur := retry.StartWithCtx(ctx, upgradeRetryOpts); ur.Next(); {
+		if _, err := s.internalExecutor.ExecEx(
+			ctx, "set-version", nil, /* txn */
+			sessiondata.RootUserSessionDataOverride,
+			"SET CLUSTER SETTING version = $1;", upgradeToVersion.String(),
+		); err != nil {
+			return errors.Wrap(err, "error when finalizing tenant cluster version upgrade")
+		}
+		log.Infof(ctx, "successfully upgraded tenant cluster version to %v", upgradeToVersion)
+		return nil
+	}
+	return nil
+}
+
+// tenantUpgradeStatus lets the main checking loop know if we should upgrade.
+// It returns the status together with the version to upgrade to; the version
+// is only set when the status is UpgradeAllowed, and err is only set when the
+// status is UpgradeBlockedDueToError.
+func (s *SQLServer) tenantUpgradeStatus(
+	ctx context.Context,
+	currentClusterVersion roachpb.Version,
+	allowUpgradeOnInternalVersionChanges bool,
+) (st upgradeStatus, upgradeToVersion roachpb.Version, err error) {
+	storageClusterVersion := s.settingsWatcher.GetStorageClusterActiveVersion().Version
+
+	if autoUpgradeEnabled := s.settingsWatcher.GetAutoUpgradeEnabledSettingValue(); !autoUpgradeEnabled {
+		// Automatic upgrade is not enabled.
+		return UpgradeDisabledByConfiguration, roachpb.Version{}, nil
+	}
+
+	instances, err := s.sqlInstanceReader.GetAllInstances(ctx)
+	if err != nil {
+		return UpgradeBlockedDueToError, roachpb.Version{}, err
+	}
+	if len(instances) == 0 {
+		return UpgradeBlockedDueToError, roachpb.Version{}, errors.Errorf("no live instances found")
+	}
+	log.Infof(ctx, "found %d instances", len(instances))
+
+	// findMinBinaryVersion returns the lowest BinaryVersion among the given
+	// instances; the caller guarantees the slice is non-empty.
+	findMinBinaryVersion := func(instances []sqlinstance.InstanceInfo) roachpb.Version {
+		minVersion := instances[0].BinaryVersion
+		for _, instance := range instances {
+			if instance.BinaryVersion.Less(minVersion) {
+				minVersion = instance.BinaryVersion
+			}
+		}
+		if !allowUpgradeOnInternalVersionChanges {
+			// Unless a testing knob was passed, we are only interested in major and minor versions, not Internal ones.
+			minVersion.Internal = 0
+		}
+		return minVersion
+	}
+
+	// For all cases below, return UpgradeBlockedDueToLowStorageClusterVersion and
+	// do not upgrade if storage logical version is less than the upgradeTo version.
+	//
+	// Upgrade Rules:
+	// 1. Upgrade completed if `Tenant Logical Version == min(instancesBinaryVersions...)`
+	// 2. Upgrade to Storage Logical Version (SLV) if min(instancesBinaryVersions...) supports upgrading to SLV
+	// 3. Upgrade to min(instancesBinaryVersions...)
+
+	minInstanceBinaryVersion := findMinBinaryVersion(instances)
+	if currentClusterVersion == minInstanceBinaryVersion {
+		return UpgradeAlreadyCompleted, roachpb.Version{}, nil
+	} else if storageClusterVersion.LessEq(minInstanceBinaryVersion) {
+		// minInstanceBinaryVersion supports storageClusterVersion so upgrade to storageClusterVersion.
+		upgradeToVersion = storageClusterVersion
+	} else {
+		// minInstanceBinaryVersion doesn't support storageClusterVersion but we can upgrade
+		// cluster version to minInstanceBinaryVersion.
+		upgradeToVersion = minInstanceBinaryVersion
+	}
+
+	// NOTE(review): upgradeToVersion is either storageClusterVersion itself or,
+	// when storageClusterVersion is not LessEq minInstanceBinaryVersion,
+	// minInstanceBinaryVersion. Assuming Less/LessEq form a total order, this
+	// condition looks like it can never hold -- confirm the intended comparison.
+	if storageClusterVersion.Less(upgradeToVersion) {
+		return UpgradeBlockedDueToLowStorageClusterVersion, roachpb.Version{}, nil
+	}
+	return UpgradeAllowed, upgradeToVersion, nil
+}
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index dcd77718bf28..9b866d7fbd23 100644
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -165,6 +165,22 @@ type TestingKnobs struct {
 	// DisableSettingsWatcher disables the watcher that monitors updates
 	// to system.settings.
 	DisableSettingsWatcher bool
+
+	TenantAutoUpgradeInfo chan struct {
+		Status    int
+		UpgradeTo roachpb.Version
+	}
+
+	// As of September 2023, only `v23.1` and master support shared process tenants. `v23.2` is not
+	// cut yet so the difference between the current binary version on master and v23.1 is only in the
+	// Internal version (both are major=23 minor=1). We only trigger shared process tenant auto upgrade
+	// on changes to major/minor versions but since we can only start shared process tenants in `v23.1`,
+	// there will not be any change to major/minor versions when upgrading from `v23.1` to master and
+	// we won't be able to test this new feature. This testing knob allows `TestTenantAutoUpgrade` to
+	// auto upgrade on changes to the Internal version.
+	// TODO(ahmad/healthy-pod): Remove this once `v23.2` is cut and update `TestTenantAutoUpgrade`
+	// to reflect the changes.
+	AllowTenantAutoUpgradeOnInternalVersionChanges bool
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.