From be1100852bfc8881fe04f095afffffee030553cd Mon Sep 17 00:00:00 2001 From: healthy-pod Date: Mon, 24 Apr 2023 19:51:07 +0300 Subject: [PATCH] pkg/server: support tenant auto-upgrade MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, tenant upgrades in UA required a user to issue a `SET CLUSTER SETTING version =` statement to finalize an upgrade. This UX is different from what we have in single-tenant SH/Dedicated deployments in that we have auto upgrade in the later that starts an attempt to finalize cluster version after every node startup incase the node was started with a new binary version that all nodes now support upgrading to. In UA, we have two differences: 1. What to upgrade? - In a multi-tenant deployment, the storage and sql layers are upgraded separately. - The storage layer upgrade finalization is still handled by the existing auto upgrade logic. - In this change, we ensure that the sql layer is also auto-upgraded when possible. 2. When to upgrade? - In a single-tenant deployment, all layers share the same binary version and cluster version. Hence, an upgrade attempt is only needed when a new node starts to ensure that the cluster is auto-upgraded if the new binary version supports an upgrade. - In a multi-tenant deployment, in addition to the condition above, the sql server upgrade is also constrained by the storage cluster version. It is possible for all SQL instances to have binary versions that support an upgrade but the upgrade will still be blocked by the storage cluster version if it’s equal to the current tenant cluster version. This code change does the following: 1. Adds logic to run a SQL server upgrade attempt (mostly adopted from the original auto upgrade code) within the following ordered constraints (previously we merged #98830 to make getting the binary versions of instances easier): - Ensure that upgrade is not blocked by `preserve_downgrade_option`. - Exit if tenant cluster version is equal to storage cluster version [upgrade already completed]. - Upgrade to storage cluster version if the binary version of all SQL instances supports that. - Exit if tenant cluster version is equal to the minimum instance binary version [upgrade blocked due to too low binary version]. - Upgrade to the minimum instance binary version. 2. Runs the logic above when a SQL server is started. - This covers the case where a SQL server binary upgrade allows for an upgrade to the tenant cluster version. 3. Checks for change in storage cluster version every 30 seconds and starts an upgrade attempt if it was changed. - This covers the case where the binary versions of all SQL instances allow for an upgrade but it’s blocked due to the storage cluster version. Release note: None Epic: CRDB-20860 --- .../kvccl/kvtenantccl/upgradeccl/BUILD.bazel | 3 + .../upgradeccl/tenant_upgrade_test.go | 122 ++++++++++- .../tenant_upgrade_test.go | 8 + pkg/server/BUILD.bazel | 1 + pkg/server/auto_upgrade.go | 1 + pkg/server/server_sql.go | 8 + pkg/server/tenant_auto_upgrade.go | 205 ++++++++++++++++++ pkg/sql/set_cluster_setting.go | 7 + 8 files changed, 352 insertions(+), 3 deletions(-) create mode 100644 pkg/server/tenant_auto_upgrade.go diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel index 491e6a9ab7cc..933f42c46488 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel @@ -8,6 +8,7 @@ go_test( "tenant_upgrade_test.go", ], args = ["-test.timeout=295s"], + shard_count = 3, tags = ["ccl_test"], deps = [ "//pkg/base", @@ -24,10 +25,12 @@ go_test( "//pkg/sql/sqlinstance/instancestorage", "//pkg/sql/sqlliveness/slinstance", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/upgrade", "//pkg/upgrade/upgradebase", + "//pkg/util", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go index 473bf64238de..adc927998402 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go @@ -25,16 +25,120 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/stretchr/testify/require" ) +func TestTenantAutoUpgrade(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + false, // initializeVersion + ) + // Initialize the version to the BinaryMinSupportedVersion. + require.NoError(t, clusterversion.Initialize(ctx, + clusterversion.TestingBinaryMinSupportedVersion, &settings.SV)) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + // Test validates tenant behavior. No need for the default test + // tenant. + DefaultTestTenant: base.TestTenantDisabled, + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + expectedInitialTenantVersion := clusterversion.TestingBinaryMinSupportedVersion + expectedFinalTenantVersion := clusterversion.TestingBinaryVersion + connectToTenant := func(t *testing.T, addr string) (_ *gosql.DB, cleanup func()) { + pgURL, cleanupPGUrl := sqlutils.PGUrl(t, addr, "Tenant", url.User(username.RootUser)) + tenantDB, err := gosql.Open("postgres", pgURL.String()) + require.NoError(t, err) + return tenantDB, func() { + tenantDB.Close() + cleanupPGUrl() + } + } + mkTenant := func(t *testing.T, id uint64) (tenantDB *gosql.DB, cleanup func()) { + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + false, // initializeVersion + ) + // Initialize the version to the minimum it could be. + require.NoError(t, clusterversion.Initialize(ctx, + expectedInitialTenantVersion, &settings.SV)) + tenantArgs := base.TestTenantArgs{ + TenantID: roachpb.MustMakeTenantID(id), + TestingKnobs: base.TestingKnobs{}, + Settings: settings, + } + tenant, err := tc.Server(0).StartTenant(ctx, tenantArgs) + require.NoError(t, err) + tenantDB, cleanup = connectToTenant(t, tenant.SQLAddr()) + return tenantDB, cleanup + } + + // Create a tenant before upgrading anything and verify its version. + const initialTenantID = 10 + tenantDB, cleanup := mkTenant(t, initialTenantID) + tenantRunner := sqlutils.MakeSQLRunner(tenantDB) + + // Ensure that the tenant works. + tenantRunner.CheckQueryResults(t, "SHOW CLUSTER SETTING version", + [][]string{{expectedInitialTenantVersion.String()}}) + tenantRunner.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + tenantRunner.Exec(t, "INSERT INTO t VALUES (1), (2)") + + // Upgrade the host cluster. + sqlutils.MakeSQLRunner(tc.ServerConn(0)).Exec(t, + "SET CLUSTER SETTING version = $1", + clusterversion.TestingBinaryVersion.String()) + + // Ensure that the tenant still works. + tenantRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"1"}, {"2"}}) + // Wait for the tenant to auto-finalize. + tenantRunner.SucceedsSoonDuration = time.Second * 90 + if skip.Stress() || util.RaceEnabled { + tenantRunner.SucceedsSoonDuration = time.Second * 180 + } + tenantRunner.CheckQueryResultsRetry(t, "SHOW CLUSTER SETTING version", + [][]string{{expectedFinalTenantVersion.String()}}) + + // Restart the tenant and ensure that the version is correct. + cleanup() + { + tenantServer, err := tc.Server(0).StartTenant(ctx, base.TestTenantArgs{ + TenantID: roachpb.MustMakeTenantID(initialTenantID), + }) + require.NoError(t, err) + tenantDB, cleanup = connectToTenant(t, tenantServer.SQLAddr()) + defer cleanup() + tenantRunner = sqlutils.MakeSQLRunner(tenantDB) + } + tenantRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"1"}, {"2"}}) + tenantRunner.CheckQueryResults(t, "SHOW CLUSTER SETTING version", + [][]string{{expectedFinalTenantVersion.String()}}) + +} + // TestTenantUpgrade exercises the case where a system tenant is in a // non-finalized version state and creates a tenant. The test ensures // that the newly created tenant begins in that same version. @@ -95,9 +199,13 @@ func TestTenantUpgrade(t *testing.T) { require.NoError(t, clusterversion.Initialize(ctx, expectedInitialTenantVersion, &settings.SV)) tenantArgs := base.TestTenantArgs{ - TenantID: roachpb.MustMakeTenantID(id), - TestingKnobs: base.TestingKnobs{}, - Settings: settings, + TenantID: roachpb.MustMakeTenantID(id), + TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, + Settings: settings, } tenant, err := tc.Server(0).StartTenant(ctx, tenantArgs) require.NoError(t, err) @@ -139,6 +247,11 @@ func TestTenantUpgrade(t *testing.T) { { tenantServer, err := tc.Server(0).StartTenant(ctx, base.TestTenantArgs{ TenantID: roachpb.MustMakeTenantID(initialTenantID), + TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, }) require.NoError(t, err) initialTenant, cleanup = connectToTenant(t, tenantServer.SQLAddr()) @@ -275,6 +388,9 @@ func TestTenantUpgradeFailure(t *testing.T) { SpanConfig: &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, }, + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, UpgradeManager: &upgradebase.TestingKnobs{ DontUseJobs: true, RegistryOverride: func(v roachpb.Version) (upgradebase.Upgrade, bool) { diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/tenant_upgrade_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/tenant_upgrade_test.go index 6789b36064fc..a44ca9f1697d 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/tenant_upgrade_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/tenant_upgrade_test.go @@ -291,6 +291,9 @@ func TestTenantUpgradeInterlock(t *testing.T) { tenantArgs := base.TestTenantArgs{ TenantID: id, TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), UpgradeManager: &upgradebase.TestingKnobs{ InterlockPausePoint: test.pausePoint, @@ -424,6 +427,11 @@ func TestTenantUpgradeInterlock(t *testing.T) { Stopper: otherServerStopper, TenantID: tenantID, Settings: otherServerSettings, + TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, }) var otherTenantRunner *sqlutils.SQLRunner diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index bd7685455b64..479fa7541cf2 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -75,6 +75,7 @@ go_library( "stop_trigger.go", "tcp_keepalive_manager.go", "tenant.go", + "tenant_auto_upgrade.go", "tenant_migration.go", "testing_knobs.go", "testserver.go", diff --git a/pkg/server/auto_upgrade.go b/pkg/server/auto_upgrade.go index c758d12f54b2..a22e4b14a402 100644 --- a/pkg/server/auto_upgrade.go +++ b/pkg/server/auto_upgrade.go @@ -116,6 +116,7 @@ const ( upgradeDisabledByConfiguration upgradeBlockedDueToError upgradeBlockedDueToMixedVersions + upgradeBlockedDueToLowBinaryVersion ) // upgradeStatus lets the main checking loop know if we should do upgrade, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index ea5825b13875..077c42c84fd7 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1698,6 +1698,14 @@ func (s *SQLServer) preStart( } })) + if err := s.startAttemptUpgrade(ctx); err != nil { + return errors.Wrap(err, "cannot start tenant auto upgrade task") + } + + if err := s.startAutoUpgradeOnStorageUpgrade(ctx); err != nil { + return errors.Wrap(err, "cannot start tenant auto upgrade checker task") + } + return nil } diff --git a/pkg/server/tenant_auto_upgrade.go b/pkg/server/tenant_auto_upgrade.go new file mode 100644 index 000000000000..0a2797432a88 --- /dev/null +++ b/pkg/server/tenant_auto_upgrade.go @@ -0,0 +1,205 @@ +// Copyright 2018 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// startAutoUpgradeOnStorageUpgrade checks for changes in storage cluster version +// every 30 seconds and triggers an upgrade attempt if needed. +func (s *SQLServer) startAutoUpgradeOnStorageUpgrade(ctx context.Context) error { + storageClusterVersion := s.settingsWatcher.GetStorageClusterActiveVersion().Version + return s.stopper.RunAsyncTask(ctx, "tenant-auto-upgrade-checker", func(ctx context.Context) { + for { + select { + case <-s.stopper.ShouldQuiesce(): + return + // Check for changes every 30 seconds to avoid triggering an upgrade + // on every change to the internal version of storage cluster version + // within a short time period. + case <-time.After(time.Second * 30): + latestStorageClusterVersion := s.settingsWatcher.GetStorageClusterActiveVersion().Version + if storageClusterVersion != latestStorageClusterVersion { + storageClusterVersion = latestStorageClusterVersion + if err := s.startAttemptUpgrade(ctx); err != nil { + log.Errorf(ctx, "failed to start an upgrade attempt") + } + } + } + } + }) +} + +// startAttemptUpgrade attempts to upgrade cluster version. +func (s *SQLServer) startAttemptUpgrade(ctx context.Context) error { + return s.stopper.RunAsyncTask(ctx, "tenant-auto-upgrade", func(ctx context.Context) { + ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) + defer cancel() + + retryOpts := retry.Options{ + InitialBackoff: time.Second, + MaxBackoff: 30 * time.Second, + Multiplier: 2, + Closer: s.stopper.ShouldQuiesce(), + } + + // Check if auto upgrade is disabled for test purposes. + if k := s.cfg.TestingKnobs.Server; k != nil { + upgradeTestingKnobs := k.(*TestingKnobs) + if disableCh := upgradeTestingKnobs.DisableAutomaticVersionUpgrade; disableCh != nil { + log.Infof(ctx, "auto upgrade disabled by testing") + select { + case <-disableCh: + log.Infof(ctx, "auto upgrade no longer disabled by testing") + case <-s.stopper.ShouldQuiesce(): + return + } + } + } + + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + var tenantClusterVersion clusterversion.ClusterVersion + if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + tenantClusterVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn) + return err + }); err != nil { + log.Errorf(ctx, "unable to retrieve cluster version: %v", err) + continue + } + + // Check if we should upgrade cluster version, keep checking upgrade + // status, or stop attempting upgrade. + status, upgradeToVersion, err := s.upgradeStatus(ctx, tenantClusterVersion.Version) + switch status { + case upgradeBlockedDueToError: + log.Errorf(ctx, "failed attempt to upgrade cluster version, error: %v", err) + continue + case upgradeBlockedDueToMixedVersions: + log.Infof(ctx, "failed attempt to upgrade cluster version: %v", err) + continue + case upgradeDisabledByConfiguration: + log.Infof(ctx, "auto upgrade is disabled for current version (preserve_downgrade_option): %s", redact.Safe(tenantClusterVersion.Version)) + // Note: we do 'continue' here (and not 'return') so that the + // auto-upgrade gets a chance to continue/complete if the + // operator resets `preserve_downgrade_option` after the node + // has started up already. + continue + case upgradeAlreadyCompleted: + log.Info(ctx, "no need to upgrade, instance already at the newest version") + return + case upgradeBlockedDueToLowBinaryVersion: + log.Info(ctx, "upgrade blocked because tenant binary version doesn't support upgrading to storage cluster version") + case upgradeAllowed: + // Fall out of the select below. + default: + panic(errors.AssertionFailedf("unhandled case: %d", status)) + } + + upgradeRetryOpts := retry.Options{ + InitialBackoff: 5 * time.Second, + MaxBackoff: 10 * time.Second, + Multiplier: 2, + Closer: s.stopper.ShouldQuiesce(), + } + + // Run the set cluster setting version statement in a transaction + // until success. + for ur := retry.StartWithCtx(ctx, upgradeRetryOpts); ur.Next(); { + if _, err := s.internalExecutor.ExecEx( + ctx, "set-version", nil, /* txn */ + sessiondata.RootUserSessionDataOverride, + "SET CLUSTER SETTING version = $1;", upgradeToVersion.String(), + ); err != nil { + log.Errorf(ctx, "error when finalizing cluster version upgrade: %v", err) + } else { + log.Info(ctx, "successfully upgraded cluster version") + return + } + } + } + }) +} + +// upgradeStatus lets the main checking loop know if we should do upgrade, +// keep checking upgrade status, or stop attempting upgrade. +func (s *SQLServer) upgradeStatus( + ctx context.Context, currentClusterVersion roachpb.Version, +) (st upgradeStatus, upgradeToVersion roachpb.Version, err error) { + storageClusterVersion := s.settingsWatcher.GetStorageClusterActiveVersion().Version + + // Check if auto upgrade is enabled at current version. + row, err := s.internalExecutor.QueryRowEx( + ctx, "read-downgrade", nil, /* txn */ + sessiondata.RootUserSessionDataOverride, + "SELECT value FROM system.settings WHERE name = 'cluster.preserve_downgrade_option';", + ) + if err != nil { + return upgradeBlockedDueToError, roachpb.Version{}, err + } + + if row != nil { + downgradeVersion, err := roachpb.ParseVersion(string(tree.MustBeDString(row[0]))) + if err != nil { + return upgradeBlockedDueToError, roachpb.Version{}, err + } + if currentClusterVersion == downgradeVersion { + return upgradeDisabledByConfiguration, roachpb.Version{}, nil + } + } + + if storageClusterVersion == currentClusterVersion { + return upgradeAlreadyCompleted, roachpb.Version{}, nil + } + + instances, err := s.sqlInstanceReader.GetAllInstances(ctx) + if err != nil { + return upgradeBlockedDueToError, roachpb.Version{}, err + } + if len(instances) == 0 { + return upgradeBlockedDueToError, roachpb.Version{}, errors.Errorf("no live instances found") + } + + findMinBinaryVersion := func(instances []sqlinstance.InstanceInfo) roachpb.Version { + minVersion := instances[0].BinaryVersion + for _, instance := range instances { + if instance.BinaryVersion.Less(minVersion) { + minVersion = instance.BinaryVersion + } + } + return minVersion + } + + minInstanceBinaryVersion := findMinBinaryVersion(instances) + if storageClusterVersion.Less(minInstanceBinaryVersion) || storageClusterVersion == minInstanceBinaryVersion { + // minInstanceBinaryVersion supports storageClusterVersion so upgrade to storageClusterVersion. + upgradeToVersion = storageClusterVersion + } else if currentClusterVersion.Less(minInstanceBinaryVersion) { + // minInstanceBinaryVersion doesn't support storageClusterVersion but we can upgrade + // cluster version to minInstanceBinaryVersion. + upgradeToVersion = minInstanceBinaryVersion + } else { + return upgradeBlockedDueToLowBinaryVersion, roachpb.Version{}, nil + } + return upgradeAllowed, upgradeToVersion, nil +} diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index 105b3047ed4e..79a098f0c5c8 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -49,11 +49,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/hintdetail" "github.com/cockroachdb/redact" ) +var setVersionSettingMu = &syncutil.Mutex{} + // setClusterSettingNode represents a SET CLUSTER SETTING statement. type setClusterSettingNode struct { name string @@ -469,6 +472,10 @@ func setVersionSetting( forSystemTenant bool, releaseLeases func(context.Context), ) error { + // setVersionSettingMu prevents data races when setting the + // version setting. + setVersionSettingMu.Lock() + defer setVersionSettingMu.Unlock() // In the special case of the 'version' cluster setting, // we must first read the previous value to validate that the // value change is valid.