admission: priority inversion caused by SQLKVResponseWork and SQLSQLResponseWork #85471

Open
sumeerbhola opened this issue Aug 2, 2022 · 12 comments
Labels
A-admission-control C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) O-support Originated from a customer P-3 Issues/test failures with no fix SLA T-admission-control Admission Control

@sumeerbhola
Collaborator

sumeerbhola commented Aug 2, 2022

Admission control handles admission into the SQL layer, via SQLKVResponseWork and SQLSQLResponseWork. Due to the inability to estimate how many CPU tokens to give to such work, admission control sets up a hierarchy where such work is not admitted while there is KVWork waiting. This can cause unfairness and other issues, since remotely submitted low priority KVWork can prevent allocation of tokens to locally originating SQLKVResponseWork.
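To make the inversion concrete, here is a minimal sketch of a granter with a strict hierarchy. It is not the actual admission control code: the `work` and `strictGranter` types and the numeric priorities are hypothetical and exist only to show that waiting KV work is always granted first, whatever its priority.

```go
package main

import "fmt"

// work is a hypothetical waiting request; a higher priority value means
// more important work.
type work struct {
	kind     string // "KV" or "SQLKVResponse"
	priority int
}

// strictGranter models the hierarchy described above: SQL response work is
// only considered when no KV work is waiting, regardless of priority.
type strictGranter struct {
	kvQueue  []work
	sqlQueue []work
}

// next returns the work that would be granted next, or false if nothing waits.
func (g *strictGranter) next() (work, bool) {
	if len(g.kvQueue) > 0 {
		w := g.kvQueue[0]
		g.kvQueue = g.kvQueue[1:]
		return w, true
	}
	if len(g.sqlQueue) > 0 {
		w := g.sqlQueue[0]
		g.sqlQueue = g.sqlQueue[1:]
		return w, true
	}
	return work{}, false
}

func main() {
	g := &strictGranter{
		// Remotely submitted, low priority KV work.
		kvQueue: []work{{kind: "KV", priority: 1}, {kind: "KV", priority: 1}},
		// Locally originating, high priority SQL response work.
		sqlQueue: []work{{kind: "SQLKVResponse", priority: 10}},
	}
	for w, ok := g.next(); ok; w, ok = g.next() {
		fmt.Printf("granted %s (priority %d)\n", w.kind, w.priority)
	}
}
```

In this toy model the priority 10 response work is granted only after both priority 1 KV requests, which is the inversion this issue is about.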

The current behavior is a side-effect of integrating admission control into our volcano-like iterator model. Let’s start with SQLKVResponseWork. Admission for this work is requested in kvstreamer.workerCoordinator.performRequestAsync, tableWriterBase, and txnKVFetcher.fetch. We don't know when the processing of the []Result is complete in the tree of colexecop.Operators that works with this []Result. If we had a completion indicator, we could convert this CPU-bound SQLKVResponseWork (and SQLSQLResponseWork) to use admission “slots”, and have KV and SQL share the same slot pool. Also, we have recently started considering cpu-nano tokens for some elastic kv-work admission, and the same problem applies in a token-based design: we need to know when the goroutine(s) processing the []Result are done with that processing so that they can tell admission control how many tokens they used.

This is a summary of the more detailed discussion in https://cockroachlabs.slack.com/archives/C01SRKWGHG8/p1657286381660829

@irfansharif @yuzefovich

Jira issue: CRDB-18259

Epic CRDB-25469

@DrewKimball
Collaborator

Still trying to wrap my head around admission control, but I have a question prompted by the work to measure SQL CPU time - would this story be improved by measuring the time to process each SQL batch? Instead of scheduling admission of work into the SQL layer / across RPC boundaries, we could cooperatively schedule processing for each batch by a SQL operator, aiming for some target duration (e.g. 20ms). Could that be sufficient to have KV and SQL share a slot pool?

@sumeerbhola
Collaborator Author

Can you point me to the SQL instrumentation for measuring CPU time? It is possible I have been thinking about this wrong by assuming we need to intercept the start and end of the SQL work in the places where SQL and KV interact (assuming you have read the slack thread linked above). We just want to intercept inside SQL wherever there is the potential for significant work.

The interception will take the form of asking for permission to do some CPU work and then informing when the work is done (so we can return slots and account for cpu consumed).

  • When asking for permission we need some information about the txn (tenant, priority, create time) for ordering the work.
  • The goroutine asking for permission should be tolerant to getting blocked (and have a context that it can stop blocking if cancelled).
  • The interception cannot be too fine grained, since there is some overhead involved. How large is a SQL batch and do you know how that roughly compares to the amount of data returned by a typical KV BatchResponse?
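The requirements in the list above can be sketched as a small interface. This is only an illustration under stated assumptions: `WorkInfo`, `slotAdmitter`, and their fields and methods are hypothetical names, and the toy admitter ignores `WorkInfo` ordering entirely. It shows only the blocking `Admit` that gives up when the context is cancelled, paired with a `WorkDone` that returns the slot and reports CPU.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// WorkInfo carries the information needed to order work. The field names
// are assumptions for this sketch.
type WorkInfo struct {
	TenantID   uint64
	Priority   int
	CreateTime time.Time
}

// slotAdmitter is a toy admitter backed by a fixed pool of slots. It is
// meant for single-goroutine demonstration only.
type slotAdmitter struct {
	slots   chan struct{}
	cpuUsed time.Duration
}

func newSlotAdmitter(n int) *slotAdmitter {
	return &slotAdmitter{slots: make(chan struct{}, n)}
}

// Admit blocks until a slot is free or ctx is cancelled. A real
// implementation would use info to order waiters; this sketch does not.
func (a *slotAdmitter) Admit(ctx context.Context, info WorkInfo) error {
	select {
	case a.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// WorkDone returns the slot and accounts for the CPU time consumed.
func (a *slotAdmitter) WorkDone(cpu time.Duration) {
	<-a.slots
	a.cpuUsed += cpu
}

// demo admits one request into a single-slot pool, then shows that a second
// request with an already-cancelled context returns instead of blocking.
func demo() (firstErr, secondErr error) {
	a := newSlotAdmitter(1)
	info := WorkInfo{TenantID: 1, Priority: 0, CreateTime: time.Now()}
	firstErr = a.Admit(context.Background(), info)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	secondErr = a.Admit(ctx, info)

	a.WorkDone(5 * time.Millisecond) // release the slot held by the first request
	return firstErr, secondErr
}

func main() {
	first, second := demo()
	fmt.Println("first admit error:", first)
	fmt.Println("second admit error:", second)
}
```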

@DrewKimball
Collaborator

We have a batchInfoCollector operator which implements the Operator interface and wraps another arbitrary Operator. Whenever a batch is requested, it collects statistics (like CPU time) while its input returns the batch:

// Next is part of the colexecop.Operator interface.
func (bic *batchInfoCollector) Next() coldata.Batch {
  bic.stopwatch.Start()
  // Wrap the call to Next() with a panic catcher in order to get the correct
  // execution time (e.g. in the statement bundle).
  err := colexecerror.CatchVectorizedRuntimeError(bic.next)
  bic.stopwatch.Stop()
  if err != nil {
    colexecerror.InternalError(err)
  }
  if bic.batch.Length() > 0 {
    bic.mu.Lock()
    bic.mu.numBatches++
    bic.mu.numTuples += uint64(bic.batch.Length())
    bic.mu.Unlock()
  }
  return bic.batch
}

Currently this is only done for some queries when statistics are being collected, but you could imagine a version of this that is "always on". We'd want to run some experiments to ensure it doesn't add significant overhead, but I think it would be workable.

When asking for permission we need some information about the txn (tenant, priority, create time) for ordering the work.

This seems like it could be handled easily enough by storing the information on whatever operator implements the behavior. I expect we'd have everything needed during operator creation time (or could easily plumb it).

The goroutine asking for permission should be tolerant to getting blocked (and have a context that it can stop blocking if cancelled).

I think this is already the case, but +cc @yuzefovich.

The interception cannot be too fine grained, since there is some overhead involved. How large is a SQL batch and do you know how that roughly compares to the amount of data returned by a typical KV BatchResponse?

SQL batches are 1024 SQL rows by default. AFAIK KV batch sizes are up to 100,000 keys, and so may end up being larger. For a prototype the easiest thing would probably be to hook into admission control on a batch basis, but with a little extra work we could do it more or less than once per batch. Since we'd be measuring CPU time anyway, we could wait to intercept until the usage reached some limit (say, 20ms) to keep the overhead small. I think some of the elastic CPU stuff already works this way, right?
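As a rough illustration of how a CPU threshold keeps the interception overhead small, the sketch below counts how often admission control would be consulted for a stream of batches. The function names are hypothetical, and the per-batch cost of 2ms is an assumed number rather than a measurement.

```go
package main

import (
	"fmt"
	"time"
)

// countInterceptions returns how many times admission control would be
// consulted if we only intercept once the CPU time accumulated since the
// last interception reaches threshold.
func countInterceptions(batchCPU []time.Duration, threshold time.Duration) int {
	var accumulated time.Duration
	n := 0
	for _, d := range batchCPU {
		accumulated += d
		if accumulated >= threshold {
			n++
			accumulated = 0
		}
	}
	return n
}

// uniformBatches builds n batches that each cost the same CPU time.
func uniformBatches(n int, each time.Duration) []time.Duration {
	batches := make([]time.Duration, n)
	for i := range batches {
		batches[i] = each
	}
	return batches
}

func main() {
	// Assumed workload: 100 batches costing 2ms of CPU each.
	batches := uniformBatches(100, 2*time.Millisecond)
	fmt.Println("per-batch interception:", countInterceptions(batches, 0))
	fmt.Println("20ms threshold:", countInterceptions(batches, 20*time.Millisecond))
}
```

With these assumed numbers, a 20ms threshold means one interception for every ten batches instead of one per batch.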

@yuzefovich
Member

yuzefovich commented Feb 22, 2023

The goroutine asking for permission should be tolerant to getting blocked (and have a context that it can stop blocking if cancelled).

I think this is already the case

Yes, all SQL operators have a relevant context.Context in scope and can block for as long as necessary.

I agree with Drew that we should be able to plan additional "admission" operators into the tree (with low overhead) that would be responsible for providing hooks into the admission control system. E.g. if we have a query that involves a scan followed by a sort, we'd get a tree of operators like BatchFlowCoordinator -> admissionOp (1) -> sortOp -> admissionOp (2) -> scanOp. These admission operators would also have links to their child admission ops (in this example admissionOp (1) has a reference to admissionOp (2)). These links will allow us to calculate the CPU work of the wrapped operator precisely (since the CPU work tracked by admissionOp (1) would include both the sort and the scan, so we need to subtract the CPU work tracked by admissionOp (2)).
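The subtraction described above can be sketched as follows. The `admissionOp` here is a stripped-down stand-in with hypothetical field names: it only holds the total CPU time observed around its input's `Next` calls and the links to its child admission ops.

```go
package main

import (
	"fmt"
	"time"
)

// admissionOp is a simplified stand-in for the proposed operator.
type admissionOp struct {
	name     string
	totalCPU time.Duration // includes everything below this op in the tree
	children []*admissionOp
}

// ownCPU returns the CPU time attributable to the wrapped operator alone,
// by subtracting the totals tracked by the child admission ops.
func (o *admissionOp) ownCPU() time.Duration {
	own := o.totalCPU
	for _, c := range o.children {
		own -= c.totalCPU
	}
	return own
}

func main() {
	// admissionOp (2) wraps scanOp; admissionOp (1) wraps sortOp.
	// The durations are made-up numbers for illustration.
	scan := &admissionOp{name: "admissionOp (2)", totalCPU: 12 * time.Millisecond}
	sortOp := &admissionOp{
		name:     "admissionOp (1)",
		totalCPU: 30 * time.Millisecond,
		children: []*admissionOp{scan},
	}
	fmt.Println(sortOp.name, "own CPU:", sortOp.ownCPU())
	fmt.Println(scan.name, "own CPU:", scan.ownCPU())
}
```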

IIUC the main difficulty is the estimation of how CPU intensive particular SQL work will be prior to admitting it. In the KV land we have different parameters (like number of requests in BatchRequest) to use for the estimate, but in the SQL land we'd need to come up with a costing scheme for all of the operators which might be the wrong way to integrate the admission control. Instead, perhaps we would use an integration like this one:

func (o *admissionOp) Next() coldata.Batch {
  // cpuUsage is an estimate on the first call and is precise CPU usage of the previous call.
  cpuUsage := o.getCPUEstimate()
  admissionToken, err := o.admission.Admit(o.Ctx, cpuUsage)
  if err != nil {
    colexecerror.InternalError(err)
  }
  b := o.input.Next()
  admissionToken.Release()
  // Do precise CPU measurement for the current call.
  ...
  return b
}

and the following scheme:

  • for the first call to admissionOp.Input.Next we'd use some estimate
  • we'd measure how CPU intensive that call was to get the precise CPU usage
  • on the second and all subsequent calls to admissionOp.Next, we'd pass the admission system hook either the actual CPU usage of the previous call or the average across all previous calls.
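The scheme above can be sketched as a small estimator. The `cpuEstimator` type, its fields, and the `useAverage` switch are hypothetical; the sketch only shows the bookkeeping, not how the CPU time would actually be measured.

```go
package main

import (
	"fmt"
	"time"
)

// cpuEstimator returns an initial guess before any call has been measured,
// and afterwards either the last measurement or the running average.
type cpuEstimator struct {
	initial    time.Duration // estimate used for the first call
	useAverage bool          // average across all calls vs. previous call only
	last       time.Duration
	total      time.Duration
	calls      int64
}

// estimate returns the CPU usage to pass to admission before the next call.
func (e *cpuEstimator) estimate() time.Duration {
	if e.calls == 0 {
		return e.initial
	}
	if e.useAverage {
		return e.total / time.Duration(e.calls)
	}
	return e.last
}

// record stores the measured CPU usage of the call that just finished.
func (e *cpuEstimator) record(measured time.Duration) {
	e.last = measured
	e.total += measured
	e.calls++
}

func main() {
	e := &cpuEstimator{initial: time.Millisecond, useAverage: true}
	fmt.Println("before first call:", e.estimate())
	e.record(2 * time.Millisecond)
	e.record(4 * time.Millisecond)
	fmt.Println("average after two calls:", e.estimate())
}
```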

Similar to how we compute SQL CPU time, we'd need to be careful around operators that call into KV, but there are only a few of those. We also might need to be careful when dealing with the admission system delay on the child admissionOps.

Does this make sense / sound reasonable?

@sumeerbhola
Collaborator Author

These admission operators would also have links to their child admission ops (in this example admissionOp (1) has a reference to admissionOp (2)). These links will allow us to calculate the CPU work of the wrapped operator precisely (since the CPU work tracked by admissionOp (1) would include both the sort and the scan, so we need to subtract the CPU work tracked by admissionOp (2)).

How is this deduping done with the current batchInfoCollector wrapper, which presumably includes in its cpu time the work done in transitive operators invoked by bic.Input.Next()? Do we only wrap the root operator of a synchronously running tree of operators? (I am probably using some incorrect terms)

IIUC the main difficulty is the estimation of how CPU intensive particular SQL work will be prior to admitting it. In the KV land we have different parameters (like number of requests in BatchRequest) to use for the estimate

AC does not need any up front estimation of CPU work (we do ask for 100ms of tokens for certain elastic work admission, but that is not how this SQL code will integrate). I imagine something like the following (we can wrap more things in a library so it looks cleaner than below):

func (o *admissionOp) Next() coldata.Batch {
  // handle stays nil unless we ask for admission below.
  var handle AdmissionHandle
  workInfo := WorkInfo{TenantID: ..., Priority: ..., CreateTime: ...}
  // NB: accumulatedCPU is assumed to be a field on admissionOp.
  if o.accumulatedCPU == 0 || o.accumulatedCPU > 10*time.Millisecond {
    // First call to Next, or haven't asked permission in a while. NB: this will consume a slot, hence
    // paired with WorkDone below so that the slot is released as soon as Next is done. IO in Next
    // is acceptable since slots (concurrency) do not precisely track GOMAXPROCS.
    var err error
    // Assign to the outer handle; ":=" here would shadow it and leave it nil below.
    handle, err = o.admission.Admit(o.Ctx, workInfo)
    if err != nil {
      // Not admitted because context cancelled.
      colexecerror.InternalError(err)
      // TODO: terminate ...
    }
    o.accumulatedCPU = 0
  }
  b := o.input.Next()
  cpuTime := ...
  o.accumulatedCPU += cpuTime
  if handle != nil {
    o.admission.WorkDone(handle, cpuTime)
  } else {
    // Did not ask permission, but want AC to know CPU time for fair sharing.
    o.admission.BypassedWorkDone(workInfo, cpuTime)
  }
  return b
}

@DrewKimball
Collaborator

How is this deduping done with the current batchInfoCollector wrapper, which presumably includes in its cpu time the work done in transitive operators invoked by bic.Input.Next()?

Each batchInfoCollector measures the total CPU time that is spent during bic.Input.Next, and it uses the total CPU time of its child(ren) to calculate its own CPU time (rather than the "own" time of its child).

@sumeerbhola
Collaborator Author

Each batchInfoCollector measures the total CPU time that is spent during bic.Input.Next, and it uses the total CPU time of its child(ren) to calculate its own CPU time (rather than the "own" time of its child).

Thanks. I forgot to mention another constraint. The thing that is calling o.admission.Admit(o.Ctx, workInfo) and getting admitted should then be able to do the work that needs to be done in o.input.Next() without that work blocking in admission control (for cpu, on this node). Otherwise we can have a deadlock in that every operator tree has admitted a few operators but not all, so everyone is blocked for more concurrency slots to open up, but no one will release these slots. IIUC, what is sketched out above violates this constraint in that many operators in the same tree can independently call Admit. Is my understanding correct?
Also, the cpuTime reporting in my code fragment assumed de-duping. Is that going to be expensive if we need to do this for every Batch instead of only in finishAndGetStats?
If we can do the reporting of cpuTime at this fine granularity, we may be ok-ish lifting the Admit call to a different level and doing it more coarsely to satisfy the aforementioned constraint. Say do it before sending a new ProducerMessage (I am guessing if it blocks when sending a ProducerMessage the work for the next ProducerMessage will not happen), or some other after-consuming-the-resource blocking (we will also need to make it work for SQL that is interacting with a remote KV, and not just SQL-SQL interactions). Any ideas?

@DrewKimball
Collaborator

Otherwise we can have a deadlock in that every operator tree has admitted a few operators but not all, so everyone is blocked for more concurrency slots to open up, but no one will release these slots. IIUC, what is sketched out above violates this constraint in that many operators in the same tree can independently call Admit. Is my understanding correct?

That's a good point. Maybe then we would have to handle blocking at the root of an operator tree like you said earlier. Alternatively, what if admissionOp had its child admissionOp buffer the next batch before calling Next() on its wrapped input? Something like this:

func (o *admissionOp) Admit() {
  // Let the child admission op buffer its next batch first.
  o.inputAdmissionOp.Admit()

  // handle stays nil unless we ask for admission below.
  var handle AdmissionHandle
  workInfo := WorkInfo{TenantID: ..., Priority: ..., CreateTime: ...}
  // NB: accumulatedCPU is assumed to be a field on admissionOp.
  if o.accumulatedCPU == 0 || o.accumulatedCPU > 10*time.Millisecond {
    // First call to Next, or haven't asked permission in a while. NB: this will consume a slot, hence
    // paired with WorkDone below so that the slot is released as soon as Next is done. IO in Next
    // is acceptable since slots (concurrency) do not precisely track GOMAXPROCS.
    var err error
    // Assign to the outer handle; ":=" here would shadow it and leave it nil below.
    handle, err = o.admission.Admit(o.Ctx, workInfo)
    if err != nil {
      // Not admitted because context cancelled.
      colexecerror.InternalError(err)
      // TODO: terminate ...
    }
    o.accumulatedCPU = 0
  }
  // Buffer the batch so that Next can return it without doing more work.
  o.bufferedBatch = o.input.Next()
  cpuTime := ...
  o.accumulatedCPU += cpuTime
  if handle != nil {
    o.admission.WorkDone(handle, cpuTime)
  } else {
    // Did not ask permission, but want AC to know CPU time for fair sharing.
    o.admission.BypassedWorkDone(workInfo, cpuTime)
  }
}

func (o *admissionOp) Next() coldata.Batch {
  return o.bufferedBatch
}

Then, the root of the tree would call topAdmissionOp.Admit() and then topAdmissionOp.Next() for each batch. This would only work for "pipelined" operators, since operators that perform buffering need to call next more than once. So we'd have to have special handling for buffering operators like sorts - maybe not worth it if a coarser granularity is acceptable. Then again, we might want to do something for buffering operators either way.

Also, the cpuTime reporting in my code fragment assumed de-duping. Is that going to be expensive if we need to do this for every Batch instead of only in finishAndGetStats?

I don't expect this to be expensive at the batch granularity. But as I mentioned above, one common case to consider is an operator that performs buffering like a sort - its input will do all the work it will ever do on the first call to sortOp.Next(), which could be a large amount of time. I'm guessing we'd need to do something about that to get fine enough granularity.

Say do it before sending a new ProducerMessage (I am guessing if it blocks when sending a ProducerMessage the work for the next ProducerMessage will not happen), or some other after-consuming-the-resource blocking

Outbox and BatchFlowCoordinator do seem like good places for this.

we will also need to make it work for SQL that is interacting with a remote KV, and not just SQL-SQL interactions

What sort of special handling would be needed for SQL-KV interactions? Would it be enough to measure the CPU time spent during a call to Next like we've been talking about, or do we need to do something more (e.g. release KV slots)?

@sumeerbhola
Collaborator Author

Then, the root of the tree would call topAdmissionOp.Admit() and then topAdmissionOp.Next() for each batch.

Not sure I understand. It seems there are still multiple calls to Admit, starting from the root down, which will all call (in sequence) o.admission.Admit, so if a tree manages to grab 5 slots and then blocks when trying to get the 6th and the same happens for an entirely different tree (because there are only 10 slots) we still have a deadlock (NB: I am oversimplifying the slot stuff a bit -- there isn't really a deadlock because slots will be increased over time in this case, but we want this to work correctly without that assumption, for various reasons).
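The concern can be illustrated with a small deterministic simulation. It is a simplified model, not how admission control assigns slots: `simulate` is a hypothetical function in which trees take turns grabbing one slot at a time, hold every slot they have, and release nothing until the whole tree is admitted. It assumes `need` is at least 1.

```go
package main

import "fmt"

// simulate hands out slots round-robin to trees that each need `need` slots
// held at once. It returns the slots each tree still holds and whether every
// unfinished tree ended up waiting on a slot that nobody will release.
func simulate(totalSlots, numTrees, need int) (held []int, deadlocked bool) {
	free := totalSlots
	held = make([]int, numTrees)
	done := make([]bool, numTrees)
	remaining := numTrees
	for remaining > 0 {
		progressed := false
		for i := range held {
			if done[i] || free == 0 {
				continue
			}
			held[i]++
			free--
			progressed = true
			if held[i] == need {
				// The whole tree is admitted: it runs and returns its slots.
				free += held[i]
				held[i] = 0
				done[i] = true
				remaining--
			}
		}
		if !progressed {
			return held, true
		}
	}
	return held, false
}

func main() {
	// Two trees, 10 slots, each tree needs a 6th slot while holding 5.
	held, deadlocked := simulate(10, 2, 6)
	fmt.Println("held:", held, "deadlocked:", deadlocked)

	// If a tree never holds more than one slot at a time, it cannot get stuck.
	held, deadlocked = simulate(10, 2, 1)
	fmt.Println("held:", held, "deadlocked:", deadlocked)
}
```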

its input will do all the work it will ever do on the first call to sortOp.Next(), which could be a large amount of time. I'm guessing we'd need to do something about that to get fine enough granularity.

btw, we don't have to be perfect. If expensive sorts that consume 100+ms on a single sortOp.Next() are uncommon we can incur the risk of a coarse granularity Admit/WorkDone pair. Performance isolation is typically a moving target. We just want to have an integration structure in place that is mostly sound and can be extended in the future.

What sort of special handling would be needed for SQL-KV interactions? Would it be enough to measure the CPU time spent during a call to Next like we've been talking about, or do we need to do something more (e.g. release KV slots)?

Whether we need to do the Admit, WorkDone sequence for these interactions depends on how much CPU gets consumed in the SQL layer. My understanding is that lookup joins are one case where this interaction will happen, where there is a local input stream that is providing rows that are then being used to do (potentially) remote lookups (in a batched manner). Whenever a response arrives for such a batch, the joiner would do some work, and potentially provide a Batch to upstream operators. We need some interception point (even if coarse) to sometimes ask for admission in this setup.

@DrewKimball
Collaborator

DrewKimball commented Feb 22, 2023

Not sure I understand. It seems there are still multiple calls to Admit, starting from the root down, which will all call (in sequence) o.admission.Admit, so if a tree manages to grab 5 slots and then blocks when trying to get the 6th and the same happens for an entirely different tree (because there are only 10 slots) we still have a deadlock

The idea is that there could be multiple calls to o.admission.Admit() in the tree, but each child admissionOp will always report the work done by its input (thus freeing up slots) before the parent makes its own call. Also, in your example each operator that used a slot would compute a batch, ensuring that progress is made for each slot that gets used. Does that make sense?
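A minimal sketch of that ordering, under simplifying assumptions: the `slotPool` and `admissionOp` types are hypothetical, every `Admit` call takes a slot (the CPU threshold is left out), and the batch computation is elided. The point is only that a child returns its slot before the parent asks for one, so a tree never holds more than one slot at a time.

```go
package main

import "fmt"

// slotPool tracks how many slots are in use, the peak seen so far, and the
// order in which operators acquired a slot.
type slotPool struct {
	inUse, peak int
	order       []string
}

func (p *slotPool) acquire(name string) {
	p.inUse++
	if p.inUse > p.peak {
		p.peak = p.inUse
	}
	p.order = append(p.order, name)
}

func (p *slotPool) release() { p.inUse-- }

// admissionOp is a simplified stand-in for the proposed operator.
type admissionOp struct {
	name  string
	child *admissionOp
	pool  *slotPool
}

// Admit first lets the child finish its own Admit (acquire, work, release),
// and only then acquires a slot for this operator's work.
func (o *admissionOp) Admit() {
	if o.child != nil {
		o.child.Admit()
	}
	o.pool.acquire(o.name)
	// ... the wrapped operator would compute and buffer one batch here ...
	o.pool.release()
}

func main() {
	p := &slotPool{}
	scan := &admissionOp{name: "scanOp", pool: p}
	sortOp := &admissionOp{name: "sortOp", child: scan, pool: p}
	sortOp.Admit()
	fmt.Println("order:", p.order, "peak slots held:", p.peak)
}
```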

Whenever a response arrives for such a batch, the joiner would do some work, and potentially provide a Batch to upstream operators. We need some interception point (even if coarse) to sometimes ask for admission in this setup.

I think I'm missing something here. For the lookup-join case (for example), would it not be enough to handle the call to LookupJoin.Next() in the way we've been talking about? Or do you mean that we have to be careful not to include time in the KV layer when reporting CPU spent by a SQL operator that interfaces with it?

@sumeerbhola
Collaborator Author

The idea is that there could be multiple calls to o.admission.Admit() in the tree, but each child admissionOp will always report the work done by its input (thus freeing up slots) before the parent makes its own call.

I see. I think that makes sense.

I think I'm missing something here. For the lookup-join case (for example), would it not be enough to handle the call to LookupJoin.Next() in the way we've been talking about?

Possibly. I assumed your statement "only work for "pipelined" operators" means this option was not fully on the table.

Or do you mean that we have to be careful not to include time in the KV layer when reporting CPU spent by a SQL operator that interfaces with it?

Hmm, I didn't think of that, but that's a good point. If we are using the same goroutine to go through to KV via internalClientAdapter, we would not want to count the time spent in the KV layer.

I'll mock up some interfaces for SQL to use, in line with what we discussed above, and you could try using them, and then we can iteratively refine. Sound good?

@DrewKimball
Collaborator

Possibly. I assumed your statement "only work for "pipelined" operators" means this option was not fully on the table.

Lookup joins could be a problem for the mechanism I proposed, but only because they potentially buffer input rows similar to sorts. The problem is when there isn't a one-to-one mapping between calling Next on an operator and it calling Next on its input, since the Admit method I described only buffers one batch.

Hmm, I didn't think of that, but that's a good point. If we are using the same goroutine to go through to KV via internalClientAdapter, we would not want to count the time spent in the KV layer.

The current CPU measuring logic actually already handles this, though it misses some SQL work. We'll probably want to make it more precise going forward.

I'll mock up some interfaces for SQL to use, in line with what we discussed above, and you could try using them, and then we can iteratively refine. Sound good?

SGTM

sumeerbhola added a commit to sumeerbhola/cockroach that referenced this issue Feb 23, 2023
Informs cockroachdb#85471

Epic: none

Release note: None
sumeerbhola added a commit to sumeerbhola/cockroach that referenced this issue Feb 23, 2023
Informs cockroachdb#85471

Epic: none

Release note: None
@exalate-issue-sync exalate-issue-sync bot added the T-kv KV Team label Mar 15, 2023
@blathers-crl blathers-crl bot added this to Incoming in KV Mar 15, 2023
@exalate-issue-sync exalate-issue-sync bot added T-admission-control Admission Control and removed T-kv KV Team labels Nov 14, 2023
@yuzefovich yuzefovich added the O-support Originated from a customer label Apr 22, 2024
@sumeerbhola sumeerbhola added the P-3 Issues/test failures with no fix SLA label Apr 24, 2024