Skip to content

Commit

Permalink
Merge pull request #79749 from samiskin/backport-release-22.1-79513
Browse files Browse the repository at this point in the history
release-22.1: changefeedccl: add structured logging for changefeed create and failed
  • Loading branch information
samiskin committed Apr 11, 2022
2 parents db34b33 + 2d88bb6 commit 2732c69
Show file tree
Hide file tree
Showing 10 changed files with 540 additions and 4 deletions.
42 changes: 42 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2382,6 +2382,48 @@ An event of type `captured_index_usage_stats`
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |

### `changefeed_failed`

An event of type `changefeed_failed` is an event for any Changefeed failure since the plan hook
was triggered.


| Field | Description | Sensitive |
|--|--|--|
| `FailureType` | The reason / environment with which the changefeed failed (ex: connection_closed, changefeed_behind) | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | no |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |

### `create_changefeed`

An event of type `create_changefeed` is an event for any CREATE CHANGEFEED query that
successfully starts running. Failed CREATE statements will show up as
ChangefeedFailed events.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | no |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |

### `sampled_query`

An event of type `sampled_query` is the SQL query event logged to the telemetry channel. It
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
Expand Down Expand Up @@ -221,6 +222,7 @@ go_test(
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
Expand Down
111 changes: 107 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -133,7 +134,8 @@ func changefeedPlanHook(
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// rowFn impements sql.PlanHookRowFn
rowFn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()

Expand All @@ -143,7 +145,7 @@ func changefeedPlanHook(

sinkURI, err := sinkURIFn()
if err != nil {
return err
return changefeedbase.MarkTaggedError(err, changefeedbase.UserInput)
}

if !unspecifiedSink && sinkURI == `` {
Expand All @@ -167,7 +169,7 @@ func changefeedPlanHook(
`changefeed.create`,
)
if err != nil {
return err
return changefeedbase.MarkTaggedError(err, changefeedbase.UserInput)
}

details := jr.Details.(jobspb.ChangefeedDetails)
Expand All @@ -189,6 +191,7 @@ func changefeedPlanHook(
p.ExtendedEvalContext().Descs.ReleaseAll(ctx)

telemetry.Count(`changefeed.create.core`)
logChangefeedCreateTelemetry(ctx, jr)
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
if err != nil {
telemetry.Count(`changefeed.core.error`)
Expand Down Expand Up @@ -246,6 +249,8 @@ func changefeedPlanHook(
return err
}

logChangefeedCreateTelemetry(ctx, jr)

select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -254,9 +259,16 @@ func changefeedPlanHook(
}:
return nil
}
}

rowFnLogErrors := func(ctx context.Context, pn []sql.PlanNode, resultsCh chan<- tree.Datums) error {
err := rowFn(ctx, pn, resultsCh)
if err != nil {
logChangefeedFailedTelemetry(ctx, nil, failureTypeForStartupError(err))
}
return err
}
return fn, header, nil, avoidBuffering, nil
return rowFnLogErrors, header, nil, avoidBuffering, nil
}

func createChangefeedJobRecord(
Expand Down Expand Up @@ -1077,6 +1089,7 @@ func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, jobExec interfac
} else {
telemetry.Count(`changefeed.enterprise.fail`)
exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1)
logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError)
}
return nil
}
Expand Down Expand Up @@ -1225,3 +1238,93 @@ func uniqueTableNames(cts tree.ChangefeedTargets) tree.TargetList {

return targetList
}

func logChangefeedCreateTelemetry(ctx context.Context, jr *jobs.Record) {
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if jr != nil {
changefeedDetails := jr.Details.(jobspb.ChangefeedDetails)
changefeedEventDetails = getCommonChangefeedEventDetails(ctx, changefeedDetails, jr.Description)
}

createChangefeedEvent := &eventpb.CreateChangefeed{
CommonChangefeedEventDetails: changefeedEventDetails,
}

log.StructuredEvent(ctx, createChangefeedEvent)
}

func logChangefeedFailedTelemetry(
ctx context.Context, job *jobs.Job, failureType changefeedbase.FailureType,
) {
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if job != nil {
changefeedDetails := job.Details().(jobspb.ChangefeedDetails)
changefeedEventDetails = getCommonChangefeedEventDetails(ctx, changefeedDetails, job.Payload().Description)
}

changefeedFailedEvent := &eventpb.ChangefeedFailed{
CommonChangefeedEventDetails: changefeedEventDetails,
FailureType: failureType,
}

log.StructuredEvent(ctx, changefeedFailedEvent)
}

func getCommonChangefeedEventDetails(
ctx context.Context, details jobspb.ChangefeedDetails, description string,
) eventpb.CommonChangefeedEventDetails {
opts := details.Opts

sinkType := "core"
if details.SinkURI != `` {
parsedSink, err := url.Parse(details.SinkURI)
if err != nil {
log.Warningf(ctx, "failed to parse sink for telemetry logging: %v", err)
}
sinkType = parsedSink.Scheme
}

var initialScan string
initialScanType, initialScanSet := opts[changefeedbase.OptInitialScan]
_, initialScanOnlySet := opts[changefeedbase.OptInitialScanOnly]
_, noInitialScanSet := opts[changefeedbase.OptNoInitialScan]
if initialScanSet && initialScanType == `` {
initialScan = `yes`
} else if initialScanSet && initialScanType != `` {
initialScan = initialScanType
} else if initialScanOnlySet {
initialScan = `only`
} else if noInitialScanSet {
initialScan = `no`
}

var resolved string
resolvedValue, resolvedSet := opts[changefeedbase.OptResolvedTimestamps]
if !resolvedSet {
resolved = "no"
} else if resolved == `` {
resolved = "yes"
} else {
resolved = resolvedValue
}

changefeedEventDetails := eventpb.CommonChangefeedEventDetails{
Description: description,
SinkType: sinkType,
NumTables: int32(len(AllTargets(details))),
Resolved: resolved,
Format: opts[changefeedbase.OptFormat],
InitialScan: initialScan,
}

return changefeedEventDetails
}

