Skip to content

Commit

Permalink
spanconfigkvsubscriber: fix missing system span configs
Browse files Browse the repository at this point in the history
Previously, if a range did not have an overlapping span
config then `ForEachOverlappingSpanConfig` would not apply
the relevant system span configs that may still apply to
that range. One situation in which we could end up with
such a range is when a table is dropped and all of its data (including the
range deletion tombstone installed by the drop) is GC'ed: the associated
schema change GC job then deletes the table's span config. In this case, we
will not find any overlapping span configs for the table's span, but a
system span config, such as a cluster wide protection policy, may still be
applicable to the replica with the empty table span. A scan
of that span AS OF SYSTEM TIME the timestamp at which we wrote the
protection policy could result in a BatchTimestampBeforeGCError.

This change fixes the above bug by applying a fallback config
to the span with no overlapping span configs, and combining the
system span configs that apply to the span. The change adds a
red-green test to exercise this logic.

Informs: #113867
Release note (bug fix): Previously, an empty range corresponding to a
dropped table did not respect system-level span configurations such as
protected timestamps, potentially causing reads above the protected
timestamp to fail.
  • Loading branch information
adityamaru committed Nov 7, 2023
1 parent b8ce6da commit 38541f6
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Expand Up @@ -373,6 +373,8 @@ go_test(
"//pkg/config/zonepb",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/keyvisualizer",
"//pkg/kv",
Expand Down Expand Up @@ -450,6 +452,7 @@ go_test(
"//pkg/sql/catalog/dbdesc",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/gcjob",
"//pkg/sql/isql",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
Expand All @@ -461,6 +464,7 @@ go_test(
"//pkg/testutils/datapathutils",
"//pkg/testutils/echotest",
"//pkg/testutils/gossiputil",
"//pkg/testutils/jobutils",
"//pkg/testutils/kvclientutils",
"//pkg/testutils/listenerutil",
"//pkg/testutils/serverutils",
Expand Down
220 changes: 220 additions & 0 deletions pkg/kv/kvserver/client_mvcc_gc_test.go
Expand Up @@ -12,15 +12,35 @@ package kvserver_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -66,3 +86,203 @@ func TestMVCCGCCorrectStats(t *testing.T) {
require.Equal(t, oldKeyBytes, newStats.KeyBytes)
require.Equal(t, oldValBytes, newStats.ValBytes)
}

// TestSystemSpanConfigProtectionPoliciesApplyAfterGC is a regression test for
// https://github.com/cockroachdb/cockroach/issues/113867. This test attempts
// to recreate the following observed timeline:
//   - Drop table @ t=1
//   - SCHEMA CHANGE GC Job writes range deletion @ t=1, waits for span to be empty
//   - Write a cluster-wide PTS @ t=2
//   - MVCC GC Runs up to t=2
//   - Table span is now empty, SCHEMA CHANGE GC JOB sees span is empty, and
//     deletes span configuration
//   - Range with empty span configuration, falls back to the fallback span
//     configuration and does not respect the cluster-wide protection thereby
//     allowing its GC threshold past t=2
//   - Read @ t=2 fails with BatchTimestampBeforeGCError
//
// The last two steps are where the bug lies as the range should have respected
// the cluster-wide protection and not allowed its GC threshold to be set past
// t=2.
func TestSystemSpanConfigProtectionPoliciesApplyAfterGC(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer gcjob.SetSmallMaxGCIntervalForTest()()

	ctx := context.Background()
	var args base.TestClusterArgs
	args.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
	// Give the fallback span config a 1s GC TTL so that, once the table's own
	// span config has been deleted, the range remains eligible for prompt GC
	// under the fallback config.
	args.ServerArgs.Knobs.SpanConfig = &spanconfig.TestingKnobs{
		OverrideFallbackConf: func(config roachpb.SpanConfig) roachpb.SpanConfig {
			overrideCfg := config
			overrideCfg.GCPolicy.TTLSeconds = 1
			return overrideCfg
		},
	}
	tc := testcluster.StartTestCluster(t, 1, args)
	defer tc.Stopper().Stop(ctx)
	sqlDB := tc.ApplicationLayer(0).SQLConn(t)
	st := tc.ApplicationLayer(0).ClusterSettings()
	gcjob.EmptySpanPollInterval.Override(ctx, &st.SV, 100*time.Millisecond)

	// Speed up propagation of span configs and protected timestamps.
	sysDB := sqlutils.MakeSQLRunner(tc.SystemLayer(0).SQLConn(t))
	sysDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
	sysDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
	sysDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '10ms'")

	// Infrastructure to create a protected timestamp record.
	mkRecordAndProtect := func(s serverutils.ApplicationLayerInterface, ts hlc.Timestamp, target *ptpb.Target) *ptpb.Record {
		execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
		ptp := execCfg.ProtectedTimestampProvider
		insqlDB := execCfg.InternalDB
		recordID := uuid.MakeV4()
		rec := jobsprotectedts.MakeRecord(recordID, int64(1), ts, nil, /* deprecatedSpans */
			jobsprotectedts.Jobs, target)
		require.NoError(t, insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
			return ptp.WithTxn(txn).Protect(ctx, rec)
		}))
		return rec
	}

	runner := sqlutils.MakeSQLRunner(sqlDB)
	runner.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`)

	var tableID int
	runner.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = 'foo'").Scan(&tableID)
	require.NotEqual(t, 0, tableID)
	// Put table foo in its own range so that GC and span config checks below
	// target only the table's keyspace.
	tablePrefix := tc.ApplicationLayer(0).Codec().TablePrefix(uint32(tableID))
	tc.SplitRangeOrFatal(t, tablePrefix)
	require.NoError(t, tc.WaitForSplitAndInitialization(tablePrefix))

	// Lower the GC TTL of table foo and allow the zone config to get reconciled.
	s := tc.Server(0)
	store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
	require.NoError(t, err)
	repl := store.LookupReplica(roachpb.RKey(tablePrefix))
	_, err = sqlDB.Exec(`ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 1`)
	require.NoError(t, err)
	testutils.SucceedsSoon(t, func() error {
		cfg, err := repl.LoadSpanConfig(ctx)
		require.NoError(t, err)
		if cfg.GCPolicy.TTLSeconds != 1 {
			return errors.New("waiting for span config to apply")
		}
		return nil
	})

	for i := 0; i < 10; i++ {
		runner.Exec(t, `INSERT INTO foo VALUES ($1)`, i)
	}

	// waitForGCJobPolling blocks until exactly one SCHEMA CHANGE GC job is
	// running and reports that it is waiting for MVCC GC.
	waitForGCJobPolling := func() {
		runner.CheckQueryResultsRetry(t, `
SELECT count(*)
FROM (
SELECT job_id
FROM [SHOW JOBS]
WHERE job_type = 'SCHEMA CHANGE GC'
AND status = 'running'
AND running_status = 'waiting for MVCC GC'
)`,
			[][]string{{"1"}})
	}

	// Drop the table and wait for the MVCC GC schema change job to write its
	// range deletion tombstone and start polling to check if the target span
	// has been gc'ed.
	runner.Exec(t, `DROP TABLE foo CASCADE `)
	var jobID int64
	runner.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'").Scan(&jobID)
	// NotZero rather than NotEqual(t, 0, jobID): testify's equality is
	// type-sensitive, so comparing an untyped 0 (int) against an int64 would
	// never report equality and the assertion would be vacuous.
	require.NotZero(t, jobID)
	waitForGCJobPolling()

	// Write a protected timestamp record targeting the entire cluster's keyspace.
	clusterTarget := ptpb.MakeClusterTarget()
	protectedTime := timeutil.Now().UnixNano()
	mkRecordAndProtect(tc.ApplicationLayer(0), hlc.Timestamp{WallTime: protectedTime}, clusterTarget)

	// Wait for the protection policy to apply to foo's range.
	repl = store.LookupReplica(roachpb.RKey(tablePrefix))
	testutils.SucceedsSoon(t, func() error {
		cfg, err := repl.LoadSpanConfig(ctx)
		require.NoError(t, err)
		if len(cfg.GCPolicy.ProtectionPolicies) == 0 {
			return errors.New("waiting for span config to apply")
		}
		require.NoError(t, repl.ReadProtectedTimestampsForTesting(ctx))
		return nil
	})

	// The GC job must still be polling: the table's data has not been GC'ed yet.
	waitForGCJobPolling()

	// Run table foo's range through the MVCC GC queue until the GCThreshold is
	// past the point keys and range deletion tombstone thereby classifying the
	// range as empty.
	db := tc.SystemLayer(0).DB()
	testutils.SucceedsSoon(t, func() error {
		require.NoError(t, store.ManualMVCCGC(repl))
		var ba kv.Batch
		ba.Header.MaxSpanRequestKeys = 1
		ba.AddRawRequest(&kvpb.IsSpanEmptyRequest{
			RequestHeader: kvpb.RequestHeader{
				Key: tablePrefix, EndKey: tablePrefix.PrefixEnd(),
			},
		})
		require.NoError(t, db.Run(ctx, &ba))
		if !ba.RawResponse().Responses[0].GetIsSpanEmpty().IsEmpty() {
			return errors.New("table span is not empty; re-run GC")
		}
		return nil
	})

	// Wait for the GC job to see that the span is empty and delete the zone
	// config for the dropped table.
	jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(jobID))

	// Verify that the span config for the table is gone.
	kvAccessor := s.SpanConfigKVAccessor().(spanconfig.KVAccessor)
	kvSubscriber := s.SpanConfigKVSubscriber().(spanconfig.KVSubscriber)
	sp := roachpb.Span{Key: tablePrefix, EndKey: tablePrefix.PrefixEnd()}
	testutils.SucceedsSoon(t, func() error {
		records, err := kvAccessor.GetSpanConfigRecords(ctx, spanconfig.Targets{
			spanconfig.MakeTargetFromSpan(sp),
		})
		require.NoError(t, err)
		if len(records) != 0 {
			return errors.New("expected no span config records for table foo")
		}
		return nil
	})

	// Refresh the KVSubscriber to ensure it is caught up to now. We expect the
	// cluster-wide protection to be combined with the fallback span config and
	// applied to foo's empty range.
	ptsReader := store.GetStoreConfig().ProtectedTimestampReader
	require.NoError(t,
		spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, s.Clock().Now()))
	ts, _, err := kvSubscriber.GetProtectionTimestamps(ctx, sp)
	require.NoError(t, err)
	require.Len(t, ts, 1)

	// Keep running MVCC GC until the threshold sits immediately below the
	// protected timestamp, failing outright if it ever moves past it.
	ptsTime := hlc.Timestamp{WallTime: protectedTime}
	testutils.SucceedsSoon(t, func() error {
		require.NoError(t, store.ManualMVCCGC(repl))
		require.NoError(t,
			spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, s.Clock().Now()))
		threshold := repl.GetGCThreshold()
		if !threshold.Equal(ptsTime.Prev()) {
			require.False(t, ptsTime.Prev().Less(threshold), "range GC threshold should never pass protected timestamp")
			return errors.New("range GC threshold is not advanced far enough")
		}
		return nil
	})
	// A read at the protected timestamp must succeed; before the fix it failed
	// with BatchTimestampBeforeGCError.
	runner.Exec(t,
		fmt.Sprintf(`SELECT * FROM crdb_internal.scan(crdb_internal.table_span($1)) AS OF SYSTEM TIME '%d'`,
			protectedTime), tableID)
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_protected_timestamp.go
Expand Up @@ -148,8 +148,8 @@ func (r *Replica) checkProtectedTimestampsForGC(
}

if read.readAt.Less(lease.Start.ToTimestamp()) {
log.VEventf(ctx, 1, "not gc'ing replica %v because current lease %v started after record was read %v",
r, lease, read.readAt)
log.VEventf(ctx, 1, "not gc'ing replica %v because current lease %v started after record was"+
" read %v", r, lease, read.readAt)
return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, nil
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/spanconfig/spanconfig.go
Expand Up @@ -224,9 +224,15 @@ type Reconciler interface {
type Store interface {
StoreWriter
StoreReader
// ForEachOverlappingSpanConfig invokes the supplied callback on each
// span config that overlaps with the supplied span. In addition to the
// SpanConfig, the span it applies over is passed into the callback as well.
// ForEachOverlappingSpanConfig invokes the supplied callback on each span
// config that overlaps with the supplied span. The config is combined with
// all the system span configs that also apply to this span. In addition to
// the SpanConfig, the span it applies over is passed into the callback as
// well.
//
// If there are no overlapping configs for the supplied span, the supplied
// callback is invoked on the fallback config combined with any applicable
// system span configs.
ForEachOverlappingSpanConfig(
context.Context, roachpb.Span, func(roachpb.Span, roachpb.SpanConfig) error,
) error
Expand Down
29 changes: 28 additions & 1 deletion pkg/spanconfig/spanconfigstore/store.go
Expand Up @@ -176,6 +176,9 @@ func (s *Store) getFallbackConfig() roachpb.SpanConfig {
if conf := FallbackConfigOverride.Get(&s.settings.SV).(*roachpb.SpanConfig); !conf.IsEmpty() {
return *conf
}
if s.knobs != nil && s.knobs.OverrideFallbackConf != nil {
return s.knobs.OverrideFallbackConf(s.fallback)
}
return s.fallback
}

Expand All @@ -196,13 +199,37 @@ func (s *Store) ForEachOverlappingSpanConfig(
) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.mu.spanConfigStore.forEachOverlapping(span, func(sp roachpb.Span, conf roachpb.SpanConfig) error {
var foundOverlapping bool
err := s.mu.spanConfigStore.forEachOverlapping(span, func(sp roachpb.Span, conf roachpb.SpanConfig) error {
foundOverlapping = true
config, err := s.getSpanConfigForKeyRLocked(ctx, roachpb.RKey(sp.Key))
if err != nil {
return err
}
return f(sp, config)
})
if err != nil {
return err
}
// For a span that doesn't overlap with any span configs, we use the fallback
// config combined with the system span configs that may be applicable to the
// span.
//
// For example, when a table is dropped and all of its data (including the
// range deletion tombstone installed by the drop) is GC'ed, the associated
// schema change GC job will delete the table's span config. In this case, we
// will not find any overlapping span configs for the table's span, but a
// system span config, such as a cluster wide protection policy, may still be
// applicable to the replica with the empty table span.
if !foundOverlapping {
log.VInfof(ctx, 3, "no overlapping span config found for span %s", span)
config, err := s.getSpanConfigForKeyRLocked(ctx, roachpb.RKey(span.Key))
if err != nil {
return err
}
return f(span, config)
}
return nil
}

// Clone returns a copy of the Store.
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigstore/testdata/single/internal
Expand Up @@ -3,6 +3,7 @@

overlapping span=[a,z)
----
[a,z):FALLBACK

apply
set [b,d):A
Expand Down Expand Up @@ -40,6 +41,7 @@ deleted [f,g)

overlapping span=[f,g)
----
[f,g):FALLBACK

overlapping span=[b,j)
----
Expand Down

0 comments on commit 38541f6

Please sign in to comment.