Skip to content

Commit

Permalink
distsql: initialize the flow scheduler earlier
Browse files Browse the repository at this point in the history
In 38dbeae we broke up the creation of
the flow scheduler in order to give access to it to the status server.
That required separating out initialization of the flow metrics into
a separate step that is performed after the flow scheduler is created.

However, as it turns out, it is possible that the DistSQL server
receives some RPC calls before it is `Start`ed (the place where we put
the metrics initialization). This can happen because the DistSQL server
is registered as a gRPC service once it is created, and if an RPC comes
in before the server is started, a NPE would currently occur.

This commit fixes the issue by moving the initialization of the flow
scheduler into the constructor of the DistSQL server (early enough to
not race against incoming RPCs).

Release note: None (no stable release with this bug)
  • Loading branch information
yuzefovich committed Sep 14, 2021
1 parent 51ca9fc commit 6569425
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func NewServer(
),
}
ds.memMonitor.Start(ctx, cfg.ParentMemoryMonitor, mon.BoundAccount{})
// We have to initialize the flow scheduler at the same time we're creating
// the DistSQLServer because the latter will be registered as a gRPC service
// right away, so the RPCs might start coming in pretty much right after the
// current method returns. See #66330.
ds.flowScheduler.Init(ds.Metrics)

colexec.HashAggregationDiskSpillingEnabled.SetOnChange(&cfg.Settings.SV, func() {
if !colexec.HashAggregationDiskSpillingEnabled.Get(&cfg.Settings.SV) {
Expand All @@ -100,6 +105,12 @@ func NewServer(
}

// Start launches workers for the server.
//
// Note that the initialization of the server required for performing the
// incoming RPCs needs to go into NewServer above because once that method
// returns, the server is registered as a gRPC service and needs to be fully
// initialized. For example, the initialization of the flow scheduler has to
// happen in NewServer.
func (ds *ServerImpl) Start() {
// Gossip the version info so that other nodes don't plan incompatible flows
// for us.
Expand All @@ -122,7 +133,6 @@ func (ds *ServerImpl) Start() {
panic(err)
}

ds.flowScheduler.Init(ds.Metrics)
ds.flowScheduler.Start()
}

Expand Down

0 comments on commit 6569425

Please sign in to comment.