func failureTypeForStartupError(err error) changefeedbase.FailureType {
if errors.Is(err, context.Canceled) { // Occurs for sinkless changefeeds
return changefeedbase.ConnectionClosed
} else if isTagged, tag := changefeedbase.IsTaggedError(err); isTagged {
return tag
}
return changefeedbase.OnStartup
}
127 changes: 127 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -6028,3 +6029,129 @@ func TestChangefeedMultiPodTenantPlanning(t *testing.T) {

require.Equal(t, 2, aggregatorCount)
}

func TestChangefeedCreateTelemetryLogs(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, stopServer := startTestServer(t, newTestOptions())
defer stopServer()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (0, 'initial')`)

t.Run(`core_sink_type`, func(t *testing.T) {
coreSink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()
coreFeedFactory := makeSinklessFeedFactory(s, coreSink)

beforeCreateSinkless := timeutil.Now()
coreFeed := feed(t, coreFeedFactory, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, coreFeed)

createLogs := checkCreateChangefeedLogs(t, beforeCreateSinkless.UnixNano())
require.Equal(t, 1, len(createLogs))
require.Equal(t, createLogs[0].SinkType, "core")
})

t.Run(`gcpubsub_sink_type with options`, func(t *testing.T) {
pubsubFeedFactory := makePubsubFeedFactory(s, db)
beforeCreatePubsub := timeutil.Now()
pubsubFeed := feed(t, pubsubFeedFactory, `CREATE CHANGEFEED FOR foo, bar WITH resolved, no_initial_scan`)
defer closeFeed(t, pubsubFeed)

createLogs := checkCreateChangefeedLogs(t, beforeCreatePubsub.UnixNano())
require.Equal(t, 1, len(createLogs))
require.Equal(t, createLogs[0].SinkType, `gcpubsub`)
require.Equal(t, createLogs[0].NumTables, int32(2))
require.Equal(t, createLogs[0].Resolved, `yes`)
require.Equal(t, createLogs[0].InitialScan, `no`)
})
}

// Note that closeFeed needs to be called in order for the logs to be detected
func TestChangefeedFailedTelemetryLogs(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

waitForLogs := func(t *testing.T, startTime time.Time) []eventpb.ChangefeedFailed {
var logs []eventpb.ChangefeedFailed
testutils.SucceedsSoon(t, func() error {
logs = checkChangefeedFailedLogs(t, startTime.UnixNano())
if len(logs) < 1 {
return fmt.Errorf("no logs found")
}
return nil
})
return logs
}

t.Run(`connection_closed`, func(t *testing.T) {
// Race condition every ~3000 runs around the error not being seen as a context cancellation
skip.UnderStress(t)

s, db, stopServer := startTestServer(t, newTestOptions())
defer stopServer()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)

coreSink, coreSinkCleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser))
coreFactory := makeSinklessFeedFactory(s, coreSink)
coreFeed := feed(t, coreFactory, `CREATE CHANGEFEED FOR foo`)
assertPayloads(t, coreFeed, []string{
`foo: [0]->{"after": {"a": 0, "b": "updated"}}`,
})
beforeCoreSinkClose := timeutil.Now()

coreSinkCleanup()
closeFeed(t, coreFeed)

failLogs := waitForLogs(t, beforeCoreSinkClose)
require.Equal(t, 1, len(failLogs))
require.Equal(t, failLogs[0].FailureType, changefeedbase.ConnectionClosed)
})

t.Run(`user_input`, enterpriseTest(func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

beforeCreate := timeutil.Now()
_, err := f.Feed(`CREATE CHANGEFEED FOR foo, invalid_table`)
require.Error(t, err)

failLogs := waitForLogs(t, beforeCreate)
require.Equal(t, 1, len(failLogs))
require.Equal(t, failLogs[0].FailureType, changefeedbase.UserInput)
}))

t.Run(`unknown_error`, pubsubTest(func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

knobs := f.Server().TestingKnobs().
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func(_ context.Context) error {
return errors.Errorf("should fail")
}

beforeCreate := timeutil.Now()
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error=FAIL`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'next')`)
feedJob := foo.(cdctest.EnterpriseTestFeed)
require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusFailed }))

closeFeed(t, foo)
failLogs := waitForLogs(t, beforeCreate)
require.Equal(t, 1, len(failLogs))
require.Equal(t, failLogs[0].FailureType, changefeedbase.UnknownError)
require.Equal(t, failLogs[0].SinkType, `gcpubsub`)
require.Equal(t, failLogs[0].NumTables, int32(1))
}))
}

0 comments on commit 2732c69

Please sign in to comment.