Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
73995: docs: jobs explainer doc r=samiskin a=samiskin

Similar to "Life of a Query", this change adds an in-depth rundown of
how the job system functions and the various pieces in the code that
drive it and use it.

Release note: None

75430: cdc: refactor to replace map of changefeed targets with array r=[miretskiy] a=HonoreDB

Necessary intermediate step towards changefeeds on things other than full tables.
Previously, changefeed targets were serialized as a map of TableID->StatementTimeName.
This keeps that for now for compatibility but adds in a list of target specifications
that's more extendable, and changes the API for everything that will eventually need
to care (for example, an encoder will need to know that it's encoding a [column family](#28667)).

Still needs a test with the old-style payload.

Release note: None

75965: ui: Add column selector and filter to sessions page r=gtr a=gtr

Closes [#73463](https://cockroachlabs.atlassian.net/browse/CRDB-11600)

Previously, the sessions page was missing the column selector and filter
components that the stmts and txns page have. This change adds both
components in addition to new columns in the sessions overview page and
edits to the sessions details page.



Release note (ui change): added column selector, filters, new columns to
the sessions page and sessions details.

Screenshots:

Sessions overview page:
![sessions-overview-page](https://user-images.githubusercontent.com/35943354/153674473-639b23a5-031a-4d33-90cc-87db24ec7c4a.png)


Sessions overview page with column selector:
![sessions-overview-page-column-selector](https://user-images.githubusercontent.com/35943354/153674485-9503434f-0860-4b84-9ad4-0175586bd986.png)


Sessions overview page with results from column selector:
![sessions-overview-page-column-selector-results](https://user-images.githubusercontent.com/35943354/153674490-788627c7-0392-4d15-a2e5-9c2e79de92f9.png)


Sessions overview page with filter:
![sessions-overview-page-filter](https://user-images.githubusercontent.com/35943354/153674501-a83394ac-2a8a-4035-a030-4f2b2f402ab2.png)


Sessions overview page with results from filter:
![sessions-overview-page-filter-results](https://user-images.githubusercontent.com/35943354/153674508-5b98f52d-7ea0-48f9-bef4-88635878de71.png)


Sessions detail page:
![Screen Shot 2022-02-11 at 4 33 14 PM](https://user-images.githubusercontent.com/35943354/153673353-b984462e-092c-4849-b03a-9a33091e4f45.png)





Co-authored-by: Shiranka Miskin <shiranka.miskin@gmail.com>
Co-authored-by: Aaron Zinger <zinger@cockroachlabs.com>
Co-authored-by: Gerardo Torres <gerardo.torrescastro@cockroachlabs.com>
  • Loading branch information
4 people committed Feb 22, 2022
4 parents 95ac880 + 0ef8382 + 062782c + 4923ebb commit e852be5
Show file tree
Hide file tree
Showing 37 changed files with 1,601 additions and 367 deletions.
599 changes: 599 additions & 0 deletions docs/tech-notes/jobs.md

Large diffs are not rendered by default.

25 changes: 18 additions & 7 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Expand Up @@ -102,15 +102,17 @@ func alterChangefeedPlanHook(
targetDescs = append(targetDescs, descs...)
}

newTargets, err := getTargets(ctx, p, targetDescs, details.Opts)
newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts)
if err != nil {
return err
}
// add old targets
for id, target := range details.Targets {
newTargets[id] = target
for id, table := range details.Tables {
newTables[id] = table
}
details.Targets = newTargets
details.Tables = newTables
details.TargetSpecifications = append(details.TargetSpecifications, newTargets...)

}

if opts.DropTargets != nil {
Expand All @@ -129,12 +131,21 @@ func alterChangefeedPlanHook(
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
}
delete(details.Targets, table.GetID())
delete(details.Tables, table.GetID())
}
}

newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets))
for _, ts := range details.TargetSpecifications {
if _, stillThere := details.Tables[ts.TableID]; stillThere {
newTargetSpecifications = append(newTargetSpecifications, ts)
}
}
details.TargetSpecifications = newTargetSpecifications

}

if len(details.Targets) == 0 {
if len(details.Tables) == 0 {
return errors.Errorf("cannot drop all targets for changefeed job %d", jobID)
}

Expand All @@ -152,7 +163,7 @@ func alterChangefeedPlanHook(
}

var targets tree.TargetList
for _, target := range details.Targets {
for _, target := range details.Tables {
targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName))
targets.Tables = append(targets.Tables, &targetName)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/bench_test.go
Expand Up @@ -196,15 +196,15 @@ func createBenchmarkChangefeed(
tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTarget{
Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{
StatementTimeName: tableDesc.GetName(),
}},
Opts: map[string]string{
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeRow),
},
}
initialHighWater := hlc.Timestamp{}
encoder, err := makeJSONEncoder(details.Opts, details.Targets)
encoder, err := makeJSONEncoder(details.Opts, AllTargets(details))
if err != nil {
return nil, nil, err
}
Expand All @@ -228,7 +228,7 @@ func createBenchmarkChangefeed(
Clock: feedClock,
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Targets: AllTargets(details),
Writer: buf,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/changefeedccl/changefeed.go
Expand Up @@ -55,7 +55,7 @@ func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) *ptpb.Record {
Expand All @@ -69,19 +69,21 @@ func createProtectedTimestampRecord(
jobsprotectedts.Jobs, targetToProtect)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
func makeTargetToProtect(targets []jobspb.ChangefeedTargetSpecification) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, len(targets)+1)
for t := range targets {
tablesToProtect = append(tablesToProtect, t)
for _, t := range targets {
tablesToProtect = append(tablesToProtect, t.TableID)
}
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
func makeSpansToProtect(
codec keys.SQLCodec, targets []jobspb.ChangefeedTargetSpecification,
) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
Expand All @@ -93,8 +95,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) [
EndKey: tablePrefix.PrefixEnd(),
})
}
for t := range targets {
addTablePrefix(uint32(t))
for _, t := range targets {
addTablePrefix(uint32(t.TableID))
}
addTablePrefix(keys.DescriptorTableID)
return spansToProtect
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Expand Up @@ -104,7 +104,7 @@ func distChangefeedFlow(
spansTS = spansTS.Next()
}
var err error
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, details.Targets, spansTS)
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS)
if err != nil {
return err
}
Expand All @@ -121,7 +121,7 @@ func distChangefeedFlow(
func fetchSpansForTargets(
ctx context.Context,
execCfg *sql.ExecutorConfig,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
Expand All @@ -133,10 +133,10 @@ func fetchSpansForTargets(
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
for _, table := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidLeased = true
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags)
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Expand Up @@ -159,7 +159,7 @@ func newChangeAggregatorProcessor(
}

var err error
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets); err != nil {
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, AllTargets(ca.spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -345,7 +345,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore {
sf = schemafeed.DoNothingSchemaFeed
} else {
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets,
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, AllTargets(ca.spec.Feed),
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

Expand All @@ -358,7 +358,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
Gossip: cfg.Gossip,
Spans: spans,
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
MM: ca.kvFeedMemMon,
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func newChangeFrontierProcessor(
cf.freqEmitResolved = emitNoResolved
}

if cf.encoder, err = getEncoder(spec.Feed.Opts, spec.Feed.Targets); err != nil {
if cf.encoder, err = getEncoder(spec.Feed.Opts, AllTargets(spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1454,7 +1454,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
Expand Down
66 changes: 49 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Expand Up @@ -181,16 +181,17 @@ func changefeedPlanHook(
return err
}

targets, err := getTargets(ctx, p, targetDescs, opts)
targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts)
if err != nil {
return err
}

details := jobspb.ChangefeedDetails{
Targets: targets,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
Tables: tables,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
TargetSpecifications: targets,
}
progress := jobspb.Progress{
Progress: &jobspb.Progress_HighWater{},
Expand Down Expand Up @@ -243,7 +244,7 @@ func changefeedPlanHook(
return err
}

if _, err := getEncoder(details.Opts, details.Targets); err != nil {
if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil {
return err
}

Expand All @@ -269,7 +270,7 @@ func changefeedPlanHook(
}
telemetry.Count(`changefeed.create.sink.` + telemetrySink)
telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat])
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables)))

if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
if err := utilccl.CheckEnterpriseEnabled(
Expand Down Expand Up @@ -332,7 +333,7 @@ func changefeedPlanHook(
var protectedTimestampID uuid.UUID
codec := p.ExecCfg().Codec
if shouldProtectTimestamps(codec) {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed())
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed())
protectedTimestampID = ptr.ID.GetUUID()
}

Expand Down Expand Up @@ -470,35 +471,41 @@ func getTableDescriptors(
return targetDescs, err
}

func getTargets(
func getTargetsAndTables(
ctx context.Context,
p sql.PlanHookState,
targetDescs []catalog.Descriptor,
opts map[string]string,
) (jobspb.ChangefeedTargets, error) {
targets := make(jobspb.ChangefeedTargets, len(targetDescs))
) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) {
tables := make(jobspb.ChangefeedTargets, len(targetDescs))
targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDescs))
for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return nil, err
return nil, nil, err
}
_, qualified := opts[changefeedbase.OptFullTableName]
name, err := getChangefeedTargetName(ctx, table, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified)
if err != nil {
return nil, err
return nil, nil, err
}
targets[table.GetID()] = jobspb.ChangefeedTarget{
tables[table.GetID()] = jobspb.ChangefeedTargetTable{
StatementTimeName: name,
}
ts := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: table.GetID(),
}
targets = append(targets, ts)
if err := changefeedbase.ValidateTable(targets, table); err != nil {
return nil, err
return nil, nil, err
}
for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) {
for _, warning := range changefeedbase.WarningsForTable(tables, table, opts) {
p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning))
}
}
}
return targets, nil
return targets, tables, nil
}

