
colexec: add streaming metadata propagation #65586

Open · wants to merge 5 commits into master from streaming-meta
Conversation


@yuzefovich yuzefovich commented May 22, 2021

colmeta: introduce new package to support streaming meta propagation

This commit introduces a couple of interfaces into the colexecop package
(currently unused) that will be implemented by the root components of
vectorized flows (both flow coordinators and outboxes) in order to
propagate metadata in a streaming fashion.

It also introduces a new package, colmeta, that contains a utility
component implementing the logic of intertwining pieces of data (like
coldata.Batches and rowenc.EncDatumRows) with requests to propagate
metadata in a streaming fashion.

The utility handler is designed to be used as follows:

  • there is a separate goroutine (DataProducer) reading from the input
    to the root component and pushing pieces of data onto a channel in a
    synchronous manner. This goroutine is blocked so that it does not
    request more data from the input until necessary.
  • there is an arbitrary number of goroutines
    (StreamingMetadataProducers) that want to propagate metadata in
    a streaming fashion.
  • there is a main goroutine (DataConsumer) of the root component
    responsible for pushing the data, intertwined with streaming meta, to
    the output of the root component (a minimal sketch of this coordination
    follows below).

Release note: None
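For concreteness, here is a minimal, self-contained Go sketch of how the three roles above could coordinate over channels. All names here (dataCh, unblockProducer, the msg struct) are illustrative assumptions and do not match the actual colmeta identifiers; the real handler also deals with contexts, draining, and error propagation that this sketch omits.

	package main

	import (
		"fmt"
		"sync"
	)

	// msg stands in for either a piece of data (a coldata.Batch or
	// rowenc.EncDatumRow) or a piece of streaming metadata.
	type msg struct {
		data string
		meta string
	}

	func main() {
		dataCh := make(chan msg)                  // data and streaming meta, intertwined
		unblockProducer := make(chan struct{}, 1) // consumer tells producer to read more

		var wg sync.WaitGroup
		wg.Add(2)

		// DataProducer: reads from the input and pushes synchronously, waiting
		// for the consumer to acknowledge before requesting more data.
		go func() {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				dataCh <- msg{data: fmt.Sprintf("batch %d", i)}
				<-unblockProducer
			}
		}()

		// StreamingMetadataProducer: pushes metadata whenever it has some,
		// independently of the data flow.
		go func() {
			defer wg.Done()
			dataCh <- msg{meta: "scan progress: 42 rows read"}
		}()

		// Close the channel only once all producers are done.
		go func() {
			wg.Wait()
			close(dataCh)
		}()

		// DataConsumer (the root component's main goroutine): pushes everything
		// to the output and unblocks the producer after each piece of data.
		for m := range dataCh {
			fmt.Println("pushed to output:", m)
			if m.data != "" {
				unblockProducer <- struct{}{}
			}
		}
	}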

colflow: utilize streaming metadata handler in root components

This commit refactors flow coordinators and outboxes to use the colmeta
package in order to support propagating metadata in a streaming fashion.
This required the introduction of another goroutine in all root components
(so the outbox now needs two goroutines in addition to the one in which it
is running).

Notably, a refactor of the row flow coordinator was needed so that it no
longer relies on execinfra.ProcessorBase utilities, because that struct
assumes that all methods are called from a single goroutine, an assumption
this commit breaks. As a result, flowCoordinatorBase was extracted to
contain the common logic of both the row and batch flow coordinators
(and the former no longer uses ProcessorBase).

Currently, only ColBatchScans and Inboxes propagate some metadata in
a streaming fashion (the former sends scan progress metadata, so we now
have the same query progress reporting in the vectorized engine as we do
in the row engine).

Fixes: #55758.

Release note (sql change): Queries executed via the vectorized engine
now display their progress in the phase column of SHOW QUERIES.
Previously, this feature was only available in the row-by-row engine.

@cockroach-teamcity

This change is Reviewable

@yuzefovich yuzefovich force-pushed the streaming-meta branch 5 times, most recently from 6063d65 to de9afd2 on May 28, 2021 17:59
@yuzefovich yuzefovich force-pushed the streaming-meta branch 22 times, most recently from 9f8a225 to fcdeadb on June 6, 2021 00:07
@yuzefovich yuzefovich force-pushed the streaming-meta branch 2 times, most recently from db910f7 to 2121ade on June 10, 2021 16:37
@yuzefovich yuzefovich force-pushed the streaming-meta branch 2 times, most recently from 269f298 to 9145844 on June 10, 2021 18:24
@yuzefovich yuzefovich marked this pull request as ready for review June 10, 2021 18:25
@yuzefovich yuzefovich requested review from michae2, jordanlewis and a team June 10, 2021 18:25
@yuzefovich

I think this is RFAL.

I'm also very open to suggestions on the naming here (of the new package, interfaces, handler, etc), so please let me know if you have any ideas.

@yuzefovich yuzefovich force-pushed the streaming-meta branch 2 times, most recently from 0be1fa9 to 8924ae6 on June 14, 2021 18:24

@jordanlewis jordanlewis left a comment


I had a look, nice work on this.

I am concerned about the complexity of the careful dance of locks and channels that must be performed. Is there some way that this code can be simplified? We know that we've seen some trouble here before and I'm not looking forward to the day that we have to debug a stuck goroutine on one of the channels...

From a high level, instead of adding a third goroutine that has to be carefully synchronized with producerBlock, is there some way that we could conceivably have the main goroutine itself alternate between the Batch/row stream and incoming streamed metadata? Or is there some issue with that which might cause a deadlock or something?

I'm also wondering, is there a performance cost to this change? We'll now have to do several(?) more channel sends per row (in the row engine) and per batch (in vectorized). Isn't it correct that every row/batch causes a synchronization on the producerBlock channel?

I'll take another look soon. It takes some time to wrap my head around the logic.

Reviewed 7 of 7 files at r1, 1 of 1 files at r2, 5 of 5 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jordanlewis, @michae2, and @yuzefovich)


pkg/sql/colfetcher/colbatch_scan.go, line 133 at r4 (raw file):

		meta.Metrics = execinfrapb.GetMetricsMeta()
		meta.Metrics.RowsRead = sinceLastUpdate
		if err := s.streamingMetaReceiver.PushStreamingMeta(s.Ctx, meta); err != nil {

What if the error was a context cancellation or another event that we would rather intercept and shut down because of? Is it safe to swallow this error?


pkg/sql/colflow/flow_coordinator.go, line 150 at r4 (raw file):

	// communicating all the data to the consumer goroutine (the current one).
	// TODO(yuzefovich): consider using stopper to run this goroutine.
	go func(flowCtx context.Context) {

Why are we not using the stopper here?


pkg/sql/colflow/flow_coordinator.go, line 154 at r4 (raw file):

		defer f.producer.ProducerDone()
		if err := f.producer.WaitForConsumer(flowCtx); err != nil {
			return

Probably we should log this error, rather than swallow it completely.


pkg/sql/colflow/flow_coordinator.go, line 317 at r4 (raw file):

		meta.Err = err
		exit = f.producer.SendMeta(ctx, meta) != nil
		return

nit: using naked returns and named return values is generally considered an antipattern for readability; I would prefer to always see `return drain, exit` or just `return false, f.producer.SendMeta(ctx, meta) != nil`.


pkg/sql/colflow/colmeta/streaming_meta.go, line 23 at r3 (raw file):

)

// The outline of how the interfaces in this file are designed to be used.

Nice documentation!

I think it would be helpful to include one more section before the diagram: what is the high level summary of the colmeta package?


pkg/sql/colflow/colmeta/streaming_meta.go, line 48 at r4 (raw file):

	// the consumer arrives. A context cancellation error can be returned in
	// which case the producer should exit right away.
	WaitForConsumer(context.Context) error

I'm not sure this is idiomatic - it seems like a lot of interfaces prefer to give you a channel to wait on, like ctx.Done() does.
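For illustration, a tiny sketch of the channel-based alternative being suggested here (consumerArrived is a hypothetical name, not part of the actual interface):

	package sketch // hypothetical package, for illustration only

	import "context"

	// waitForConsumer shows the ctx.Done()-style shape: the handler exposes a
	// channel that is closed once the consumer arrives, and the producer
	// selects on it together with the context.
	func waitForConsumer(ctx context.Context, consumerArrived <-chan struct{}) error {
		select {
		case <-consumerArrived:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}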


pkg/sql/execinfra/scanbase.go, line 22 at r4 (raw file):

// ScanProgressFrequency determines how often the scan operators should emit
// the metadata about how many rows they have read.

Is this in rows? Maybe add a comment?


@yuzefovich yuzefovich left a comment


I am concerned about the complexity of the careful dance of locks and channels that must be performed. Is there some way that this code can be simplified?

I share your concern, but I couldn't think of anything simpler... I'm keeping my fingers crossed so that

the day that we have to debug a stuck goroutine on one of the channels...

never comes because we're careful to always listen on the context when performing sends/recv's on the channels. There are a couple of exceptions to this strategy:

  • we don't listen for context cancellation from the DataProducer goroutine when sending on the producerBlock channel, because that channel is buffered and we never attempt another send before the previous one has been received (see the small sketch below)
  • we don't listen for context cancellation from the DataProducer goroutine when recv'ing from the nextCh channel, because we rely on the DataConsumer goroutine to notice the context cancellation and close that channel properly.
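For reference, a small sketch of the two patterns described above (channel names and types are illustrative only):

	package sketch // hypothetical package, for illustration only

	import "context"

	// ctxAwareSend is the general rule: every channel send also listens on the
	// context so a canceled flow cannot leave a goroutine stuck on the send.
	func ctxAwareSend(ctx context.Context, ch chan<- struct{}) error {
		select {
		case ch <- struct{}{}:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// bufferedSend is the exception: if the channel has capacity 1 and there is
	// never more than one outstanding send, the plain send cannot block, so no
	// ctx case is needed.
	func bufferedSend(ch chan<- struct{}) {
		ch <- struct{}{}
	}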

From a high level, instead of adding a 3rd goroutine that has to be carefully synchronized with producerBlock, is there some way that we could conceivably have the main goroutine alternate from Batch/row to incoming streamed metadata itself? Or is there some issue with that which might cause a deadlock or something?

What exactly do you have in mind?

My original idea from a year ago was to have the root component periodically do a non-blocking poll of something (like a separate channel for streaming metadata) every time before the component pushes a row/batch to its output. It would be something like

func (f *BatchFlowCoordinator) Run() {
  for {
    nextBatch := f.input.Next()
    // Push all streaming meta we have accumulated so far.
  LOOP:
    for {
      select {
      case meta := <-f.streamingMeta:
        f.output.PushBatch(nil /* batch */, meta)
      default:
        break LOOP
      }
    }
    f.output.PushBatch(nextBatch, nil /* meta */)
  }
}

However, I realized that such a strategy won't work when the flow doesn't produce any data (e.g. automatic stats collection doesn't push anything to the flow coordinator AFAIU), so I abandoned this idea.

I'm also wondering, is there a performance cost to this change? We'll now have to do several(?) more channel sends per row (in the row engine) and per batch (in vectorized).

Yeah, I expect this change to have some performance cost, and I haven't run any benchmarks (but will kick them off tomorrow). Note that the row-by-row engine is not affected by this change at all, but in the vectorized engine - yes, we have an extra channel send per row (if a processor is at the root) or per batch (if an operator is at the root).

Isn't it correct that every row/batch causes a synchronization on the producerBlock channel?

Yes, that's correct, but in my mind it is the desired behavior given that we don't want to allow for the DataProducer goroutine to proceed processing more data, in the general case, until the DataConsumer tells it to.

However, now that I'm thinking about this, this might be a poor choice. The case I was originally concerned about was a query with a limit: I didn't want the DataProducer to produce more than necessary. But the tree of operators is aware of the limit itself (unlike the DistSQLReceiver), so now I think it makes sense to let the DataProducer run eagerly. What do you think?

We will need to add some allocations (of nextChMsgs), but we can sync.Pool them since we have very clear lifetimes.
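For example, a sketch of how such pooling could look (the nextChMsg field layout here is an assumption, not the actual struct in the PR):

	package sketch // hypothetical package, for illustration only

	import (
		"sync"

		"github.com/cockroachdb/cockroach/pkg/col/coldata"
		"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	)

	// nextChMsg is the assumed shape of the messages sent on the data channel.
	type nextChMsg struct {
		batch coldata.Batch
		meta  *execinfrapb.ProducerMetadata
	}

	var nextChMsgPool = sync.Pool{
		New: func() interface{} { return &nextChMsg{} },
	}

	// getMsg is called on the producer side to wrap a batch into a pooled message.
	func getMsg(b coldata.Batch) *nextChMsg {
		m := nextChMsgPool.Get().(*nextChMsg)
		m.batch, m.meta = b, nil
		return m
	}

	// putMsg is called on the consumer side once the contents have been pushed
	// to the output, returning the message to the pool.
	func putMsg(m *nextChMsg) {
		*m = nextChMsg{}
		nextChMsgPool.Put(m)
	}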

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jordanlewis and @michae2)


pkg/sql/colfetcher/colbatch_scan.go, line 133 at r4 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

What if the error was a context cancellation or another event that we would rather intercept and shut down because of? Is it safe to swallow this error?

In this case it is safe to swallow the error; I added a comment explaining why. We could inspect the error here and, if it is a context cancellation, panic, but I don't think it's worth it. Let me know if you think otherwise.


pkg/sql/colflow/flow_coordinator.go, line 150 at r4 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

Why are we not using the stopper here?

I think it is just an oversight. We have a couple of places where we spin up new goroutines (the parallel unordered synchronizer, the hash router, the outbox, and now the flow coordinators), and I want to switch them all to the stopper at the same time since there might be some plumbing to do, and I don't want to derail this PR.


pkg/sql/colflow/flow_coordinator.go, line 154 at r4 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

Probably we should log this error, rather than swallow it completely.

Done.


pkg/sql/colflow/colmeta/streaming_meta.go, line 23 at r3 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

Nice documentation!

I think it would be helpful to include one more section before the diagram: what is the high level summary of the colmeta package?

Done.


pkg/sql/colflow/colmeta/streaming_meta.go, line 48 at r4 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

I'm not sure this is idiomatic - it seems like a lot of interfaces prefer to give you a channel to wait on, like ctx.Done() does.

Refactored (in a temporary WIP commit that I'll split up and squash later - just want to make sure that my understanding is sound and nothing breaks).


pkg/sql/execinfra/scanbase.go, line 22 at r4 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

Is this in rows? Maybe add a comment?

Done.

@yuzefovich

The KV95 performance numbers are depressing (each line below is the final workload summary: elapsed time, errors, total ops, cumulative ops/sec, and avg/p50/p95/p99/pMax latencies in ms):

  • GCE:
tail -n 1 kv-old-5m.log
  300.0s        0        2949911         9833.0      0.8      0.8      1.7      2.6   4563.4  
tail -n 1 kv-new-5m.log
  300.0s        0        2754811         9182.7      0.9      0.9      1.8      2.6     58.7  
tail -n 1 kv-old-10m.log
  600.0s        0        6284128        10473.5      0.8      0.7      1.6      2.4     58.7  
tail -n 1 kv-new-10m.log
  600.0s        0        5567691         9279.5      0.9      0.9      1.8      2.5     48.2  
tail -n 1 kv-old-15m.log
  900.0s        0        9805583        10895.1      0.7      0.7      1.5      2.2     22.0  
tail -n 1 kv-new-15m.log
  900.0s        0        8767418         9741.6      0.8      0.8      1.6      2.4    184.5  
  • AWS:
tail -n 1 kv-old-5m.log
  300.0s        0        3675694        12252.3      0.7      0.7      1.2      1.9    209.7  
tail -n 1 kv-new-5m.log
  300.0s        0        3317439        11058.1      0.7      0.7      1.3      1.8    209.7  
tail -n 1 kv-old-10m.log
  600.0s        0        7554855        12591.4      0.6      0.7      1.2      1.6    209.7  
tail -n 1 kv-new-10m.log
  600.0s        0        6739271        11232.1      0.7      0.7      1.2      1.8    352.3  
tail -n 1 kv-old-15m.log
  900.0s        0       11399183        12665.8      0.6      0.7      1.2      1.6    209.7  
tail -n 1 kv-new-15m.log
  900.0s        0       10120405        11244.9      0.7      0.7      1.2      1.7    335.5  

I guess the next step is to try removing that extra send/recv on each row/batch.

colrpc: take in OpWithMetaInfo in the outbox constructor

This simplifies the signature of the method a bit.

Release note: None

colrpc: clarify a test a bit

This commit clarifies the usage of different contexts in a test since we
use different contexts to simulate remote nodes as well as independent
scenarios.

Release note: None
@yuzefovich

I rebased on top of master and squashed the last WIP commit that modified the interface a bit. Looking into the refactoring to reduce the performance hit now.

@yuzefovich

Ouch.

Isn't it correct that every row/batch causes a synchronization on the producerBlock channel?

Yes, that's correct, but in my mind it is the desired behavior given that we don't want to allow for the DataProducer goroutine to proceed processing more data, in the general case, until the DataConsumer tells it to.

However, now that I'm thinking about this, this might be a poor choice. The original case I was concerned about was if the query had a limit, I didn't want for the DataProducer to produce more than necessary, but the tree of operators is aware of the limit itself (unlike the DistSQLReceiver), so now I think it makes sense to let the DataProducer run eagerly. What do you think?

There is another reason why we have to block the DataProducer goroutine until the DataConsumer goroutine has received the data and communicated it to the output: calls to Next might invalidate the results (meaning that allowing the producer to run eagerly might corrupt data that has been sent to the consumer but hasn't yet been communicated to the output). And I don't know what to do here other than performing a deep copy :/ I'll try this approach, but I'm worried it'll be even worse.
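Roughly, the eager producer would then look something like this (copyBatch is a hypothetical deep-copy helper, not an actual coldata API; no producerBlock synchronization would be needed anymore):

	package sketch // hypothetical package, for illustration only

	import "github.com/cockroachdb/cockroach/pkg/col/coldata"

	// produceEagerly reads from the input and hands off a deep copy of each
	// batch, so that the next call to the input cannot invalidate data that
	// the consumer is still holding.
	func produceEagerly(
		next func() coldata.Batch,
		copyBatch func(coldata.Batch) coldata.Batch, // hypothetical deep-copy helper
		dataCh chan<- coldata.Batch,
	) {
		defer close(dataCh)
		for {
			b := next()
			if b.Length() == 0 {
				// A zero-length batch indicates that the input is exhausted.
				return
			}
			dataCh <- copyBatch(b)
		}
	}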

Am I missing something? Does anyone have other ideas?

@yuzefovich yuzefovich added the do-not-merge label on Jul 15, 2021
@yuzefovich

We are putting this work on the shelf for the time being since I'll be focusing on a higher priority item instead.

The next step for this PR is to fix and polish up the last WIP commit, which removes the extra synchronization on the producerBlock channel by performing a deep copy of the data, and then to measure the performance impact of the PR as a whole.

However, I'm quite pessimistic at this point about pursuing this PR further (I'm guessing that the approach with extra copies will also incur a performance hit on the order of 10% on KV-like workloads, which is unacceptable), and I'm thinking that we might have to change the Operator.Next signature to also return the metadata, always in a streaming fashion. Such a solution would make the Operator interface similar to the RowSource one, should be relatively simple to implement (just with a lot of manual labor), and might actually allow us to simplify some things (namely DrainMeta implementations).
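As a rough sketch of that direction (not the actual colexecop.Operator definition; the context parameter is omitted for brevity), the interface might become:

	package sketch // hypothetical package, for illustration only

	import (
		"github.com/cockroachdb/cockroach/pkg/col/coldata"
		"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	)

	// Operator is what the vectorized operator interface could look like if
	// Next also returned streaming metadata, mirroring execinfra.RowSource in
	// the row engine.
	type Operator interface {
		// Next returns the next batch plus any metadata produced since the
		// previous call; a zero-length batch with nil metadata means exhaustion.
		Next() (coldata.Batch, *execinfrapb.ProducerMetadata)
	}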

craig bot pushed a commit that referenced this pull request Jul 20, 2021
67663: colrpc: minor cleanup r=yuzefovich a=yuzefovich

This PR extracts a couple of commits from #65586 which are not
controversial and seem beneficial in their own right.

**colrpc: take in OpWithMetaInfo in the outbox constructor**

This simplifies the signature of the method a bit.

Release note: None

**colrpc: clarify a test a bit**

This commit clarifies the usage of different contexts in a test since we
use different contexts to simulate remote nodes as well as independent
scenarios.

Release note: None

67686: changefeedccl: retry webhook sink requests upon HTTP error r=spiffyyeng a=spiffyyeng

Before, webhook sink requests simply resulted in retryable changefeed
errors upon failure. This change adds default retry behavior for HTTP
requests, preventing the need for changefeeds to shut down and restart
every time.

Resolves #67312

Release note: None

67735: sqlproxy: fix test flake r=andy-kimball a=andy-kimball

TestDirectoryConnect is flaking when testing that it has only one connection
to the proxy. This flake happens because it takes time for connection
closure to propagate to the server. The connection from the previous
sub-test is still present. The fix is to use require.Eventually to wait
until the previous connection is torn down.

Fixes #67405

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Ryan Min <ryanmin42@gmail.com>
Co-authored-by: Andrew Kimball <andyk@cockroachlabs.com>
Labels
do-not-merge bors won't merge a PR with this label.
Linked issue
colexec: add a way to propagate metadata in a streaming fashion (#55758)
3 participants