Merge #24477
24477: distsqlrun: add NoopProcessor benchmark r=jordanlewis a=jordanlewis

It's nice to be able to see how fast the DistSQL processor interface itself is.

```
name             time/op
Noop/cols=1-8     3.23ms ± 2%
Noop/cols=2-8     4.56ms ± 2%
Noop/cols=4-8     6.53ms ± 5%
Noop/cols=16-8    23.4ms ± 2%
Noop/cols=256-8    407ms ± 6%

name             speed
Noop/cols=1-8    162MB/s ± 2%
Noop/cols=2-8    230MB/s ± 2%
Noop/cols=4-8    321MB/s ± 5%
Noop/cols=16-8   359MB/s ± 2%
Noop/cols=256-8  331MB/s ± 5%
```
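
The speed column follows from the time/op column under the benchmark's accounting of 8 bytes per int column value (`b.SetBytes` in `noop_test.go` below). A minimal sanity check for the cols=1 case, using only the numbers above (the constants mirror the benchmark; this is not part of the change itself):

```go
// Quick check that the reported speed follows from time/op given
// bytesPerOp = 8 * numRows * numCols (see b.SetBytes in noop_test.go).
package main

import "fmt"

func main() {
	const (
		numRows    = 1 << 16 // rows pushed through the processor per iteration
		numCols    = 1       // the cols=1 case
		bytesPerOp = 8 * numRows * numCols
	)
	secondsPerOp := 3.23e-3 // ~3.23ms from the cols=1 row above
	fmt.Printf("%.0f MB/s\n", float64(bytesPerOp)/secondsPerOp/1e6) // prints: 162 MB/s
}
```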
craig[bot] committed Apr 4, 2018
2 parents 38dbf4d + 5a2cc78 commit b48bef3
Showing 3 changed files with 186 additions and 103 deletions.
pkg/sql/distsqlrun/noop.go: 124 changes (124 additions & 0 deletions)
@@ -0,0 +1,124 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package distsqlrun

import (
	"sync"

	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

// noopProcessor is a processor that simply passes rows through from the
// synchronizer to the post-processing stage. It can be useful for its
// post-processing or in the last stage of a computation, where we may only
// need the synchronizer to join streams.
type noopProcessor struct {
	processorBase
	input RowSource
}

var _ Processor = &noopProcessor{}
var _ RowSource = &noopProcessor{}

func newNoopProcessor(
	flowCtx *FlowCtx, input RowSource, post *PostProcessSpec, output RowReceiver,
) (*noopProcessor, error) {
	n := &noopProcessor{input: input}
	if err := n.init(post, input.OutputTypes(), flowCtx, nil /* evalCtx */, output); err != nil {
		return nil, err
	}
	return n, nil
}

// Run is part of the processor interface.
func (n *noopProcessor) Run(wg *sync.WaitGroup) {
	if n.out.output == nil {
		panic("noopProcessor output not initialized for emitting rows")
	}
	Run(n.flowCtx.Ctx, n, n.out.output)
	if wg != nil {
		wg.Done()
	}
}

func (n *noopProcessor) close() {
	if n.internalClose() {
		n.input.ConsumerClosed()
	}
}

// producerMeta constructs the ProducerMetadata after consumption of rows has
// terminated, either due to being indicated by the consumer, or because the
// processor ran out of rows or encountered an error. It is ok for err to be
// nil indicating that we're done producing rows even though no error occurred.
func (n *noopProcessor) producerMeta(err error) *ProducerMetadata {
	var meta *ProducerMetadata
	if !n.closed {
		if err != nil {
			meta = &ProducerMetadata{Err: err}
		} else if trace := getTraceData(n.ctx); trace != nil {
			meta = &ProducerMetadata{TraceData: trace}
		}
		// We need to close as soon as we send producer metadata as we're done
		// sending rows. The consumer is allowed to not call ConsumerDone().
		n.close()
	}
	return meta
}

// Next is part of the RowSource interface.
func (n *noopProcessor) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
	n.maybeStart("noop", "" /* logTag */)

	if n.closed {
		return nil, n.producerMeta(nil /* err */)
	}

	for {
		row, meta := n.input.Next()
		if meta != nil {
			return nil, meta
		}
		if row == nil {
			return nil, n.producerMeta(nil /* err */)
		}

		outRow, status, err := n.out.ProcessRow(n.ctx, row)
		if err != nil {
			return nil, n.producerMeta(err)
		}
		switch status {
		case NeedMoreRows:
			if outRow == nil && err == nil {
				continue
			}
		case DrainRequested:
			n.input.ConsumerDone()
			continue
		}
		return outRow, nil
	}
}

// ConsumerDone is part of the RowSource interface.
func (n *noopProcessor) ConsumerDone() {
	n.input.ConsumerDone()
}

// ConsumerClosed is part of the RowSource interface.
func (n *noopProcessor) ConsumerClosed() {
	// The consumer is done, Next() will not be called again.
	n.close()
}
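
Not part of this diff: a toy, self-contained sketch of the consumption loop implied by the RowSource contract that `Next` above implements. Callers keep pulling until both the returned row and the metadata are nil, and metadata (an error or trace data) may be interleaved with rows. The `rowSource`, `row`, and `meta` types here are simplified stand-ins for `RowSource`, `sqlbase.EncDatumRow`, and `ProducerMetadata`.

```go
package main

import "fmt"

// Simplified stand-ins for sqlbase.EncDatumRow and ProducerMetadata.
type row []int
type meta struct{ err error }

// rowSource mirrors the shape of the RowSource interface used above.
type rowSource interface {
	Next() (row, *meta)
}

// sliceSource emits a fixed set of rows, then returns (nil, nil) forever,
// the same "exhausted" signal noopProcessor.Next eventually produces.
type sliceSource struct {
	rows []row
	idx  int
}

func (s *sliceSource) Next() (row, *meta) {
	if s.idx >= len(s.rows) {
		return nil, nil
	}
	r := s.rows[s.idx]
	s.idx++
	return r, nil
}

// consume drains a rowSource: metadata is handled as it arrives, and the loop
// ends once both return values are nil.
func consume(src rowSource) {
	for {
		r, m := src.Next()
		if m != nil {
			if m.err != nil {
				fmt.Println("error:", m.err)
				return
			}
			continue // e.g. trace data: record it and keep pulling
		}
		if r == nil {
			return // no more rows
		}
		fmt.Println("row:", r)
	}
}

func main() {
	consume(&sliceSource{rows: []row{{1}, {2}, {3}}})
}
```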
pkg/sql/distsqlrun/noop_test.go: 62 changes (62 additions & 0 deletions)
@@ -0,0 +1,62 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package distsqlrun

import (
	"context"
	"fmt"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

func BenchmarkNoop(b *testing.B) {
	const numRows = 1 << 16

	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	evalCtx := tree.MakeTestingEvalContext(st)
	defer evalCtx.Stop(ctx)

	flowCtx := &FlowCtx{
		Ctx:      ctx,
		Settings: st,
		EvalCtx:  evalCtx,
	}
	post := &PostProcessSpec{}
	disposer := &RowDisposer{}
	for _, numCols := range []int{1, 1 << 1, 1 << 2, 1 << 4, 1 << 8} {
		b.Run(fmt.Sprintf("cols=%d", numCols), func(b *testing.B) {
			cols := make([]sqlbase.ColumnType, numCols)
			for i := range cols {
				cols[i] = intType
			}
			input := NewRepeatableRowSource(cols, makeIntRows(numRows, numCols))

			b.SetBytes(int64(8 * numRows * numCols))
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				d, err := newNoopProcessor(flowCtx, input, post, disposer)
				if err != nil {
					b.Fatal(err)
				}
				d.Run(nil)
				input.Reset()
			}
		})
	}
}
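
To reproduce numbers like the ones in the commit message, one would typically run the benchmark with the standard Go tooling and summarize repeated runs with benchstat; a sketch, assuming the usual package path and that golang.org/x/perf/cmd/benchstat is installed:

```
go test ./pkg/sql/distsqlrun -run '^$' -bench BenchmarkNoop -count 10 > noop.txt
benchstat noop.txt
```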
pkg/sql/distsqlrun/processors.go: 103 changes (0 additions & 103 deletions)
@@ -463,28 +463,6 @@ func (rb *rowSourceBase) consumerClosed(name string) {
	atomic.StoreUint32((*uint32)(&rb.consumerStatus), uint32(ConsumerClosed))
}

// noopProcessor is a processor that simply passes rows through from the
// synchronizer to the post-processing stage. It can be useful for its
// post-processing or in the last stage of a computation, where we may only
// need the synchronizer to join streams.
type noopProcessor struct {
	processorBase
	input RowSource
}

var _ Processor = &noopProcessor{}
var _ RowSource = &noopProcessor{}

func newNoopProcessor(
	flowCtx *FlowCtx, input RowSource, post *PostProcessSpec, output RowReceiver,
) (*noopProcessor, error) {
	n := &noopProcessor{input: input}
	if err := n.init(post, input.OutputTypes(), flowCtx, nil /* evalCtx */, output); err != nil {
		return nil, err
	}
	return n, nil
}

// processorSpan creates a child span for a processor (if we are doing any
// tracing). The returned span needs to be finished using tracing.FinishSpan.
func processorSpan(ctx context.Context, name string) (context.Context, opentracing.Span) {
@@ -496,87 +474,6 @@ func processorSpan(ctx context.Context, name string) (context.Context, opentracing.Span) {
	return opentracing.ContextWithSpan(ctx, newSpan), newSpan
}

// Run is part of the processor interface.
func (n *noopProcessor) Run(wg *sync.WaitGroup) {
	if n.out.output == nil {
		panic("noopProcessor output not initialized for emitting rows")
	}
	Run(n.flowCtx.Ctx, n, n.out.output)
	if wg != nil {
		wg.Done()
	}
}

func (n *noopProcessor) close() {
	if n.internalClose() {
		n.input.ConsumerClosed()
	}
}

// producerMeta constructs the ProducerMetadata after consumption of rows has
// terminated, either due to being indicated by the consumer, or because the
// processor ran out of rows or encountered an error. It is ok for err to be
// nil indicating that we're done producing rows even though no error occurred.
func (n *noopProcessor) producerMeta(err error) *ProducerMetadata {
	var meta *ProducerMetadata
	if !n.closed {
		if err != nil {
			meta = &ProducerMetadata{Err: err}
		} else if trace := getTraceData(n.ctx); trace != nil {
			meta = &ProducerMetadata{TraceData: trace}
		}
		// We need to close as soon as we send producer metadata as we're done
		// sending rows. The consumer is allowed to not call ConsumerDone().
		n.close()
	}
	return meta
}

// Next is part of the RowSource interface.
func (n *noopProcessor) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
	n.maybeStart("noop", "" /* logTag */)

	if n.closed {
		return nil, n.producerMeta(nil /* err */)
	}

	for {
		row, meta := n.input.Next()
		if meta != nil {
			return nil, meta
		}
		if row == nil {
			return nil, n.producerMeta(nil /* err */)
		}

		outRow, status, err := n.out.ProcessRow(n.ctx, row)
		if err != nil {
			return nil, n.producerMeta(err)
		}
		switch status {
		case NeedMoreRows:
			if outRow == nil && err == nil {
				continue
			}
		case DrainRequested:
			n.input.ConsumerDone()
			continue
		}
		return outRow, nil
	}
}

// ConsumerDone is part of the RowSource interface.
func (n *noopProcessor) ConsumerDone() {
	n.input.ConsumerDone()
}

// ConsumerClosed is part of the RowSource interface.
func (n *noopProcessor) ConsumerClosed() {
	// The consumer is done, Next() will not be called again.
	n.close()
}

func newProcessor(
	flowCtx *FlowCtx,
	core *ProcessorCoreUnion,