117697: util/log/eventpb: remove ConvertToSchema r=petermattis a=petermattis

Remove the ConvertToSchema event, which has not been emitted since #92471
(Nov 2022).

Release note: none


117776: streamingccl: add EXPIRATION WINDOW option to ALTER VIRTUAL CLUSTER r=stevendanna a=msbutler

Previously, the private `stream_replication.job_liveness_timeout` cluster
setting determined how long the producer job would stay alive without a
heartbeat from the consumer job. This patch removes that setting and adds new
SQL syntax so the user can set this timeout for all producer jobs on the
given tenant.

Epic: [CRDB-34233](https://cockroachlabs.atlassian.net/browse/CRDB-34233)

Release note (sql change): This patch adds the new EXPIRATION WINDOW option,
set via `ALTER TENANT appTenant SET REPLICATION EXPIRATION WINDOW = '100ms'`,
which allows the user to override the default producer job expiration window
of 24 hours. The producer job expiration window determines how long the
producer job stays alive without a heartbeat from the consumer job.
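
As a usage sketch (the tenant name, duration, and job ID below are
illustrative placeholders, not values from this patch):

```sql
-- Allow producer jobs for tenant 'app' to survive 10 minutes without a
-- consumer heartbeat instead of the default 24 hours.
ALTER TENANT 'app' SET REPLICATION EXPIRATION WINDOW = '10m';

-- The producer job's status can then be inspected, as the tests below do:
SELECT status FROM [SHOW JOBS] WHERE job_id = 123;
```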

Co-authored-by: Peter Mattis <petermattis@gmail.com>
Co-authored-by: Michael Butler <butler@cockroachlabs.com>
3 people committed Jan 16, 2024
3 parents 0e4adb8 + bb824ec + de0265a commit 515663b
Showing 23 changed files with 290 additions and 342 deletions.
24 changes: 0 additions & 24 deletions docs/generated/eventlog.md
@@ -996,30 +996,6 @@ An event of type `comment_on_table` is recorded when a table is commented.
| `NullComment` | Set to true if the comment was removed entirely. | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `Statement` | A normalized copy of the SQL statement that triggered the event. The statement string contains a mix of sensitive and non-sensitive details (it is redactable). | partially |
| `Tag` | The statement tag. This is separate from the statement string, since the statement string can contain sensitive information. The tag is guaranteed not to. | no |
| `User` | The user account that triggered the event. The special usernames `root` and `node` are not considered sensitive. | depends |
| `DescriptorID` | The primary object descriptor affected by the operation. Set to zero for operations that don't affect descriptors. | no |
| `ApplicationName` | The application name for the session where the event was emitted. This is included in the event to ease filtering of logging output by application. | no |
| `PlaceholderValues` | The mapping of SQL placeholders to their values, for prepared statements. | yes |

### `convert_to_schema`

An event of type `convert_to_schema` is recorded when a database is converted to a schema.


| Field | Description | Sensitive |
|--|--|--|
| `DatabaseName` | The name of the database being converted to a schema. | yes |
| `NewDatabaseParent` | The name of the parent database for the new schema. | yes |


#### Common fields

| Field | Description | Sensitive |
3 changes: 1 addition & 2 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -157,7 +157,7 @@ func (c *TenantStreamingClusters) init(ctx context.Context) {
if c.Args.DestInitFunc != nil {
c.Args.DestInitFunc(c.T, c.DestSysSQL)
}
// Enable stream replication on dest by default.
c.SrcSysSQL.Exec(c.T, `SET CLUSTER SETTING physical_replication.enabled = true;`)
c.DestSysSQL.Exec(c.T, `SET CLUSTER SETTING physical_replication.enabled = true;`)
}

@@ -541,7 +541,6 @@ var defaultSrcClusterSetting = map[string]string{
`kv.rangefeed.closed_timestamp_refresh_interval`: `'200ms'`,
`kv.closed_timestamp.side_transport_interval`: `'50ms'`,
// A large timeout keeps the test from failing with unexpected timeout failures.
`stream_replication.job_liveness.timeout`: `'3m'`,
`stream_replication.stream_liveness_track_frequency`: `'2s'`,
`stream_replication.min_checkpoint_frequency`: `'1s'`,
// Make all AddSSTable operation to trigger AddSSTable events.
11 changes: 0 additions & 11 deletions pkg/ccl/streamingccl/settings.go
@@ -34,17 +34,6 @@ var StreamReplicationStreamLivenessTrackFrequency = settings.RegisterDurationSet
settings.WithName("physical_replication.producer.stream_liveness_track_frequency"),
)

// StreamReplicationJobLivenessTimeout controls how long we wait for to kill
// an inactive producer job.
var StreamReplicationJobLivenessTimeout = settings.RegisterDurationSetting(
settings.SystemOnly,
"stream_replication.job_liveness_timeout",
"controls how long we wait for to kill an inactive producer job",
3*24*time.Hour,
settings.WithRetiredName("stream_replication.job_liveness.timeout"),
settings.WithName("physical_replication.producer.job_liveness.timeout"),
)

// StreamReplicationMinCheckpointFrequency controls the minimum frequency the stream replication
// source cluster sends checkpoints to destination cluster.
var StreamReplicationMinCheckpointFrequency = settings.RegisterDurationSetting(
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -84,6 +84,7 @@ go_test(
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
@@ -21,12 +21,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -225,9 +227,7 @@ func TestPartitionedStreamReplicationClient(t *testing.T) {

ctx := context.Background()
// Makes sure source cluster producer job does not time out within test timeout
h.SysSQL.Exec(t, `
SET CLUSTER SETTING stream_replication.job_liveness.timeout = '500s';
`)

tenant.SQL.Exec(t, `
CREATE DATABASE d;
CREATE TABLE d.t1(i int primary key, a string, b string);
@@ -253,7 +253,6 @@ INSERT INTO d.t2 VALUES (2);
// We can create multiple replication streams for the same tenant.
_, err = client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{})
require.NoError(t, err)

expectStreamState(streamID, jobs.StatusRunning)

top, err := client.Plan(ctx, streamID)
@@ -343,15 +342,15 @@ INSERT INTO d.t2 VALUES (2);
require.True(t, testutils.IsError(err, "job with ID 999 does not exist"), err)

// Makes producer job exit quickly.
h.SysSQL.ExecMultiple(t, `
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'`,
`SET CLUSTER SETTING stream_replication.job_liveness_timeout = '1s'`)
h.SysSQL.Exec(t, `
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'`)
rps, err = client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{})
require.NoError(t, err)
streamID = rps.StreamID
jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
require.NoError(t, client.Complete(ctx, streamID, true))
h.SysSQL.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}})
h.SysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION EXPIRATION WINDOW ='100ms'`, testTenantName))
jobutils.WaitForJobToSucceed(t, h.SysSQL, jobspb.JobID(streamID))
}

// isQueryCanceledError returns true if the error appears to be a query cancelled error.
112 changes: 98 additions & 14 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -12,6 +12,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
@@ -47,10 +48,11 @@ var alterReplicationCutoverHeader = colinfo.ResultColumns{
}

// ResolvedTenantReplicationOptions represents options from an
// evaluated CREATE VIRTUAL CLUSTER FROM REPLICATION command.
// evaluated CREATE/ALTER VIRTUAL CLUSTER FROM REPLICATION command.
type resolvedTenantReplicationOptions struct {
resumeTimestamp hlc.Timestamp
retention *int32
resumeTimestamp hlc.Timestamp
retention *int32
expirationWindow *time.Duration
}

func evalTenantReplicationOptions(
@@ -78,7 +80,14 @@ func evalTenantReplicationOptions(
retSeconds := int32(retSeconds64)
r.retention = &retSeconds
}

if options.ExpirationWindow != nil {
dur, err := eval.Duration(ctx, options.ExpirationWindow)
if err != nil {
return nil, err
}
expirationWindow := time.Duration(dur.Nanos())
r.expirationWindow = &expirationWindow
}
return r, nil
}

@@ -89,6 +98,17 @@ func (r *resolvedTenantReplicationOptions) GetRetention() (int32, bool) {
return *r.retention, true
}

func (r *resolvedTenantReplicationOptions) GetExpirationWindow() (time.Duration, bool) {
if r == nil || r.expirationWindow == nil {
return 0, false
}
return *r.expirationWindow, true
}

func (r *resolvedTenantReplicationOptions) DestinationOptionsSet() bool {
return r != nil && (r.retention != nil || r.resumeTimestamp.IsSet())
}

func alterReplicationJobTypeCheck(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (matched bool, header colinfo.ResultColumns, _ error) {
@@ -219,12 +239,15 @@ func alterReplicationJobHook(
alterTenantStmt,
)
}

if tenInfo.PhysicalReplicationConsumerJobID == 0 {
return errors.Newf("tenant %q (%d) does not have an active replication job",
tenInfo.Name, tenInfo.ID)
}
jobRegistry := p.ExecCfg().JobRegistry
if !alterTenantStmt.Options.IsDefault() {
// If the statement contains options, then the user provided the ALTER
// TENANT ... SET REPLICATION [options] form of the command.
return alterTenantSetReplication(ctx, p.InternalSQLTxn(), jobRegistry, options, tenInfo)
}
if err := checkForActiveIngestionJob(tenInfo); err != nil {
return err
}
if alterTenantStmt.Cutover != nil {
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(p.InternalSQLTxn())
actualCutoverTime, err := alterTenantJobCutover(
@@ -233,10 +256,6 @@
return err
}
resultsCh <- tree.Datums{eval.TimestampToDecimalDatum(actualCutoverTime)}
} else if !alterTenantStmt.Options.IsDefault() {
if err := alterTenantOptions(ctx, p.InternalSQLTxn(), jobRegistry, options, tenInfo); err != nil {
return err
}
} else {
switch alterTenantStmt.Command {
case tree.ResumeJob:
Expand All @@ -260,6 +279,38 @@ func alterReplicationJobHook(
return fn, nil, nil, false, nil
}

func alterTenantSetReplication(
ctx context.Context,
txn isql.Txn,
jobRegistry *jobs.Registry,
options *resolvedTenantReplicationOptions,
tenInfo *mtinfopb.TenantInfo,
) error {

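// The expiration window update applies to the tenant's source-side producer
// jobs and does not require an active ingestion (consumer) job; the
// consumer-side options handled below do.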
if expirationWindow, ok := options.GetExpirationWindow(); ok {
if err := alterTenantExpirationWindow(ctx, txn, jobRegistry, expirationWindow, tenInfo); err != nil {
return err
}
}
if options.DestinationOptionsSet() {
if err := checkForActiveIngestionJob(tenInfo); err != nil {
return err
}
if err := alterTenantConsumerOptions(ctx, txn, jobRegistry, options, tenInfo); err != nil {
return err
}
}
return nil
}

func checkForActiveIngestionJob(tenInfo *mtinfopb.TenantInfo) error {
if tenInfo.PhysicalReplicationConsumerJobID == 0 {
return errors.Newf("tenant %q (%d) does not have an active replication consumer job",
tenInfo.Name, tenInfo.ID)
}
return nil
}

func alterTenantRestartReplication(
ctx context.Context,
p sql.PlanHookState,
@@ -287,6 +338,10 @@ func alterTenantRestartReplication(
)
}

if alterTenantStmt.Options.ExpirationWindowSet() {
return CannotSetExpirationWindowErr
}

streamAddress := streamingccl.StreamAddress(srcAddr)
streamURL, err := streamAddress.URL()
if err != nil {
@@ -436,7 +491,36 @@ func applyCutoverTime(
return job.WithTxn(txn).Unpaused(ctx)
}

func alterTenantOptions(
func alterTenantExpirationWindow(
ctx context.Context,
txn isql.Txn,
jobRegistry *jobs.Registry,
expirationWindow time.Duration,
tenInfo *mtinfopb.TenantInfo,
) error {
for _, producerJobID := range tenInfo.PhysicalReplicationProducerJobIDs {
if err := jobRegistry.UpdateJobWithTxn(ctx, producerJobID, txn,
func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {

streamProducerDetails := md.Payload.GetStreamReplication()
previousExpirationWindow := streamProducerDetails.ExpirationWindow
streamProducerDetails.ExpirationWindow = expirationWindow
ju.UpdatePayload(md.Payload)

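// Shift the existing expiration deadline by the delta between the new and
// previous windows, so that time already elapsed since the last consumer
// heartbeat still counts against the job.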
difference := expirationWindow - previousExpirationWindow
currentExpiration := md.Progress.GetStreamReplication().Expiration
newExpiration := currentExpiration.Add(difference)
md.Progress.GetStreamReplication().Expiration = newExpiration
ju.UpdateProgress(md.Progress)
return nil
}); err != nil {
return err
}
}
return nil
}

func alterTenantConsumerOptions(
ctx context.Context,
txn isql.Txn,
jobRegistry *jobs.Registry,
@@ -125,9 +125,9 @@ func TestAlterTenantPauseResume(t *testing.T) {

t.Run("pause-resume-tenant-with-no-replication", func(t *testing.T) {
c.DestSysSQL.Exec(t, `CREATE TENANT noreplication`)
c.DestSysSQL.ExpectErr(t, `tenant "noreplication" \(3\) does not have an active replication job`,
c.DestSysSQL.ExpectErr(t, `tenant "noreplication" \(3\) does not have an active replication consumer job`,
`ALTER TENANT $1 PAUSE REPLICATION`, "noreplication")
c.DestSysSQL.ExpectErr(t, `tenant "noreplication" \(3\) does not have an active replication job`,
c.DestSysSQL.ExpectErr(t, `tenant "noreplication" \(3\) does not have an active replication consumer job`,
`ALTER TENANT $1 RESUME REPLICATION`, "noreplication")
})

22 changes: 5 additions & 17 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -52,14 +52,14 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.SrcClusterSettings[`stream_replication.job_liveness.timeout`] = `'1m'`
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION EXPIRATION WINDOW ='1m'`, c.Args.SrcTenantName))

srcTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))
@@ -70,9 +70,7 @@
require.True(t, srcTime.LessEq(stats.ReplicationLagInfo.MinIngestedTimestamp))

// Make the producer job time out easily
c.SrcSysSQL.ExecMultiple(t, replicationtestutils.ConfigureClusterSettings(map[string]string{
`stream_replication.job_liveness.timeout`: `'100ms'`,
})...)
c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION EXPIRATION WINDOW ='100ms'`, c.Args.SrcTenantName))

jobutils.WaitForJobToFail(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
// The ingestion job will stop retrying as this is a permanent job error.
@@ -927,13 +925,6 @@ func TestProtectedTimestampManagement(t *testing.T) {
c.DestSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
c.DestSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")

if !pauseBeforeTerminal {
// Only set a short job liveness timeout if the job will not get paused.
// Else, the producer job may inadvertently timeout while the job is
// paused.
c.DestSysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '100ms'")
}

producerJobID, replicationJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
@@ -971,23 +962,20 @@
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
var emptyCutoverTime time.Time
c.Cutover(producerJobID, replicationJobID, emptyCutoverTime, false)
c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION EXPIRATION WINDOW ='100ms'`, c.Args.SrcTenantName))
}

// Set GC TTL low, so that the GC job completes quickly in the test.
c.DestSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
c.DestSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.Args.DestTenantName))

if !completeReplication {
c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION EXPIRATION WINDOW ='1ms'`, c.Args.SrcTenantName))
jobutils.WaitForJobToCancel(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
jobutils.WaitForJobToFail(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
}

// Check if the producer job has released protected timestamp if the job
// completed with a low stream_replication.job_liveness_timeout, or if the
// replication stream didn't complete.
if !pauseBeforeTerminal || !completeReplication {
requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer, jobspb.JobID(producerJobID))
}
requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer, jobspb.JobID(producerJobID))

// Check if the replication job has released protected timestamp.
checkNoDestinationProtection(c, replicationJobID)