func validateSink(
Expand Down Expand Up @@ -953,3 +960,28 @@ func getChangefeedTargetName(
}
return desc.GetName(), nil
}

// AllTargets gets all the targets listed in a ChangefeedDetails,
// from the statement time name map in old protos
// or the TargetSpecifications in new ones.
func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetSpecification) {
//TODO: Use a version gate for this once we have CDC version gates
if len(cd.TargetSpecifications) > 0 {
for _, ts := range cd.TargetSpecifications {
if ts.TableID > 0 {
ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName
targets = append(targets, ts)
}
}
} else {
for id, t := range cd.Tables {
ct := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: id,
StatementTimeName: t.StatementTimeName,
}
targets = append(targets, ct)
}
}
return
}
16 changes: 13 additions & 3 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Expand Up @@ -15,9 +15,19 @@ import (
)

// ValidateTable validates that a table descriptor can be watched by a CHANGEFEED.
func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) error {
t, ok := targets[tableDesc.GetID()]
if !ok {
func ValidateTable(
targets []jobspb.ChangefeedTargetSpecification, tableDesc catalog.TableDescriptor,
) error {
var t jobspb.ChangefeedTargetSpecification
var found bool
for _, cts := range targets {
if cts.TableID == tableDesc.GetID() {
t = cts
found = true
break
}
}
if !found {
return errors.Errorf(`unwatched table: %s`, tableDesc.GetName())
}

Expand Down

0 comments on commit e852be5

Please sign in to comment.