67866: sql: implement SQL Stats flush logic r=Azhng a=Azhng

Previous PR: #67805
Next Chained PR: #67090

## First Commit

sql: remove `count` from stmt/txn stats system table

Previously, the system.statement_statistics and
system.transaction_statistics tables included a `count` column
that corresponded to the `roachpb.StatementStatistics.Count` and
`roachpb.TransactionStatistics.Count` fields, respectively.
The purpose of that column was to make
`INSERT ON CONFLICT DO UPDATE` style queries easy. However,
since early prototyping has shown that
`INSERT ON CONFLICT DO UPDATE` style statements are quite inefficient,
the SQL Stats flush mechanism will be implemented using
separate INSERT and UPDATE statements.
The column is therefore no longer useful and would require special handling.
Removing it simplifies the flush logic and removes the
need for that special handling.

Release note (sql change): The `count` column has been removed from the
system.statement_statistics and system.transaction_statistics
tables.

## Second Commit

sql: implement persistedsqlstats flush logic

This commit implements the initial flush logic of the
persisted sql stats subsystem.

Release note: None

68426: kv: assert txn unused in SetFixedTimestamp r=nvanbenschoten a=nvanbenschoten

This commit asserts that a transaction has not been used to read or to
write by the time that `SetFixedTimestamp` is called on it.

This was extracted from #68194 and modified to return an error from
`SetFixedTimestamp` on misuse instead of fatal-ing. This provides a
sufficient, temporary backstop for #68216 until the conn executor logic
is fixed:

```
root@127.0.0.1:26257/movr> create table t (x int);
CREATE TABLE

root@127.0.0.1:26257/movr> insert into t values (1);
INSERT 1

root@127.0.0.1:26257/movr> select crdb_internal_mvcc_timestamp, * from t;
   crdb_internal_mvcc_timestamp  | x
---------------------------------+----
  1628094563935439000.0000000000 | 1
(1 row)

root@127.0.0.1:26257/movr> begin as of system time (1628094563935439000.0000000000-1)::string;
BEGIN

root@127.0.0.1:26257/movr  OPEN> select * from t;
  x
-----
(0 rows)

root@127.0.0.1:26257/movr  OPEN> prepare y as select * from t as of system time 1628094563935439000.0000000000;
ERROR: internal error: cannot set fixed timestamp, txn "sql txn" meta={id=e5e81c19 pri=0.01517572 epo=0 ts=1628094563.935438999,0 min=1628094563.935438999,0 seq=0} lock=false stat=PENDING rts=1628094563.935438999,0 wto=false gul=1628094563.935438999,0 already performed reads
SQLSTATE: XX000
DETAIL: stack trace:
github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord/txn_coord_sender.go:1016: SetFixedTimestamp()
github.com/cockroachdb/cockroach/pkg/kv/txn.go:1200: SetFixedTimestamp()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:278: populatePrepared()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:220: func1()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:226: prepare()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:112: addPreparedStmt()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_exec.go:570: execStmtInOpenState()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_exec.go:126: execStmt()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1626: func1()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1628: execCmd()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1550: run()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:627: ServeConn()
github.com/cockroachdb/cockroach/pkg/sql/pgwire/conn.go:645: func1()
runtime/asm_amd64.s:1371: goexit()

HINT: You have encountered an unexpected error.

Please check the public issue tracker to check whether this problem is
already tracked. If you cannot find it there, please report the error
with details by creating a new issue.

If you would rather not post publicly, please contact us directly
using the support form.

We appreciate your feedback.

root@127.0.0.1:26257/? ERROR>
```

68442: kv: include RangeID in rangefeed goroutine stacks r=nvanbenschoten a=nvanbenschoten

This commit includes the RangeID in the goroutine stacks of each
rangefeed processor and of its registrations. This is a cheap and easy
way to get better observability into the ranges that have active
rangefeeds. It also tells us where those goroutines are spending their
time.

This will also become easier to use in Go 1.17, which improved the
format of stack traces.

68443: parser: add VIRTUAL syntax to help r=RaduBerinde a=rafiss

Release note: None

