Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

base,kvserver,server: configuration of provisioned bandwidth for a store #86063

Merged
merged 1 commit into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.store.admission.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.reject_over_max_intents_budget.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td></tr>
Expand Down
105 changes: 104 additions & 1 deletion pkg/base/store_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,86 @@ func (ss *SizeSpec) Set(value string) error {
return nil
}

// ProvisionedRateSpec is an optional part of the StoreSpec.
//
// TODO(sumeer): We should map the file path specified in the store spec to
// the disk name. df can be used to map paths to names like /dev/nvme1n1 and
// /dev/sdb (these examples are from AWS EBS and GCP PD respectively) and the
// corresponding names produced by disk_counters.go are nvme1n1 and sdb
// respectively. We need to find or write a platform independent library --
// see the discussion on
// https://github.com/cockroachdb/cockroach/pull/86063#pullrequestreview-1074487018.
// With that change, the ProvisionedRateSpec would only be needed to override
// the cluster setting when there are heterogenous bandwidth limits in a
// cluster (there would be no more DiskName field).
type ProvisionedRateSpec struct {
// DiskName is the name of the disk observed by the code in disk_counters.go
// when retrieving stats for this store.
DiskName string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this DiskName field, I think. Looking at the top-level StoreSpec, we already have a Path to uniquely identify the disk device we're using. Let's continue using that; it's a required field unless using type=mem, which this doesn't apply to anyway. I'm looking at this test for ex:

{"/mnt/hda1", "", StoreSpec{Path: "/mnt/hda1"}},

This also makes the flag syntax easier (we can drop the name argument).

// ProvisionedBandwidth is the bandwidth provisioned for this store in
// bytes/s.
ProvisionedBandwidth int64
}

func newStoreProvisionedRateSpec(
field redact.SafeString, value string,
) (ProvisionedRateSpec, error) {
var spec ProvisionedRateSpec
used := make(map[string]struct{})
for _, split := range strings.Split(value, ":") {
if len(split) == 0 {
continue
}
subSplits := strings.Split(split, "=")
if len(subSplits) != 2 {
return ProvisionedRateSpec{}, errors.Errorf("%s field has invalid value %s", field, value)
}
subField := subSplits[0]
subValue := subSplits[1]
if _, ok := used[subField]; ok {
return ProvisionedRateSpec{}, errors.Errorf("%s field has duplicate sub-field %s",
field, subField)
}
used[subField] = struct{}{}
if len(subField) == 0 {
continue
}
if len(subValue) == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field has no value specified for sub-field %s", field, subField)
}
switch subField {
case "disk-name":
spec.DiskName = subValue
case "bandwidth":
if len(subValue) <= 2 || subValue[len(subValue)-2:] != "/s" {
return ProvisionedRateSpec{},
errors.Errorf("%s field does not have bandwidth sub-field %s ending in /s",
field, subValue)
}
subValue = subValue[:len(subValue)-2]
var err error
spec.ProvisionedBandwidth, err = humanizeutil.ParseBytes(subValue)
if err != nil {
return ProvisionedRateSpec{},
errors.Wrapf(err, "could not parse bandwidth in field %s", field)
}
if spec.ProvisionedBandwidth == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field is trying to set bandwidth to 0", field)
}
default:
return ProvisionedRateSpec{}, errors.Errorf("%s field has unknown sub-field %s",
field, subField)
}
}
if len(spec.DiskName) == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field did not specify disk-name", field)
}
return spec, nil
}

