Skip to content

Commit

Permalink
*: integrate admission control for SQL=>KV and KV=>SQL
Browse files Browse the repository at this point in the history
- Add roachpb.AdmissionHeader used for KV admission control.
- Initialize this for NewTxnWithSteppingEnabled that is used
  by SQL. Other paths will bypass KV admission control.
- Node.Batch does server-side admission control for KV.
- sql.tableWriterBase and row.txnKVFetcher do KV=>SQL
  response admission control since these are the shared
  read and write paths.
- Cluster settings to enable admission control.
- RegisterRunnableCountCallback is used to register for
  unsmoothed load information, as used in earlier
  experiments.
- The grant chaining in the admission package was causing
  the runnable count to be too low. Grant batching is
  introduced, that goes along with chaining. Additionally,
  we also immediately terminate and start a new grant chain,
  since the lag in asking for termination and waiting for
  it to happen was also preventing runnable counts from
  getting to a higher value.

These changes were evaluated using kv50/enc=false/nodes=1/conc=8192
which causes CPU overload in the absence of admission control:
CPU > 96%, runnable goroutines per cpu > 120, node heartbeat
latency > 1s, clock offset +- 100ms.
After turning on admission control, runnable goroutines
drop to < 40, and cpu utilization to 90-93%, node heartbeat
latency ~200ms, clock offset to +- 1ms.
Queuing is transferred to the WorkQueue, with p50 queueing
latency for KV work at 200ms and KV=>SQL response work
at 1.1s.
Since the CPU utilization decreases, there is an increase
in SQL execution latency since we are leaving more unused
CPU. Without further increasing the runnable count, which
would defeat the purpose of admission control, we cannot
get to a higher utilization. It is likely that this CPU
utilization decrease is more pronounced because of the
small units of work in the KV workoad

Release note (ops change): Node-level admission control that
considers the cpu resource is introduced for KV request
processing, and response processing (in SQL) for KV responses.
This admission control can be enabled using
admission.kv.enabled and admission.sql_kv_response.enabled.
  • Loading branch information
