Skip to content

Commit

Permalink
upgrade: use high priority txn's to update the cluster version
Browse files Browse the repository at this point in the history
Previously, it was possible for the leasing subsystem to starve out
attempts to set the cluster version during upgrades, since the leasing
subsystem uses high priority txn for renewals. To address this, this
patch makes the logic to set the cluster version high priority so
it can't be pushed out by lease renewals.

Fixes: #113908

Release note (bug fix): Addressed a bug that could cause cluster version
finalization to get starved out by descriptor lease renewals on larger
clusters.
  • Loading branch information
fqazi committed Nov 28, 2023
1 parent ad9104a commit 39cb48d
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Expand Up @@ -356,6 +356,7 @@ go_library(
"//pkg/server/autoconfig/acprovider",
"//pkg/server/pgurl",
"//pkg/server/serverpb",
"//pkg/server/settingswatcher",
"//pkg/server/status/statuspb",
"//pkg/server/telemetry",
"//pkg/settings",
Expand Down
70 changes: 49 additions & 21 deletions pkg/sql/set_cluster_setting.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -616,22 +617,49 @@ func setVersionSetting(
if err != nil {
return err
}
return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// Confirm if the version has actually changed on us.
datums, err := txn.QueryRowEx(
ctx, "retrieve-prev-setting", txn.KV(),
sessiondata.RootUserSessionDataOverride,
"SELECT value FROM system.settings WHERE name = $1", setting.InternalKey(),
)
return db.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// On complex clusters with a large number of descriptors (> 500) and
// multi-region nodes (> 9), normal priority transactions reading/updating
// the version row can be starved. This is due to the lease manager reading
// the version row at high priority, when refreshing leases (#95227), with
// a complex cluster this traffic will continuous.
// Run the version bump inside the upgrade as high priority, since
// lease manager ends up reading the version row (with high priority)
// inside the settings table when refreshing leases. On complex clusters
// (multi-region with high latency) or with a large number of descriptors
// ( >500) it's possible for normal transactions to be starved by continuous
// lease traffic.
// This is safe from deadlocks / starvation because we expected this
// transaction only do the following:
// 1) We expect this transaction to only read and write to the
// version key in the system.settings table. To achieve the smallest
// possible txn and avoid extra operations on other keys, we are going to
// use KV call with EncodeSettingKey/EncodeSettingValue functions
// instead of using the internal executor.
// 2) Reads from the system.sql_instances table to confirm all SQL servers
// have been upgraded in multi-tenant environments.
// 3) Other transactions will use a normal priority and get pushed out by
// this one, if they involve schema changes on the system database
// descriptor (highly unlikely).
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}

// Fetch the existing version setting and see if its
// been modified.
codec := db.(*InternalDB).server.cfg.Codec
decoder := settingswatcher.MakeRowDecoder(codec)
key := settingswatcher.EncodeSettingKey(codec, "version")
row, err := txn.Get(ctx, key)
if err != nil {
return err
}
if len(datums) > 0 {
dStr, ok := datums[0].(*tree.DString)
if !ok {
return errors.AssertionFailedf("existing version value is not a string, got %T", datums[0])
if row.Value != nil {
_, val, _, err := decoder.DecodeRow(roachpb.KeyValue{Key: row.Key, Value: *row.Value}, nil /* alloc */)
if err != nil {
return err
}
oldRawValue := []byte(string(*dStr))
oldRawValue := []byte(val.Value)
if bytes.Equal(oldRawValue, rawValue) {
return nil
}
Expand All @@ -645,22 +673,22 @@ func setVersionSetting(
return nil
}
}
// Only if the version has increased, alter the setting.
if _, err = txn.ExecEx(
ctx, "update-setting", txn.KV(),
sessiondata.RootUserSessionDataOverride,
`UPSERT INTO system.settings (name, value, "lastUpdated", "valueType") VALUES ($1, $2, now(), $3)`,
setting.InternalKey(), string(rawValue), setting.Typ(),
); err != nil {
// Encode the setting value to write out the updated version.
var tuple []byte
if tuple, err = settingswatcher.EncodeSettingValue(rawValue, setting.Typ()); err != nil {
return err
}
newValue := &roachpb.Value{}
newValue.SetTuple(tuple)
if err := txn.Put(ctx, row.Key, newValue); err != nil {
return err
}

// Perform any necessary post-setting validation. This is used in
// the tenant upgrade interlock to ensure that the set of sql
// servers present at the time of the settings update, matches the
// set that was present when the fence bump occurred (see comment in
// upgrademanager.Migrate() for more details).
if err = postSettingValidate(ctx, txn.KV()); err != nil {
if err = postSettingValidate(ctx, txn); err != nil {
return err
}
return err
Expand Down
2 changes: 2 additions & 0 deletions pkg/upgrade/upgrades/BUILD.bazel
Expand Up @@ -114,6 +114,7 @@ go_test(
"system_exec_insights_test.go",
"system_rbr_indexes_test.go",
"upgrades_test.go",
"version_starvation_test.go",
],
args = ["-test.timeout=895s"],
data = glob(["testdata/**"]),
Expand Down Expand Up @@ -168,6 +169,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
Expand Down
119 changes: 119 additions & 0 deletions pkg/upgrade/upgrades/version_starvation_test.go
@@ -0,0 +1,119 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package upgrades_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/server"
clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestLeasingClusterVersionStarvation validates that setting
// the cluster version is done with a high priority txn and cannot
// be pushed out. Previously, this would be normal priority and
// get pushed by the leasing code, leading to starvation
// when leases were acquired with sufficiently high frequency
// Note: This test just confirms its not normal priority by checking
// if it can push other txns.
func TestLeasingClusterVersionStarvation(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

routineChan := make(chan error)
waitToStartBump := make(chan struct{})
resumeBump := make(chan struct{})
clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
UpgradeManager: &upgradebase.TestingKnobs{
InterlockPausePoint: upgradebase.AfterVersionBumpRPC,
InterlockReachedPausePointChannel: &waitToStartBump,
InterlockResumeChannel: &resumeBump,
},
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.ByKey(
clusterversion.V23_1),
},
},
},
}

// Disable lease renewals intentionally, so that we validate
// no deadlock risk exists with the settings table.
st := clustersettings.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
false)

clusterArgs.ServerArgs.Settings = st

tc := testcluster.StartTestCluster(t, 1, clusterArgs)
lease.LeaseDuration.Override(ctx, &st.SV, 0)
lease.LeaseRenewalDuration.Override(ctx, &st.SV, 0)

defer tc.Stopper().Stop(ctx)
db := tc.ServerConn(0)
defer db.Close()

proceedWithCommit := make(chan struct{})
// Start a background transaction that will have an intent
// on the version key inside the settings table, with a
// normal priority (which should get pushed by the upgrade).
go func() {
<-waitToStartBump
tx, err := db.Begin()
if err != nil {
routineChan <- err
return
}
_, err = tx.Exec("SELECT name from system.settings where name='version' FOR UPDATE")
if err != nil {
routineChan <- err
return
}
resumeBump <- struct{}{}
for retry := retry.Start(retry.Options{}); retry.Next(); {
_, err = tx.Exec("SELECT name from system.settings where name='version' FOR UPDATE")
if err != nil {
rollbackErr := tx.Rollback()
routineChan <- errors.WithSecondaryError(err, rollbackErr)
return
}
}
}()

upgrades.Upgrade(
t,
db,
clusterversion.V23_2,
nil,
false,
)

// Our txn should have been pushed by the upgrade,
// which has a higher txn priority.
close(proceedWithCommit)
require.ErrorContainsf(t, <-routineChan, "pq: restart transaction: TransactionRetryWithProtoRefreshError:",
"upgrade was not able to push transaction")
}

0 comments on commit 39cb48d

Please sign in to comment.