Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: restore EvalCtx.Mon on the flow cleanup #69483

Merged
merged 2 commits into from Aug 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 33 additions & 13 deletions pkg/sql/distsql/server.go
Expand Up @@ -214,7 +214,7 @@ func (ds *ServerImpl) setupFlow(
rowSyncFlowConsumer execinfra.RowReceiver,
batchSyncFlowConsumer execinfra.BatchReceiver,
localState LocalState,
) (context.Context, flowinfra.Flow, execinfra.OpChains, error) {
) (retCtx context.Context, _ flowinfra.Flow, _ execinfra.OpChains, retErr error) {
if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) {
err := errors.Errorf(
"version mismatch in flow request: %d; this node accepts %d through %d",
Expand All @@ -224,8 +224,27 @@ func (ds *ServerImpl) setupFlow(
return ctx, nil, nil, err
}

var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanup func()
// Make sure that we clean up all resources (which in the happy case are
// cleaned up in Flow.Cleanup()) if an error is encountered.
defer func() {
if retErr != nil {
if sp != nil {
sp.Finish()
}
if monitor != nil {
monitor.Stop(ctx)
}
if onFlowCleanup != nil {
onFlowCleanup()
}
retCtx = tracing.ContextWithSpan(ctx, nil)
}
}()

const opName = "flow"
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
if parentSpan == nil {
ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName)
} else if localState.IsLocal {
Expand All @@ -245,8 +264,7 @@ func (ds *ServerImpl) setupFlow(
)
}

// The monitor opened here is closed in Flow.Cleanup().
monitor := mon.NewMonitor(
monitor = mon.NewMonitor(
"flow",
mon.MemoryResource,
ds.Metrics.CurBytesCount,
Expand Down Expand Up @@ -277,6 +295,14 @@ func (ds *ServerImpl) setupFlow(
var leafTxn *kv.Txn
if localState.EvalContext != nil {
evalCtx = localState.EvalContext
// We're about to mutate the evalCtx and we want to restore its original
// state once the flow cleans up. Note that we could have made a copy of
// the whole evalContext, but that isn't free, so we choose to restore
// the original state in order to avoid performance regressions.
origMon := evalCtx.Mon
onFlowCleanup = func() {
evalCtx.Mon = origMon
}
evalCtx.Mon = monitor
if localState.HasConcurrency {
var err error
Expand All @@ -293,7 +319,6 @@ func (ds *ServerImpl) setupFlow(

sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData)
if err != nil {
sp.Finish()
return ctx, nil, nil, err
}
ie := &lazyInternalExecutor{
Expand Down Expand Up @@ -349,7 +374,7 @@ func (ds *ServerImpl) setupFlow(
isVectorized := req.EvalContext.SessionData.VectorizeMode != sessiondatapb.VectorizeOff
f := newFlow(
flowCtx, ds.flowRegistry, rowSyncFlowConsumer, batchSyncFlowConsumer,
localState.LocalProcs, isVectorized,
localState.LocalProcs, isVectorized, onFlowCleanup,
)
opt := flowinfra.FuseNormally
if !localState.MustUseLeafTxn() {
Expand All @@ -366,11 +391,6 @@ func (ds *ServerImpl) setupFlow(
ctx, opChains, err = f.Setup(ctx, &req.Flow, opt)
if err != nil {
log.Errorf(ctx, "error setting up flow: %s", err)
// Flow.Cleanup will not be called, so we have to close the memory monitor
// and finish the span manually.
monitor.Stop(ctx)
sp.Finish()
ctx = tracing.ContextWithSpan(ctx, nil)
return ctx, nil, nil, err
}
if !f.IsLocal() {
Expand All @@ -392,7 +412,6 @@ func (ds *ServerImpl) setupFlow(
} else {
// If the leaf txn has not been created yet, create it now.
if leafTxn == nil {
var err error
leafTxn, err = makeLeaf(req)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -471,8 +490,9 @@ func newFlow(
batchSyncFlowConsumer execinfra.BatchReceiver,
localProcessors []execinfra.LocalProcessor,
isVectorized bool,
onFlowCleanup func(),
) flowinfra.Flow {
base := flowinfra.NewFlowBase(flowCtx, flowReg, rowSyncFlowConsumer, batchSyncFlowConsumer, localProcessors)
base := flowinfra.NewFlowBase(flowCtx, flowReg, rowSyncFlowConsumer, batchSyncFlowConsumer, localProcessors, onFlowCleanup)
if isVectorized {
return colflow.NewVectorizedFlow(base)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql/vectorized_panic_propagation_test.go
Expand Up @@ -48,6 +48,7 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
nil, /* rowSyncFlowConsumer */
nil, /* batchSyncFlowConsumer */
nil, /* localProcessors */
nil, /* onFlowCleanup */
)
flow := colflow.NewVectorizedFlow(base)

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/flowinfra/flow.go
Expand Up @@ -173,6 +173,8 @@ type FlowBase struct {
// - outboxes
waitGroup sync.WaitGroup

onFlowCleanup func()

doneFn func()

status flowStatus
Expand Down Expand Up @@ -227,6 +229,7 @@ func NewFlowBase(
rowSyncFlowConsumer execinfra.RowReceiver,
batchSyncFlowConsumer execinfra.BatchReceiver,
localProcessors []execinfra.LocalProcessor,
onFlowCleanup func(),
) *FlowBase {
// We are either in a single tenant cluster, or a SQL node in a multi-tenant
// cluster, where the SQL node is single tenant. The tenant below is used
Expand All @@ -248,6 +251,7 @@ func NewFlowBase(
batchSyncFlowConsumer: batchSyncFlowConsumer,
localProcessors: localProcessors,
admissionInfo: admissionInfo,
onFlowCleanup: onFlowCleanup,
}
base.status = FlowNotStarted
return base
Expand Down Expand Up @@ -499,6 +503,9 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
}
f.status = FlowFinished
f.ctxCancel()
if f.onFlowCleanup != nil {
f.onFlowCleanup()
}
if f.doneFn != nil {
f.doneFn()
}
Expand Down