// StoreSpec contains the details that can be specified in the cli pertaining
// to the --store flag.
type StoreSpec struct {
Expand Down Expand Up @@ -189,6 +269,8 @@ type StoreSpec struct {
// through to C CCL code to set up encryption-at-rest. Must be set if and
// only if encryption is enabled, otherwise left empty.
EncryptionOptions []byte
// ProvisionedRateSpec is optional.
ProvisionedRateSpec ProvisionedRateSpec
}

// String returns a fully parsable version of the store spec.
Expand Down Expand Up @@ -231,6 +313,16 @@ func (ss StoreSpec) String() string {
fmt.Fprint(&buffer, optsStr)
fmt.Fprint(&buffer, ",")
}
if len(ss.ProvisionedRateSpec.DiskName) > 0 {
fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s",
ss.ProvisionedRateSpec.DiskName)
if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 {
fmt.Fprintf(&buffer, ":bandwidth=%s/s,",
humanizeutil.IBytes(ss.ProvisionedRateSpec.ProvisionedBandwidth))
} else {
fmt.Fprintf(&buffer, ",")
}
}
// Trim the extra comma from the end if it exists.
if l := buffer.Len(); l > 0 {
buffer.Truncate(l - 1)
Expand Down Expand Up @@ -259,7 +351,7 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-

// NewStoreSpec parses the string passed into a --store flag and returns a
// StoreSpec if it is correctly parsed.
// There are four possible fields that can be passed in, comma separated:
// There are five possible fields that can be passed in, comma separated:
// - path=xxx The directory in which to the rocks db instance should be
// located, required unless using a in memory storage.
// - type=mem This specifies that the store is an in memory storage instead of
Expand All @@ -273,6 +365,10 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// - 20% -> 20% of the available space
// - 0.2 -> 20% of the available space
// - attrs=xxx:yyy:zzz A colon separated list of optional attributes.
// - provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth-bytes/s>] The
// provisioned-rate can be used for admission control for operations on the
// store. The bandwidth is optional, and if unspecified, a cluster setting
// (kv.store.admission.provisioned_bandwidth) will be used.
// Note that commas are forbidden within any field name or value.
func NewStoreSpec(value string) (StoreSpec, error) {
const pathField = "path"
Expand Down Expand Up @@ -399,6 +495,13 @@ func NewStoreSpec(value string) (StoreSpec, error) {
return StoreSpec{}, err
}
ss.PebbleOptions = buf.String()
case "provisioned-rate":
rateSpec, err := newStoreProvisionedRateSpec("provisioned-rate", value)
if err != nil {
return StoreSpec{}, err
}
ss.ProvisionedRateSpec = rateSpec

default:
return StoreSpec{}, fmt.Errorf("%s is not a valid store field", field)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/base/store_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ target_file_size=2097152`
{"path=/mnt/hda1,type=other", "other is not a valid store type", StoreSpec{}},
{"path=/mnt/hda1,type=mem,size=20GiB", "path specified for in memory store", StoreSpec{}},

// provisioned rate
{"path=/mnt/hda1,provisioned-rate=disk-name=nvme1n1:bandwidth=200MiB/s", "",
StoreSpec{Path: "/mnt/hda1", ProvisionedRateSpec: base.ProvisionedRateSpec{
DiskName: "nvme1n1", ProvisionedBandwidth: 200 << 20}}},
{"path=/mnt/hda1,provisioned-rate=disk-name=sdb", "", StoreSpec{
Path: "/mnt/hda1", ProvisionedRateSpec: base.ProvisionedRateSpec{
DiskName: "sdb", ProvisionedBandwidth: 0}}},

// RocksDB
{"path=/,rocksdb=key1=val1;key2=val2", "", StoreSpec{Path: "/", RocksDBOptions: "key1=val1;key2=val2"}},

Expand Down
11 changes: 11 additions & 0 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,17 @@ memory that the store may consume, for example:
--store=type=mem,size=20GiB
--store=type=mem,size=90%

</PRE>
Optionally, to configure admission control enforcement to prevent disk
bandwidth saturation, the "provisioned-rate" field can be specified with
the "disk-name" and an optional "bandwidth". The bandwidth is used to override
the value of the cluster setting, kv.store.admission.provisioned_bandwidth.
For example:
<PRE>

--store=provisioned-rate=disk-name=nvme1n1
--store=provisioned-rate=disk-name=sdb:bandwidth=250MiB/s

</PRE>
Commas are forbidden in all values, since they are used to separate fields.
Also, if you use equal signs in the file path to a store, you must use the
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3964,3 +3964,11 @@ func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes(
storeAdmissionQ.BypassedWorkDone(
followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo)
}

// ProvisionedBandwidthForAdmissionControl set a value of the provisioned
// bandwidth for each store in the cluster.
var ProvisionedBandwidthForAdmissionControl = settings.RegisterByteSizeSetting(
settings.SystemOnly, "kv.store.admission.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
"for each store. It can be over-ridden on a per-store basis using the --store flag",
0).WithPublic()
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ go_test(
"//pkg/server/diagnostics",
"//pkg/server/diagnostics/diagnosticspb",
"//pkg/server/serverpb",
"//pkg/server/status",
"//pkg/server/status/statuspb",
"//pkg/server/telemetry",
"//pkg/settings",
Expand Down Expand Up @@ -446,6 +447,7 @@ go_test(
"//pkg/upgrade",
"//pkg/upgrade/upgrades",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/envutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
Expand Down
83 changes: 82 additions & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ type Node struct {
// COCKROACH_DEBUG_TS_IMPORT_FILE env var.
suppressNodeStatus syncutil.AtomicBool

diskStatsMap diskStatsMap

testingErrorEvent func(context.Context, *roachpb.BatchRequest, error)
}

Expand Down Expand Up @@ -772,17 +774,96 @@ func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOTh
s.UpdateIOThreshold(threshold)
}

// diskStatsMap encapsulates all the logic for populating DiskStats for
// admission.StoreMetrics.
type diskStatsMap struct {
provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec
diskNameToStoreID map[string]roachpb.StoreID
}

func (dsm *diskStatsMap) tryPopulateAdmissionDiskStats(
ctx context.Context,
clusterProvisionedBandwidth int64,
diskStatsFunc func(context.Context) ([]status.DiskStats, error),
) (stats map[roachpb.StoreID]admission.DiskStats, err error) {
if dsm.empty() {
return stats, nil
}
diskStats, err := diskStatsFunc(ctx)
if err != nil {
return stats, err
}
stats = make(map[roachpb.StoreID]admission.DiskStats)
for id, spec := range dsm.provisionedRate {
s := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if spec.ProvisionedBandwidth > 0 {
s.ProvisionedBandwidth = spec.ProvisionedBandwidth
}
stats[id] = s
}
for i := range diskStats {
if id, ok := dsm.diskNameToStoreID[diskStats[i].Name]; ok {
s := stats[id]
s.BytesRead = uint64(diskStats[i].ReadBytes)
s.BytesWritten = uint64(diskStats[i].WriteBytes)
stats[id] = s
}
}
return stats, nil
}

func (dsm *diskStatsMap) empty() bool {
return len(dsm.provisionedRate) == 0
}

func (dsm *diskStatsMap) initDiskStatsMap(specs []base.StoreSpec, engines []storage.Engine) error {
*dsm = diskStatsMap{
provisionedRate: make(map[roachpb.StoreID]base.ProvisionedRateSpec),
diskNameToStoreID: make(map[string]roachpb.StoreID),
}
for i := range engines {
id, err := kvserver.ReadStoreIdent(context.Background(), engines[i])
if err != nil {
return err
}
if len(specs[i].ProvisionedRateSpec.DiskName) > 0 {
dsm.provisionedRate[id.StoreID] = specs[i].ProvisionedRateSpec
dsm.diskNameToStoreID[specs[i].ProvisionedRateSpec.DiskName] = id.StoreID
}
}
return nil
}

func (n *Node) registerEnginesForDiskStatsMap(
specs []base.StoreSpec, engines []storage.Engine,
) error {
return n.diskStatsMap.initDiskStatsMap(specs, engines)
}

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get(
&n.storeCfg.Settings.SV)
storeIDToDiskStats, err := n.diskStatsMap.tryPopulateAdmissionDiskStats(
context.Background(), clusterProvisionedBandwidth, status.GetDiskCounters)
if err != nil {
log.Warningf(context.Background(), "%v",
errors.Wrapf(err, "unable to populate disk stats"))
}
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
im := store.Engine().GetInternalIntervalMetrics()
diskStats := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if s, ok := storeIDToDiskStats[store.StoreID()]; ok {
diskStats = s
}
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
InternalIntervalMetrics: im})
InternalIntervalMetrics: im,
DiskStats: diskStats})
return nil
})
return metrics
Expand Down
Loading