Skip to content

Commit

Permalink
distsql: run last processor from main goroutine
Browse files Browse the repository at this point in the history
Closes #27734

This change improves distsql throughput for short-running queries by
removing some synchronization overhead. On a kv --read-percent=100
workload, this change resulted in a 25% throughput improvement.

Release note (performance improvement): Improve fixed cost of running
distributed sql queries.
  • Loading branch information
asubiotto committed Jul 25, 2018
1 parent 401ac08 commit df62b9b
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 20 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ func (dsp *DistSQLPlanner) Run(
recv.SetError(err)
return
}

// TODO(radu): this should go through the flow scheduler.
if err := flow.Start(ctx, func() {}); err != nil {
if err := flow.StartSync(ctx, func() {}); err != nil {
log.Fatalf(ctx, "unexpected error from syncFlow.Start(): %s "+
"The error should have gone to the consumer.", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlplan/aggregator_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func runTestFlow(
if err != nil {
t.Fatal(err)
}
if err := flow.Start(ctx, func() {}); err != nil {
if err := flow.StartAsync(ctx, func() {}); err != nil {
t.Fatal(err)
}
flow.Wait()
Expand Down
63 changes: 48 additions & 15 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,10 @@ func (f *Flow) setup(ctx context.Context, spec *FlowSpec) error {
return nil
}

// Start starts the flow (each processor runs in their own goroutine).
//
// Generally if errors are encountered during the setup part, they're returned.
// But if the flow is a synchronous one, then no error is returned; instead the
// setup error is pushed to the syncFlowConsumer. In this case, a subsequent
// call to f.Wait() will not block.
func (f *Flow) Start(ctx context.Context, doneFn func()) error {
// startInternal starts the flow. All processors apart from the last one are
// started, each in their own goroutine. The caller must forward any returned
// error to syncFlowConsumer if set.
func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
f.doneFn = doneFn
log.VEventf(
ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
Expand All @@ -481,12 +478,6 @@ func (f *Flow) Start(ctx context.Context, doneFn func()) error {
if err := f.flowRegistry.RegisterFlow(
ctx, f.id, f, f.inboundStreams, settingFlowStreamTimeout.Get(&f.FlowCtx.Settings.SV),
); err != nil {
if f.syncFlowConsumer != nil {
// For sync flows, the error goes to the consumer.
f.syncFlowConsumer.Push(nil /* row */, &ProducerMetadata{Err: err})
f.syncFlowConsumer.ProducerDone()
return nil
}
return err
}

Expand All @@ -498,9 +489,51 @@ func (f *Flow) Start(ctx context.Context, doneFn func()) error {
for _, s := range f.startables {
s.start(ctx, &f.waitGroup, f.ctxCancel)
}
for _, p := range f.processors {
for i := 0; i < len(f.processors)-1; i++ {
f.waitGroup.Add(1)
go f.processors[i].Run(ctx, &f.waitGroup)
}
return nil
}

// StartAsync starts the flow. Each processor runs in their own goroutine.
//
// Generally if errors are encountered during the setup part, they're returned.
// But if the flow is a synchronous one, then no error is returned; instead the
// setup error is pushed to the syncFlowConsumer. In this case, a subsequent
// call to f.Wait() will not block.
func (f *Flow) StartAsync(ctx context.Context, doneFn func()) error {
if err := f.startInternal(ctx, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &ProducerMetadata{Err: err})
f.syncFlowConsumer.ProducerDone()
return nil
}
return err
}
if len(f.processors) > 0 {
f.waitGroup.Add(1)
go p.Run(ctx, &f.waitGroup)
go f.processors[len(f.processors)-1].Run(ctx, &f.waitGroup)
}
return nil
}

// StartSync starts the flow just like StartAsync but the last processor is run
// from the main goroutine. Wait() must still be called afterwards as other
// goroutines might be spawned.
func (f *Flow) StartSync(ctx context.Context, doneFn func()) error {
if err := f.startInternal(ctx, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &ProducerMetadata{Err: err})
f.syncFlowConsumer.ProducerDone()
return nil
}
return err
}
if len(f.processors) > 0 {
f.processors[len(f.processors)-1].Run(ctx, nil)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/flow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func TestSyncFlowAfterDrain(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := flow.Start(ctx, func() {}); err != nil {
if err := flow.StartAsync(ctx, func() {}); err != nil {
t.Fatal(err)
}
flow.Wait()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) error {
)
fs.mu.numRunning++
fs.metrics.FlowStart()
if err := f.Start(ctx, func() { fs.flowDoneCh <- f }); err != nil {
if err := f.StartAsync(ctx, func() { fs.flowDoneCh <- f }); err != nil {
return err
}
// TODO(radu): we could replace the WaitGroup with a structure that keeps a
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (ds *ServerImpl) RunSyncFlow(stream DistSQL_RunSyncFlowServer) error {
defer ctxCancel()
mbox.start(ctx, &f.waitGroup, ctxCancel)
ds.Metrics.FlowStart()
if err := f.Start(ctx, func() {}); err != nil {
if err := f.StartSync(ctx, func() {}); err != nil {
log.Fatalf(ctx, "unexpected error from syncFlow.Start(): %s "+
"The error should have gone to the consumer.", err)
}
Expand Down

0 comments on commit df62b9b

Please sign in to comment.