Skip to content

Commit

Permalink
distsql: fix cleaning up resources in an error case in setupFlow
Browse files Browse the repository at this point in the history
Release note: None

Release justification: low-risk improvement to resources' cleanup in an
edge case.
  • Loading branch information
yuzefovich committed Aug 27, 2021
1 parent ea4b23f commit cf8acbf
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions pkg/sql/distsql/server.go
Expand Up @@ -214,7 +214,7 @@ func (ds *ServerImpl) setupFlow(
rowSyncFlowConsumer execinfra.RowReceiver,
batchSyncFlowConsumer execinfra.BatchReceiver,
localState LocalState,
) (context.Context, flowinfra.Flow, execinfra.OpChains, error) {
) (retCtx context.Context, _ flowinfra.Flow, _ execinfra.OpChains, retErr error) {
if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) {
err := errors.Errorf(
"version mismatch in flow request: %d; this node accepts %d through %d",
Expand All @@ -224,8 +224,27 @@ func (ds *ServerImpl) setupFlow(
return ctx, nil, nil, err
}

var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanup func()
// Make sure that we clean up all resources (which in the happy case are
// cleaned up in Flow.Cleanup()) if an error is encountered.
defer func() {
if retErr != nil {
if sp != nil {
sp.Finish()
}
if monitor != nil {
monitor.Stop(ctx)
}
if onFlowCleanup != nil {
onFlowCleanup()
}
retCtx = tracing.ContextWithSpan(ctx, nil)
}
}()

const opName = "flow"
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
if parentSpan == nil {
ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName)
} else if localState.IsLocal {
Expand All @@ -245,8 +264,7 @@ func (ds *ServerImpl) setupFlow(
)
}

// The monitor opened here is closed in Flow.Cleanup().
monitor := mon.NewMonitor(
monitor = mon.NewMonitor(
"flow",
mon.MemoryResource,
ds.Metrics.CurBytesCount,
Expand Down Expand Up @@ -275,7 +293,6 @@ func (ds *ServerImpl) setupFlow(

var evalCtx *tree.EvalContext
var leafTxn *kv.Txn
var onFlowCleanup func()
if localState.EvalContext != nil {
evalCtx = localState.EvalContext
// We're about to mutate the evalCtx and we want to restore its original
Expand All @@ -302,7 +319,6 @@ func (ds *ServerImpl) setupFlow(

sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData)
if err != nil {
sp.Finish()
return ctx, nil, nil, err
}
ie := &lazyInternalExecutor{
Expand Down Expand Up @@ -375,14 +391,6 @@ func (ds *ServerImpl) setupFlow(
ctx, opChains, err = f.Setup(ctx, &req.Flow, opt)
if err != nil {
log.Errorf(ctx, "error setting up flow: %s", err)
// Flow.Cleanup will not be called, so we have to close the memory monitor
// and finish the span manually.
monitor.Stop(ctx)
sp.Finish()
if onFlowCleanup != nil {
onFlowCleanup()
}
ctx = tracing.ContextWithSpan(ctx, nil)
return ctx, nil, nil, err
}
if !f.IsLocal() {
Expand All @@ -404,7 +412,6 @@ func (ds *ServerImpl) setupFlow(
} else {
// If I haven't created the leaf already, do it now.
if leafTxn == nil {
var err error
leafTxn, err = makeLeaf(req)
if err != nil {
return nil, nil, nil, err
Expand Down

0 comments on commit cf8acbf

Please sign in to comment.