Co-authored-by: Azhng <archer.xn@gmail.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
4 people committed Aug 5, 2021
5 parents 636ee52 + 8383509 + b761eba + 1306da5 + 404fc7b commit 78a788f
Showing 63 changed files with 1,537 additions and 153 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -129,6 +129,7 @@ sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enable
sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode
sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh
sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh
sql.stats.flush.interval duration 1h0m0s the interval at which SQL execution statistics are flushed to disk
sql.stats.histogram_collection.enabled boolean true histogram collection mode
sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode
sql.stats.post_events.enabled boolean false if set, an event is logged for every CREATE STATISTICS job
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -133,6 +133,7 @@
<tr><td><code>sql.stats.automatic_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>automatic statistics collection mode</td></tr>
<tr><td><code>sql.stats.automatic_collection.fraction_stale_rows</code></td><td>float</td><td><code>0.2</code></td><td>target fraction of stale rows per table that will trigger a statistics refresh</td></tr>
<tr><td><code>sql.stats.automatic_collection.min_stale_rows</code></td><td>integer</td><td><code>500</code></td><td>target minimum number of stale rows per table that will trigger a statistics refresh</td></tr>
<tr><td><code>sql.stats.flush.interval</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the interval at which SQL execution statistics are flushed to disk</td></tr>
<tr><td><code>sql.stats.histogram_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>histogram collection mode</td></tr>
<tr><td><code>sql.stats.multi_column_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>multi-column statistics collection mode</td></tr>
<tr><td><code>sql.stats.post_events.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, an event is logged for every CREATE STATISTICS job</td></tr>
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -295,6 +295,7 @@ ALL_TESTS = [
"//pkg/sql/sqlliveness/slinstance:slinstance_test",
"//pkg/sql/sqlliveness/slstorage:slstorage_test",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil:sqlstatsutil_test",
"//pkg/sql/sqlstats/persistedsqlstats:persistedsqlstats_test",
"//pkg/sql/stats:stats_test",
"//pkg/sql/stmtdiagnostics:stmtdiagnostics_test",
"//pkg/sql/tests:tests_test",
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
@@ -39,4 +39,5 @@ type TestingKnobs struct {
BackupRestore ModuleTestingKnobs
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
SQLStatsKnobs ModuleTestingKnobs
}
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_planning.go
@@ -328,7 +328,9 @@ func spansForAllTableIndexes(
checkForKVInBounds := func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error) {
var foundKV bool
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, endTime)
if err := txn.SetFixedTimestamp(ctx, endTime); err != nil {
return err
}
res, err := txn.Scan(ctx, start, end, 1 /* maxRows */)
if err != nil {
return err
7 changes: 5 additions & 2 deletions pkg/ccl/backupccl/backupresolver/targets.go
@@ -571,8 +571,11 @@ func LoadAllDescs(
var allDescs []catalog.Descriptor
if err := db.Txn(
ctx,
func(ctx context.Context, txn *kv.Txn) (err error) {
txn.SetFixedTimestamp(ctx, asOf)
func(ctx context.Context, txn *kv.Txn) error {
err := txn.SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}
allDescs, err = catalogkv.GetAllDescriptors(
ctx, txn, codec, true, /* shouldRunPostDeserializationChanges */
)
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
@@ -129,7 +129,9 @@ func fetchSpansForTargets(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
spans = nil
txn.SetFixedTimestamp(ctx, ts)
if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -103,7 +103,9 @@ func (p *scanRequestScanner) exportSpan(
if log.V(2) {
log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts)
}
txn.SetFixedTimestamp(ctx, ts)
if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
return err
}
stopwatchStart := timeutil.Now()
var scanDuration, bufferDuration time.Duration
const targetBytesPerScan = 16 << 20 // 16 MiB
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
@@ -101,8 +101,10 @@ func (c *rowFetcherCache) TableDescForKey(
// descs.Collection directly here.
// TODO (SQL Schema): #53751.
if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, ts)
var err error
err := txn.SetFixedTimestamp(ctx, ts)
if err != nil {
return err
}
tableDesc, err = c.collection.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{})
return err
}); err != nil {
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -274,7 +274,9 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error {
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
initialDescs = initialDescs[:0]
txn.SetFixedTimestamp(ctx, initialTableDescTs)
if err := txn.SetFixedTimestamp(ctx, initialTableDescTs); err != nil {
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range tf.targets {
flags := tree.ObjectLookupFlagsWithRequired()
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -400,11 +400,11 @@ func TestOracle(t *testing.T) {

c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper)
staleTxn := kv.NewTxn(ctx, c, 0)
staleTxn.SetFixedTimestamp(ctx, stale)
require.NoError(t, staleTxn.SetFixedTimestamp(ctx, stale))
currentTxn := kv.NewTxn(ctx, c, 0)
currentTxn.SetFixedTimestamp(ctx, current)
require.NoError(t, currentTxn.SetFixedTimestamp(ctx, current))
futureTxn := kv.NewTxn(ctx, c, 0)
futureTxn.SetFixedTimestamp(ctx, future)
require.NoError(t, futureTxn.SetFixedTimestamp(ctx, future))

nodes := mockNodeStore{
{NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")},
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -189,7 +189,7 @@ func (ds *DistSender) partialRangeFeed(
case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
roachpb.RangeFeedRetryError_REASON_RANGE_MERGED,
roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER:
// Evict the decriptor from the cache.
// Evict the descriptor from the cache.
rangeInfo.token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
default:
@@ -204,7 +204,7 @@ func (ds *DistSender) partialRangeFeed(
}

// singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed
// RPC call. Results will be send on the provided channel. Returns the timestamp
// RPC call. Results will be sent on the provided channel. Returns the timestamp
// of the maximum rangefeed checkpoint seen, which can be used to re-establish
// the rangefeed with a larger starting timestamp, reflecting the fact that all
// values up to the last checkpoint have already been observed. Returns the
13 changes: 12 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -1008,9 +1008,19 @@ func (tc *TxnCoordSender) CommitTimestampFixed() bool {
}

// SetFixedTimestamp is part of the client.TxnSender interface.
func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) {
func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
tc.mu.Lock()
defer tc.mu.Unlock()
// The transaction must not have already been used in this epoch.
if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx)
}
if tc.mu.txn.Sequence != 0 {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx)
}

tc.mu.txn.ReadTimestamp = ts
tc.mu.txn.WriteTimestamp = ts
tc.mu.txn.GlobalUncertaintyLimit = ts
@@ -1019,6 +1029,7 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
tc.mu.txn.MinTimestamp.Backward(ts)
return nil
}

// RequiredFrontier is part of the client.TxnSender interface.
88 changes: 88 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -2672,3 +2672,91 @@ func TestTxnManualRefresh(t *testing.T) {
})
}
}

// TestTxnCoordSenderSetFixedTimestamp tests that SetFixedTimestamp cannot be
// called after a transaction has already been used in the current epoch to read
// or write.
func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

for _, test := range []struct {
name string
before func(*testing.T, *kv.Txn)
expErr string
}{
{
name: "nothing before",
before: func(t *testing.T, txn *kv.Txn) {},
},
{
name: "read before",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
},
expErr: "cannot set fixed timestamp, .* already performed reads",
},
{
name: "write before",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
},
expErr: "cannot set fixed timestamp, .* already performed writes",
},
{
name: "read and write before",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
},
expErr: "cannot set fixed timestamp, .* already performed reads",
},
{
name: "read before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
{
name: "write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
{
name: "read and write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
} {
t.Run(test.name, func(t *testing.T) {
s := createTestDB(t)
defer s.Stop()

txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
test.before(t, txn)

ts := s.Clock.Now()
err := txn.SetFixedTimestamp(ctx, ts)
if test.expErr != "" {
require.Error(t, err)
require.Regexp(t, test.expErr, err)
require.False(t, txn.CommitTimestampFixed())
} else {
require.NoError(t, err)
require.True(t, txn.CommitTimestampFixed())
require.Equal(t, ts, txn.CommitTimestamp())
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter.go
@@ -74,7 +74,9 @@ func (dbc *dbAdapter) Scan(
ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue),
) error {
return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, asOf)
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
sp := span
var b kv.Batch
for {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -3174,7 +3174,7 @@ func TestStrictGCEnforcement(t *testing.T) {
}
mkStaleTxn = func() *kv.Txn {
txn := db.NewTxn(ctx, "foo")
txn.SetFixedTimestamp(ctx, tenSecondsAgo)
require.NoError(t, txn.SetFixedTimestamp(ctx, tenSecondsAgo))
return txn
}
getRejectedMsg = func() string {
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -50,8 +50,9 @@ func newErrBufferCapacityExceeded() *roachpb.Error {
// Config encompasses the configuration required to create a Processor.
type Config struct {
log.AmbientContext
Clock *hlc.Clock
Span roachpb.RSpan
Clock *hlc.Clock
RangeID roachpb.RangeID
Span roachpb.RSpan

TxnPusher TxnPusher
// PushTxnsInterval specifies the interval at which a Processor will push
@@ -193,7 +194,7 @@ type IteratorConstructor func() storage.SimpleMVCCIterator
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, rtsIterFunc, stopper)
p.run(ctx, p.RangeID, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
@@ -203,7 +204,10 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor

// run is called from Start and runs the rangefeed.
func (p *Processor) run(
ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper,
ctx context.Context,
_forStacks roachpb.RangeID,
rtsIterFunc IteratorConstructor,
stopper *stop.Stopper,
) {
defer close(p.stoppedC)
ctx, cancelOutputLoops := context.WithCancel(ctx)
@@ -256,7 +260,7 @@ func (p *Processor) run(

// Run an output loop for the registry.
runOutputLoop := func(ctx context.Context) {
r.runOutputLoop(ctx)
r.runOutputLoop(ctx, p.RangeID)
select {
case p.unregC <- &r:
case <-p.stoppedC:
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/registry.go
@@ -264,7 +264,7 @@ func (r *registration) outputLoop(ctx context.Context) error {
}
}

func (r *registration) runOutputLoop(ctx context.Context) {
func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) {
r.mu.Lock()
ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx)
r.mu.Unlock()
