diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 9b8dfe758eef..d987f55d9e14 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -214,7 +214,7 @@ func (ds *ServerImpl) setupFlow( rowSyncFlowConsumer execinfra.RowReceiver, batchSyncFlowConsumer execinfra.BatchReceiver, localState LocalState, -) (context.Context, flowinfra.Flow, execinfra.OpChains, error) { +) (retCtx context.Context, _ flowinfra.Flow, _ execinfra.OpChains, retErr error) { if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) { err := errors.Errorf( "version mismatch in flow request: %d; this node accepts %d through %d", @@ -224,8 +224,27 @@ func (ds *ServerImpl) setupFlow( return ctx, nil, nil, err } + var sp *tracing.Span // will be Finish()ed by Flow.Cleanup() + var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup() + var onFlowCleanup func() + // Make sure that we clean up all resources (which in the happy case are + // cleaned up in Flow.Cleanup()) if an error is encountered. + defer func() { + if retErr != nil { + if sp != nil { + sp.Finish() + } + if monitor != nil { + monitor.Stop(ctx) + } + if onFlowCleanup != nil { + onFlowCleanup() + } + retCtx = tracing.ContextWithSpan(ctx, nil) + } + }() + const opName = "flow" - var sp *tracing.Span // will be Finish()ed by Flow.Cleanup() if parentSpan == nil { ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName) } else if localState.IsLocal { @@ -245,8 +264,7 @@ func (ds *ServerImpl) setupFlow( ) } - // The monitor opened here is closed in Flow.Cleanup(). - monitor := mon.NewMonitor( + monitor = mon.NewMonitor( "flow", mon.MemoryResource, ds.Metrics.CurBytesCount, @@ -275,7 +293,6 @@ func (ds *ServerImpl) setupFlow( var evalCtx *tree.EvalContext var leafTxn *kv.Txn - var onFlowCleanup func() if localState.EvalContext != nil { evalCtx = localState.EvalContext // We're about to mutate the evalCtx and we want to restore its original @@ -302,7 +319,6 @@ func (ds *ServerImpl) setupFlow( sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData) if err != nil { - sp.Finish() return ctx, nil, nil, err } ie := &lazyInternalExecutor{ @@ -375,14 +391,6 @@ func (ds *ServerImpl) setupFlow( ctx, opChains, err = f.Setup(ctx, &req.Flow, opt) if err != nil { log.Errorf(ctx, "error setting up flow: %s", err) - // Flow.Cleanup will not be called, so we have to close the memory monitor - // and finish the span manually. - monitor.Stop(ctx) - sp.Finish() - if onFlowCleanup != nil { - onFlowCleanup() - } - ctx = tracing.ContextWithSpan(ctx, nil) return ctx, nil, nil, err } if !f.IsLocal() { @@ -404,7 +412,6 @@ func (ds *ServerImpl) setupFlow( } else { // If I haven't created the leaf already, do it now. if leafTxn == nil { - var err error leafTxn, err = makeLeaf(req) if err != nil { return nil, nil, nil, err