107589: sql: improve query plan for find-running-job-of-type query r=mgartner a=mgartner

This commit improves the query plan for the `find-running-jobs-of-type`
query by adding a hint to use the `jobs_status_created_idx` index and
by removing the `ORDER BY` clause when no job ID to ignore is given.
In some cases this eliminates an index join, making the query plan
more efficient.
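
As a minimal sketch (not the commit's code), the shape of the change looks
roughly like this; the function name, status list, and exact SQL text are
assumptions for illustration, while the `jobs_status_created_idx` hint and
the conditional `ORDER BY` follow the description above:

```go
package main

import "fmt"

// buildFindRunningJobQuery sketches the idea: hinting the index with the
// table@index syntax lets the optimizer scan jobs_status_created_idx
// directly, and the ORDER BY is emitted only when there is a job ID to
// ignore, which is what allows the index join to be elided otherwise.
func buildFindRunningJobQuery(ignoreJobID int64) string {
	query := `SELECT id FROM system.jobs@jobs_status_created_idx
WHERE job_type = $1 AND status IN ('pending', 'running')`
	if ignoreJobID != 0 {
		query += ` AND id != $2 ORDER BY created`
	}
	return query
}

func main() {
	fmt.Println(buildFindRunningJobQuery(0))   // no ORDER BY needed
	fmt.Println(buildFindRunningJobQuery(123)) // ordered, skipping a job
}
```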

Informs #107405

Release note: None


111905: streamingccl: physical replication stream from given timestamp  r=adityamaru a=stevendanna

This adds a RESUME TIMESTAMP option to CREATE VIRTUAL CLUSTER FROM
REPLICATION.  When provided, we allow the user to start a replication
stream into an _existing_ virtual cluster.

When the resume timestamp is provided, the replication stream will be
started from that timestamp, with no initial scan. To facilitate this,
we add a new argument to crdb_internal.start_replication_stream that
allows us to pass a destination-chosen start timestamp.
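
As a rough sketch of the new call shape (not the commit's code), the
destination marshals a `streampb.ReplicationProducerRequest` and passes it
as a second argument to the builtin. The helper name, the import paths, the
pgx major version, and the `hlc.Timestamp` type of `ReplicationStartTime`
are assumptions here:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/jackc/pgx/v5"
)

// startReplicationStreamAt asks the source cluster to start a replication
// stream for the named tenant at startTime, skipping the initial scan. With
// a single argument the builtin keeps its old behavior (full initial scan).
func startReplicationStreamAt(
	ctx context.Context, conn *pgx.Conn, tenantName string, startTime hlc.Timestamp,
) (streampb.ReplicationProducerSpec, error) {
	req := streampb.ReplicationProducerRequest{ReplicationStartTime: startTime}
	reqBytes, err := protoutil.Marshal(&req)
	if err != nil {
		return streampb.ReplicationProducerSpec{}, err
	}
	var rawSpec []byte
	if err := conn.QueryRow(
		ctx, `SELECT crdb_internal.start_replication_stream($1, $2)`, tenantName, reqBytes,
	).Scan(&rawSpec); err != nil {
		return streampb.ReplicationProducerSpec{}, err
	}
	var spec streampb.ReplicationProducerSpec
	if err := protoutil.Unmarshal(rawSpec, &spec); err != nil {
		return streampb.ReplicationProducerSpec{}, err
	}
	return spec, nil
}
```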

To avoid various catastrophic mistakes, we only allow this when all of
the following hold (see the sketch after this list):

- The destination tenant is in service mode None.

- The provided resume timestamp equals the last recorded "revert
  timestamp" of the destination tenant. The revert timestamp is set when
  the tenant has been forcibly reverted to a particular timestamp and is
  cleared when the tenant is modified in a way that may invalidate a
  resumption from that timestamp.

- If the source tenant has a PreviousSourceTenant set, the new
  destination must match that previous source tenant field.
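
The sketch below is purely illustrative: the function, parameter names, and
error messages are invented for this example, and only the three conditions
themselves come from the list above.

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// validateResumeTimestamp checks the three conditions: the destination must
// not be serving SQL, the resume timestamp must equal the destination's last
// recorded revert timestamp, and a recorded previous source tenant (if any)
// must point back at this destination.
func validateResumeTimestamp(
	dstServiceMode mtinfopb.TenantServiceMode,
	dstLastRevertTimestamp, resumeTS hlc.Timestamp,
	srcPreviousSourceTenant, dstTenantID roachpb.TenantID,
) error {
	if dstServiceMode != mtinfopb.ServiceModeNone {
		return errors.Newf("destination tenant must be in service mode none, not %v", dstServiceMode)
	}
	if resumeTS != dstLastRevertTimestamp {
		return errors.Newf("resume timestamp %s does not equal the last revert timestamp %s",
			resumeTS, dstLastRevertTimestamp)
	}
	if srcPreviousSourceTenant != (roachpb.TenantID{}) && srcPreviousSourceTenant != dstTenantID {
		return errors.New("destination does not match the source's previous source tenant")
	}
	return nil
}
```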

WARNING: Using this correctly requires that the stream is resumed
before garbage collection has progressed past the given resume
timestamp. During normal operation, the replication stream maintains a
protected timestamp to ensure this is the case. However, when
resuming using this new feature, we have no such guarantee.

Epic: none

Release note: None

Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
3 people committed Oct 9, 2023
3 parents 7803211 + 4945cf0 + 2e0f18b commit 6de91e3
Showing 34 changed files with 764 additions and 168 deletions.
36 changes: 18 additions & 18 deletions pkg/ccl/backupccl/backup_test.go
@@ -6917,7 +6917,7 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
})
restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
@@ -6928,13 +6928,13 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
{
`10`, `true`, `cluster-10`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeExternal)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`,
},
},
)
@@ -6969,13 +6969,13 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
{
`10`, `false`, `NULL`,
strconv.Itoa(int(mtinfopb.DataStateDrop)),
strconv.Itoa(int(mtinfopb.ServiceModeNone)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedDataState": "DROP", "deprecatedId": "10", "droppedName": "cluster-10"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedDataState": "DROP", "deprecatedId": "10", "droppedName": "cluster-10", "lastRevertTenantTimestamp": {}}`,
},
},
)
@@ -7004,13 +7004,13 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
{
`10`, `true`, `cluster-10`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeExternal)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`,
},
},
)
@@ -7045,7 +7045,7 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
})
restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
@@ -7056,13 +7056,13 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
{
`10`, `true`, `cluster-10`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeExternal)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`,
},
},
)
@@ -7085,7 +7085,7 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
})
restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/clusterwide'`)
@@ -7096,13 +7096,13 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
{
`10`, `true`, `cluster-10`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeExternal)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`,
},
},
)
@@ -7147,7 +7147,7 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
})
restoreDB.Exec(t, `RESTORE FROM 'nodelocal://1/clusterwide' WITH include_all_virtual_clusters`)
@@ -7158,25 +7158,25 @@ func TestBackupRestoreTenant(t *testing.T) {
`1`, `true`, `system`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeShared)),
`{"capabilities": {}, "deprecatedId": "1"}`,
`{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`,
},
{
`10`, `true`, `cluster-10`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeExternal)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`,
},
{
`11`, `true`, `cluster-11`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeExternal)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "11"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "11", "lastRevertTenantTimestamp": {}}`,
},
{
`20`, `true`, `cluster-20`,
strconv.Itoa(int(mtinfopb.DataStateReady)),
strconv.Itoa(int(mtinfopb.ServiceModeExternal)),
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "20"}`,
`{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "20", "lastRevertTenantTimestamp": {}}`,
},
},
)
2 changes: 1 addition & 1 deletion pkg/ccl/cmdccl/clusterrepl/main.go
@@ -99,7 +99,7 @@ func streamPartition(ctx context.Context, streamAddr *url.URL) error {
return err
}

replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant))
replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant), streampb.ReplicationProducerRequest{})
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
@@ -60,7 +60,7 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)
Create(ctx context.Context, tenant roachpb.TenantName, req streampb.ReplicationProducerRequest) (streampb.ReplicationProducerSpec, error)

// Destroy informs the source of the stream that it may terminate production
// and release resources such as protected timestamps.
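
For reference, a minimal sketch of how a caller might use the updated
`Create` signature; the helper name and the `hlc.Timestamp` type of
`ReplicationStartTime` are assumptions, and only the interface change itself
comes from this commit:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// createStream starts a producer for the given tenant. A zero
// ReplicationProducerRequest preserves the old behavior (initial scan);
// setting ReplicationStartTime resumes the stream from that timestamp.
func createStream(
	ctx context.Context, c streamclient.Client, tenant roachpb.TenantName, resumeTS hlc.Timestamp,
) (streampb.StreamID, error) {
	spec, err := c.Create(ctx, tenant, streampb.ReplicationProducerRequest{
		ReplicationStartTime: resumeTS,
	})
	if err != nil {
		return 0, err
	}
	return spec.StreamID, nil
}
```
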
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamclient/client_test.go
@@ -45,7 +45,7 @@ func (sc testStreamClient) Dial(_ context.Context) error {

// Create implements the Client interface.
func (sc testStreamClient) Create(
_ context.Context, _ roachpb.TenantName,
_ context.Context, _ roachpb.TenantName, _ streampb.ReplicationProducerRequest,
) (streampb.ReplicationProducerSpec, error) {
return streampb.ReplicationProducerSpec{
StreamID: streampb.StreamID(1),
@@ -242,7 +242,7 @@ func ExampleClient() {
_ = client.Close(ctx)
}()

prs, err := client.Create(ctx, "system")
prs, err := client.Create(ctx, "system", streampb.ReplicationProducerRequest{})
if err != nil {
panic(err)
}
15 changes: 13 additions & 2 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
@@ -63,14 +63,25 @@ var _ Client = &partitionedStreamClient{}

// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
ctx context.Context, tenantName roachpb.TenantName, req streampb.ReplicationProducerRequest,
) (streampb.ReplicationProducerSpec, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()
p.mu.Lock()
defer p.mu.Unlock()

var row pgx.Row
if !req.ReplicationStartTime.IsEmpty() {
reqBytes, err := protoutil.Marshal(&req)
if err != nil {
return streampb.ReplicationProducerSpec{}, err
}
row = p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1, $2)`, tenantName, reqBytes)
} else {
row = p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName)
}

var rawReplicationProducerSpec []byte
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName)
err := row.Scan(&rawReplicationProducerSpec)
if err != nil {
return streampb.ReplicationProducerSpec{}, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName)
pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go
@@ -147,7 +147,7 @@ func TestPartitionStreamReplicationClientWithNonRunningJobs(t *testing.T) {
})
})
t.Run("paused-job", func(t *testing.T) {
rps, err := client.Create(ctx, testTenantName)
rps, err := client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{})
require.NoError(t, err)
targetStreamID := rps.StreamID
h.SysSQL.Exec(t, `PAUSE JOB $1`, targetStreamID)
@@ -173,7 +173,7 @@ })
})
})
t.Run("cancelled-job", func(t *testing.T) {
rps, err := client.Create(ctx, testTenantName)
rps, err := client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{})
require.NoError(t, err)
targetStreamID := rps.StreamID
h.SysSQL.Exec(t, `CANCEL JOB $1`, targetStreamID)
@@ -249,11 +249,11 @@ INSERT INTO d.t2 VALUES (2);
[][]string{{string(status)}})
}

rps, err := client.Create(ctx, testTenantName)
rps, err := client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{})
require.NoError(t, err)
streamID := rps.StreamID
// We can create multiple replication streams for the same tenant.
_, err = client.Create(ctx, testTenantName)
_, err = client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{})
require.NoError(t, err)

expectStreamState(streamID, jobs.StatusRunning)
@@ -348,7 +348,7 @@ INSERT INTO d.t2 VALUES (2);
h.SysSQL.Exec(t, `
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms';
`)
rps, err = client.Create(ctx, testTenantName)
rps, err = client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{})
require.NoError(t, err)
streamID = rps.StreamID
require.NoError(t, client.Complete(ctx, streamID, true))
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
@@ -387,7 +387,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top

// Create implements the Client interface.
func (m *RandomStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
ctx context.Context, tenantName roachpb.TenantName, _ streampb.ReplicationProducerRequest,
) (streampb.ReplicationProducerSpec, error) {
log.Infof(ctx, "creating random stream for tenant %s", tenantName)
return streampb.ReplicationProducerSpec{