sumeerbhola committed May 24, 2021
1 parent 9377527 commit 8e1df2e
Show file tree
Hide file tree
Showing 29 changed files with 1,408 additions and 667 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Setting Type Default Description
admission.kv.enabled boolean false when true, work performed by the KV layer is subject to admission control
admission.sql_kv_response.enabled boolean false when true, work performed by the SQL layer when receiving a KV response is subject to admission control
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout duration 10m0s the timeout for import/export storage operations
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<table>
<thead><tr><th>Setting</th><th>Type</th><th>Default</th><th>Description</th></tr></thead>
<tbody>
<tr><td><code>admission.kv.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the KV layer is subject to admission control</td></tr>
<tr><td><code>admission.sql_kv_response.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the SQL layer when receiving a KV response is subject to admission control</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
<tr><td><code>cloudstorage.timeout</code></td><td>duration</td><td><code>10m0s</code></td><td>the timeout for import/export storage operations</td></tr>
Expand Down
28 changes: 19 additions & 9 deletions pkg/cmd/roachtest/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ func registerKV(r *testRegistry) {
// If true, the reads are limited reads over the full span of the table.
// Currently this also enables SFU writes on the workload since this is
// geared towards testing optimistic locking and latching.
spanReads bool
batchSize int
blockSize int
splits int // 0 implies default, negative implies 0
encryption bool
sequential bool
duration time.Duration
tags []string
spanReads bool
batchSize int
blockSize int
splits int // 0 implies default, negative implies 0
encryption bool
sequential bool
concMultiplier int
duration time.Duration
tags []string
}
computeNumSplits := func(opts kvOptions) int {
// TODO(ajwerner): set this default to a more sane value or remove it and
Expand Down Expand Up @@ -79,7 +80,11 @@ func registerKV(r *testRegistry) {
t.Status("running workload")
m := newMonitor(ctx, c, c.Range(1, nodes))
m.Go(func(ctx context.Context) error {
concurrency := ifLocal("", " --concurrency="+fmt.Sprint(nodes*64))
concurrencyMultiplier := 64
if opts.concMultiplier != 0 {
concurrencyMultiplier = opts.concMultiplier
}
concurrency := ifLocal("", " --concurrency="+fmt.Sprint(nodes*concurrencyMultiplier))

splits := " --splits=" + strconv.Itoa(computeNumSplits(opts))
if opts.duration == 0 {
Expand Down Expand Up @@ -126,6 +131,8 @@ func registerKV(r *testRegistry) {
for _, opts := range []kvOptions{
// Standard configs.
{nodes: 1, cpus: 8, readPercent: 0},
// CPU overload test, to stress admission control.
{nodes: 1, cpus: 8, readPercent: 50, concMultiplier: 8192, duration: 20 * time.Minute},
{nodes: 1, cpus: 8, readPercent: 95},
{nodes: 1, cpus: 32, readPercent: 0},
{nodes: 1, cpus: 32, readPercent: 95},
Expand Down Expand Up @@ -203,6 +210,9 @@ func registerKV(r *testRegistry) {
if opts.sequential {
nameParts = append(nameParts, "seq")
}
if opts.concMultiplier != 0 { // support legacy test name which didn't include this multiplier
nameParts = append(nameParts, fmt.Sprintf("conc=%d", opts.concMultiplier))
}

minVersion := "v2.0.0"
if opts.encryption {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//pkg/roachpb",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/util/admission",
"//pkg/util/contextutil",
"//pkg/util/duration",
"//pkg/util/hlc",
Expand All @@ -31,6 +32,7 @@ go_library(
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v2//:apd",
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ type Batch struct {
// The Header which will be used to send the resulting BatchRequest.
// To be modified directly.
Header roachpb.Header
reqs []roachpb.RequestUnion
// The AdmissionHeader which will be used when sending the resulting
// BatchRequest. To be modified directly.
AdmissionHeader roachpb.AdmissionHeader
reqs []roachpb.RequestUnion
// Set when AddRawRequest is used, in which case using the "other"
// operations renders the batch unusable.
raw bool
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -255,6 +256,12 @@ type DB struct {
ctx DBContext
// crs is the sender used for non-transactional requests.
crs CrossRangeTxnWrapperSender

// SQLKVResponseAdmissionQ is for use by SQL clients of the DB, and is
// placed here simply for plumbing convenience, as there is a diversity of
// SQL code that all uses kv.DB.
// TODO(sumeer): find a home for this in the SQL layer.
SQLKVResponseAdmissionQ *admission.WorkQueue
}

// NonTransactionalSender returns a Sender that can be used for sending
Expand Down Expand Up @@ -717,6 +724,7 @@ func sendAndFill(ctx context.Context, send SenderFunc, b *Batch) error {
var ba roachpb.BatchRequest
ba.Requests = b.reqs
ba.Header = b.Header
ba.AdmissionHeader = b.AdmissionHeader
b.response, b.pErr = send(ctx, ba)
b.fillResults(ctx)
if b.pErr == nil {
Expand Down
39 changes: 37 additions & 2 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -85,6 +87,12 @@ type Txn struct {
// deadline.
deadline *hlc.Timestamp
}

// admissionHeader is used for admission control for work done in this
// transaction. Only certain paths initialize this properly, and the
// remaining just use the zero value. The set of code paths that initialize
// this are expected to expand over time.
admissionHeader roachpb.AdmissionHeader
}

// NewTxn returns a new RootTxn.
Expand Down Expand Up @@ -122,9 +130,16 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
return NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn)
}

// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL.
// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL. Note
// that this initializes Txn.admissionHeader to specify that the source is
// FROM_SQL.
func NewTxnWithSteppingEnabled(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
txn := NewTxn(ctx, db, gatewayNodeID)
txn.admissionHeader = roachpb.AdmissionHeader{
Priority: int32(admission.NormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
}
_ = txn.ConfigureStepping(ctx, SteppingEnabled)
return txn
}
Expand Down Expand Up @@ -371,7 +386,7 @@ func (txn *Txn) DisablePipelining() error {

// NewBatch creates and returns a new empty batch object for use with the Txn.
func (txn *Txn) NewBatch() *Batch {
return &Batch{txn: txn}
return &Batch{txn: txn, AdmissionHeader: txn.admissionHeader}
}

// Get retrieves the value for a key, returning the retrieved key/value or an
Expand Down Expand Up @@ -621,6 +636,10 @@ func (txn *Txn) Run(ctx context.Context, b *Batch) error {
}

func (txn *Txn) commit(ctx context.Context) error {
// A batch with only endTxnReq is not subject to admission control, in order
// to reduce contention by releasing locks. In multi-tenant settings, it
// will be subject to admission control, and the zero CreateTime will give
// it preference within the tenant.
var ba roachpb.BatchRequest
ba.Add(endTxnReq(true /* commit */, txn.deadline(), txn.systemConfigTrigger))
_, pErr := txn.Send(ctx, ba)
Expand Down Expand Up @@ -744,6 +763,10 @@ func (txn *Txn) rollback(ctx context.Context) *roachpb.Error {
// below. Note that this is the common path when a client disconnects in the
// middle of an open transaction or during statement execution.
if ctx.Err() == nil {
// A batch with only endTxnReq is not subject to admission control, in
// order to reduce contention by releasing locks. In multi-tenant
// settings, it will be subject to admission control, and the zero
// CreateTime will give it preference within the tenant.
var ba roachpb.BatchRequest
ba.Add(endTxnReq(false /* commit */, nil /* deadline */, false /* systemConfigTrigger */))
_, pErr := txn.Send(ctx, ba)
Expand All @@ -766,6 +789,10 @@ func (txn *Txn) rollback(ctx context.Context) *roachpb.Error {
ctx, cancel := stopper.WithCancelOnQuiesce(txn.db.AnnotateCtx(context.Background()))
if err := stopper.RunAsyncTask(ctx, "async-rollback", func(ctx context.Context) {
defer cancel()
// A batch with only endTxnReq is not subject to admission control, in
// order to reduce contention by releasing locks. In multi-tenant
// settings, it will be subject to admission control, and the zero
// CreateTime will give it preference within the tenant.
var ba roachpb.BatchRequest
ba.Add(endTxnReq(false /* commit */, nil /* deadline */, false /* systemConfigTrigger */))
_ = contextutil.RunWithTimeout(ctx, "async txn rollback", asyncRollbackTimeout,
Expand Down Expand Up @@ -944,6 +971,10 @@ func (txn *Txn) Send(
ba.Header.GatewayNodeID = txn.gatewayNodeID
}

// Some callers have not initialized ba using a Batch constructed using
// Txn.NewBatch. So we fallback to initializing here.
ba.AdmissionHeader = txn.admissionHeader

txn.mu.Lock()
requestTxnID := txn.mu.ID
sender := txn.mu.sender
Expand Down Expand Up @@ -1348,3 +1379,7 @@ func (txn *Txn) DeferCommitWait(ctx context.Context) func(context.Context) error
defer txn.mu.Unlock()
return txn.mu.sender.DeferCommitWait(ctx)
}

func (txn *Txn) AdmissionHeader() roachpb.AdmissionHeader {
return txn.admissionHeader
}
Loading

0 comments on commit 8e1df2e

Please sign in to comment.