Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: upgrade: use high priority txn's to update the cluster version #115034

Merged
merged 2 commits into from Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/server/settingswatcher/BUILD.bazel
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"overrides.go",
"row_decoder.go",
"setting_encoder.go",
"settings_watcher.go",
"version_guard.go",
],
Expand All @@ -22,6 +23,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
Expand All @@ -34,6 +36,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -43,6 +46,7 @@ go_test(
srcs = [
"main_test.go",
"row_decoder_external_test.go",
"setting_encoder_test.go",
"settings_watcher_external_test.go",
"version_guard_test.go",
],
Expand Down Expand Up @@ -75,6 +79,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
60 changes: 60 additions & 0 deletions pkg/server/settingswatcher/setting_encoder.go
@@ -0,0 +1,60 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package settingswatcher

import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// EncodeSettingKey returns the raw KV key of the system.settings row for the
// named setting, so that callers can read or write the row with direct KV
// operations instead of going through SQL.
func EncodeSettingKey(codec keys.SQLCodec, setting string) []byte {
	// Start from the prefix of index 1 of the settings table.
	key := codec.IndexPrefix(keys.SettingsTableID, uint32(1))
	// The setting name is the only value encoded into the key.
	key = encoding.EncodeStringAscending(key, setting)
	// Terminate the key with a trailing 0 (presumably the column family ID
	// suffix; confirm against the table's key encoding).
	return encoding.EncodeUvarintAscending(key, uint64(0))
}

// EncodeSettingValue encodes a value for the system.settings table, which
// can be used for direct KV operations.
func EncodeSettingValue(rawValue []byte, valueType string) ([]byte, error) {
// Encode the setting value to write out the updated version.
var tuple []byte
var err error
if tuple, err = valueside.Encode(tuple,
valueside.MakeColumnIDDelta(descpb.ColumnID(encoding.NoColumnID),
systemschema.SettingsTable.PublicColumns()[1].GetID()),
tree.NewDString(string(rawValue)),
nil); err != nil {
return nil, err
}
if tuple, err = valueside.Encode(tuple,
valueside.MakeColumnIDDelta(systemschema.SettingsTable.PublicColumns()[1].GetID(),
systemschema.SettingsTable.PublicColumns()[2].GetID()),
tree.MustMakeDTimestamp(timeutil.Now(), time.Microsecond),
nil); err != nil {
return nil, err
}
if tuple, err = valueside.Encode(tuple,
valueside.MakeColumnIDDelta(systemschema.SettingsTable.PublicColumns()[2].GetID(),
systemschema.SettingsTable.PublicColumns()[3].GetID()),
tree.NewDString(valueType),
nil); err != nil {
return nil, err
}
return tuple, nil
}
100 changes: 100 additions & 0 deletions pkg/server/settingswatcher/setting_encoder_test.go
@@ -0,0 +1,100 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package settingswatcher_test

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// TestSettingsEncoder verifies that EncodeSettingKey and EncodeSettingValue
// produce KV pairs that are interchangeable with what SQL reads from
// system.settings: a row fetched via the encoded key decodes to the value SQL
// reports, and a row written via the encoded value is visible through SQL
// with its value, type and lastUpdated timestamp intact.
func TestSettingsEncoder(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer srv.Stopper().Stop(ctx)
	ts := srv.ApplicationLayer()

	tdb := sqlutils.MakeSQLRunner(db)
	tdb.Exec(t, "SELECT 1")
	codec := ts.Codec()

	// Fetch the version from the settings table, with a key
	// we encoded ourselves.
	key := settingswatcher.EncodeSettingKey(codec, "version")
	kv, err := ts.DB().Get(ctx, key)
	require.NoError(t, err)
	// Fail cleanly (rather than panic on the dereference below) if the
	// encoded key does not address an existing row.
	require.NotNil(t, kv.Value, "no KV entry found for the encoded version key")

	// Fetch the currently observed version from SQL.
	currentVersion := tdb.QueryStr(t,
		`
SELECT
value
FROM
system.settings
WHERE
name = 'version';
`)
	require.Len(t, currentVersion, 1)

	// Ensure we read the version value back.
	decoder := settingswatcher.MakeRowDecoder(codec)
	_, val, _, err := decoder.DecodeRow(roachpb.KeyValue{Key: kv.Key, Value: *kv.Value}, nil)
	require.NoError(t, err)
	var version clusterversion.ClusterVersion
	require.NoError(t, protoutil.Unmarshal([]byte(val.Value), &version))
	// Expected: what SQL reports; actual: what we decoded from raw KV.
	require.Equalf(t, []byte(currentVersion[0][0]), []byte(val.Value), "version did not match the expected value")

	// Next overwrite the version with the minimum supported one using pure
	// KV calls.
	start := timeutil.Now()
	version = clusterversion.ClusterVersion{Version: clusterversion.TestingBinaryMinSupportedVersion}
	versionBytes, err := protoutil.Marshal(&version)
	require.NoError(t, err)
	newVal, err := settingswatcher.EncodeSettingValue(versionBytes, val.Type)
	require.NoError(t, err)
	kvVal := &roachpb.Value{}
	kvVal.SetTuple(newVal)
	err = ts.DB().Put(ctx, roachpb.Key(key), kvVal)
	after := timeutil.Now()
	require.NoError(t, err)

	// Validate the current version is now the minimum.
	currentVersion = tdb.QueryStr(t,
		`
SELECT
value, "valueType"
FROM
system.settings
WHERE
name = 'version';
`)
	require.Len(t, currentVersion, 1)
	require.Equalf(t, versionBytes, []byte(currentVersion[0][0]), "current version in settings doesn't match")
	require.Equalf(t, "m", currentVersion[0][1], "setting type was lost")

	// Validate the timestamp was set.
	row := tdb.QueryRow(t, `SELECT "lastUpdated" from system.settings WHERE name='version'`)
	var rowTime time.Time
	row.Scan(&rowTime)
	// EncodeSettingValue stores the timestamp at microsecond precision while
	// start/after carry nanoseconds, so the stored value may have been
	// rounded or truncated by up to a microsecond. Widen both bounds by that
	// amount so the precision loss cannot cause a spurious failure.
	require.Greaterf(t, rowTime, start.Add(-time.Microsecond), "Time was less than expect")
	require.Lessf(t, rowTime, after.Add(time.Microsecond), "Time was greater than expect")
}
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Expand Up @@ -356,6 +356,7 @@ go_library(
"//pkg/server/autoconfig/acprovider",
"//pkg/server/pgurl",
"//pkg/server/serverpb",
"//pkg/server/settingswatcher",
"//pkg/server/status/statuspb",
"//pkg/server/telemetry",
"//pkg/settings",
Expand Down
70 changes: 49 additions & 21 deletions pkg/sql/set_cluster_setting.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -616,22 +617,49 @@ func setVersionSetting(
if err != nil {
return err
}
return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// Confirm if the version has actually changed on us.
datums, err := txn.QueryRowEx(
ctx, "retrieve-prev-setting", txn.KV(),
sessiondata.RootUserSessionDataOverride,
"SELECT value FROM system.settings WHERE name = $1", setting.InternalKey(),
)
return db.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// On complex clusters with a large number of descriptors (> 500) and
// multi-region nodes (> 9), normal priority transactions reading/updating
// the version row can be starved. This is due to the lease manager reading
// the version row at high priority, when refreshing leases (#95227), with
// a complex cluster this traffic will be continuous.
// Run the version bump inside the upgrade as high priority, since
// lease manager ends up reading the version row (with high priority)
// inside the settings table when refreshing leases. On complex clusters
// (multi-region with high latency) or with a large number of descriptors
// ( >500) it's possible for normal transactions to be starved by continuous
// lease traffic.
// This is safe from deadlocks / starvation because we expect this
// transaction to only do the following:
// 1) We expect this transaction to only read and write to the
// version key in the system.settings table. To achieve the smallest
// possible txn and avoid extra operations on other keys, we are going to
// use KV call with EncodeSettingKey/EncodeSettingValue functions
// instead of using the internal executor.
// 2) Reads from the system.sql_instances table to confirm all SQL servers
// have been upgraded in multi-tenant environments.
// 3) Other transactions will use a normal priority and get pushed out by
// this one, if they involve schema changes on the system database
// descriptor (highly unlikely).
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}

// Fetch the existing version setting and see if its
// been modified.
codec := db.(*InternalDB).server.cfg.Codec
decoder := settingswatcher.MakeRowDecoder(codec)
key := settingswatcher.EncodeSettingKey(codec, "version")
row, err := txn.Get(ctx, key)
if err != nil {
return err
}
if len(datums) > 0 {
dStr, ok := datums[0].(*tree.DString)
if !ok {
return errors.AssertionFailedf("existing version value is not a string, got %T", datums[0])
if row.Value != nil {
_, val, _, err := decoder.DecodeRow(roachpb.KeyValue{Key: row.Key, Value: *row.Value}, nil /* alloc */)
if err != nil {
return err
}
oldRawValue := []byte(string(*dStr))
oldRawValue := []byte(val.Value)
if bytes.Equal(oldRawValue, rawValue) {
return nil
}
Expand All @@ -645,22 +673,22 @@ func setVersionSetting(
return nil
}
}
// Only if the version has increased, alter the setting.
if _, err = txn.ExecEx(
ctx, "update-setting", txn.KV(),
sessiondata.RootUserSessionDataOverride,
`UPSERT INTO system.settings (name, value, "lastUpdated", "valueType") VALUES ($1, $2, now(), $3)`,
setting.InternalKey(), string(rawValue), setting.Typ(),
); err != nil {
// Encode the setting value to write out the updated version.
var tuple []byte
if tuple, err = settingswatcher.EncodeSettingValue(rawValue, setting.Typ()); err != nil {
return err
}
newValue := &roachpb.Value{}
newValue.SetTuple(tuple)
if err := txn.Put(ctx, row.Key, newValue); err != nil {
return err
}

// Perform any necessary post-setting validation. This is used in
// the tenant upgrade interlock to ensure that the set of sql
// servers present at the time of the settings update, matches the
// set that was present when the fence bump occurred (see comment in
// upgrademanager.Migrate() for more details).
if err = postSettingValidate(ctx, txn.KV()); err != nil {
if err = postSettingValidate(ctx, txn); err != nil {
return err
}
return err
Expand Down
2 changes: 2 additions & 0 deletions pkg/upgrade/upgrades/BUILD.bazel
Expand Up @@ -114,6 +114,7 @@ go_test(
"system_exec_insights_test.go",
"system_rbr_indexes_test.go",
"upgrades_test.go",
"version_starvation_test.go",
],
args = ["-test.timeout=895s"],
data = glob(["testdata/**"]),
Expand Down Expand Up @@ -168,6 +169,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
Expand Down