Skip to content

Commit

Permalink
c2c: add new streaming client and value generator for span config rep…
Browse files Browse the repository at this point in the history
…lication

This patch adds a new implementation of the streamclient.Client interface which
replicates span config updates for the replicating app tenant. With this
change, a call to spanConfigClient.SetupSpanConfigsStream() will return a new
spanConfigSubscription which the client can use to listen for span config
updates.

Internally, this subscription spins up the new spanConfigEventStream value
generator which listens to updates on the system span_configuration table using
a rangefeedcache. Unlike the original eventStream which manages a rangefeed,
the spanConfigEventStream uses a rangefeedcache wrapper as it provides nicer
ordering guarantees on when it emits updates. Specifically, the rangefeed cache
buffers updates emitted from the underlying rangefeed and only flushes updates
less than t1 once the whole span has been checkpointed to t1. In other words,
the rangefeed cache guarantees that once it flushes updates less than t1, it
will not flush any more _new_ updates less than t1. The rangefeed cache also
flushes updates in sorted timestamp order.

These properties will simplify the ingestion of these span config updates
significantly. To understand why, consider that span config updates _must_ be
ingested in the same order that they were applied. Since the rangefeed cache
will emit all new updates in order, the ingestion side doesn't need to buffer
updates or keep track of checkpoints.

Informs #106823

Release note: None
  • Loading branch information
msbutler committed Aug 8, 2023
1 parent c948904 commit 441118e
Show file tree
Hide file tree
Showing 26 changed files with 1,012 additions and 352 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ go_library(
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/util/hlc",
],
)
69 changes: 69 additions & 0 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ package streamingccl
import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// EventType enumerates all possible events emitted over a cluster stream.
Expand All @@ -30,6 +32,9 @@ const (
// CheckpointEvent indicates that GetResolvedSpans will be meaningful. The resolved
// timestamp indicates that all KVs have been emitted up to this timestamp.
CheckpointEvent
// SpanConfigEvent indicates that the SpanConfig field of an event holds an updated
// SpanConfigRecord.
SpanConfigEvent
)

// Event describes an event emitted by a cluster to cluster stream. Its Type
Expand All @@ -52,6 +57,9 @@ type Event interface {
// GetResolvedSpans returns a list of span-time pairs indicating the time for
// which all KV events within that span has been emitted.
GetResolvedSpans() []jobspb.ResolvedSpan

// GetSpanConfigEvent returns the SpanConfigEntry carried by the event if the
// EventType is SpanConfigEvent.
GetSpanConfigEvent() *roachpb.SpanConfigEntry
}

// kvEvent is a key value pair that needs to be ingested.
Expand Down Expand Up @@ -86,6 +94,11 @@ func (kve kvEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface. It always returns nil
// for a kvEvent.
func (kve kvEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

// sstableEvent is a sstable that needs to be ingested.
type sstableEvent struct {
sst kvpb.RangeFeedSSTable
Expand Down Expand Up @@ -116,6 +129,11 @@ func (sste sstableEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface. It always returns nil
// for an sstableEvent.
func (sste sstableEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

var _ Event = sstableEvent{}

// delRangeEvent is a DeleteRange event that needs to be ingested.
Expand Down Expand Up @@ -148,6 +166,11 @@ func (dre delRangeEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface. It always returns nil
// for a delRangeEvent.
func (dre delRangeEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

var _ Event = delRangeEvent{}

// checkpointEvent indicates that the stream has emitted every change for all
Expand Down Expand Up @@ -183,6 +206,48 @@ func (ce checkpointEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return ce.resolvedSpans
}

// GetSpanConfigEvent implements the Event interface. It always returns nil
// for a checkpointEvent.
func (ce checkpointEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

// spanConfigEvent is an Event that carries a span config entry together with
// a timestamp.
type spanConfigEvent struct {
// spanConfigEvent is the entry handed out by GetSpanConfigEvent.
spanConfigEvent roachpb.SpanConfigEntry
// timestamp is copied from the streamed entry's Timestamp field by
// MakeSpanConfigEvent.
// NOTE(review): presumably the time at which the span config update was
// written on the source; confirm against the producer.
timestamp hlc.Timestamp
}

var _ Event = spanConfigEvent{}

// Type implements the Event interface. It always reports SpanConfigEvent.
func (spe spanConfigEvent) Type() EventType {
return SpanConfigEvent
}

// GetKV implements the Event interface. It always returns nil for a
// spanConfigEvent.
func (spe spanConfigEvent) GetKV() *roachpb.KeyValue {
return nil
}

// GetSSTable implements the Event interface. It always returns nil for a
// spanConfigEvent.
func (spe spanConfigEvent) GetSSTable() *kvpb.RangeFeedSSTable {
return nil
}

// GetDeleteRange implements the Event interface. It always returns nil for a
// spanConfigEvent.
func (spe spanConfigEvent) GetDeleteRange() *kvpb.RangeFeedDeleteRange {
return nil
}

// GetResolvedSpans implements the Event interface. It always returns nil for
// a spanConfigEvent.
func (spe spanConfigEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface. It returns a pointer to
// the span config entry held by the event.
//
// The method has a value receiver, so the returned pointer refers to the
// entry inside the receiver copy made for this call; writes through it do not
// modify the event the caller holds.
func (spe spanConfigEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return &spe.spanConfigEvent
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return kvEvent{kv: kv}
Expand All @@ -202,3 +267,7 @@ func MakeDeleteRangeEvent(delRange kvpb.RangeFeedDeleteRange) Event {
func MakeCheckpointEvent(resolvedSpans []jobspb.ResolvedSpan) Event {
// The slice is stored as-is (not copied), so the returned event shares its
// backing array with the caller's resolvedSpans.
return checkpointEvent{resolvedSpans: resolvedSpans}
}

// MakeSpanConfigEvent creates an Event from a streamed span config entry. The
// returned event has type SpanConfigEvent and carries the entry's SpanConfig
// and Timestamp fields.
func MakeSpanConfigEvent(streamedSpanConfig streampb.StreamEvent_StreamedSpanConfigEntry) Event {
	return spanConfigEvent{
		spanConfigEvent: streamedSpanConfig.SpanConfig,
		timestamp:       streamedSpanConfig.Timestamp,
	}
}
1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/replicationutils",
"//pkg/jobs",
Expand Down
12 changes: 0 additions & 12 deletions pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,6 @@ func (rh *ReplicationHelper) StartReplicationStream(
return replicationProducerSpec
}

// SetupSpanConfigsReplicationStream runs crdb_internal.setup_span_configs_stream
// for sourceTenantName through the helper's system SQL runner and decodes the
// returned bytes into a ReplicationStreamSpec. The test fails if the bytes
// cannot be unmarshalled.
func (rh *ReplicationHelper) SetupSpanConfigsReplicationStream(
	t *testing.T, sourceTenantName roachpb.TenantName,
) streampb.ReplicationStreamSpec {
	// The builtin returns the spec as a single serialized protobuf column.
	var encoded []byte
	rh.SysSQL.QueryRow(t, `SELECT crdb_internal.setup_span_configs_stream($1)`, sourceTenantName).Scan(&encoded)

	var spec streampb.ReplicationStreamSpec
	require.NoError(t, protoutil.Unmarshal(encoded, &spec))
	return spec
}

func (rh *ReplicationHelper) MaybeGenerateInlineURL(t *testing.T) *url.URL {
if rh.rng.Float64() > 0.5 {
return &rh.PGUrl
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ go_library(
name = "streamclient",
srcs = [
"client.go",
"client_helpers.go",
"partitioned_stream_client.go",
"random_stream_client.go",
"span_config_stream_client.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
visibility = ["//visibility:public"],
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Client interface {
// that apply to the passed in tenant, and returns the subscriptions the
// client can subscribe to. No protected timestamp or job is persisted to the
// source cluster.
SetupSpanConfigsStream(ctx context.Context, tenant roachpb.TenantName) (streampb.StreamID, Topology, error)
SetupSpanConfigsStream(ctx context.Context, tenant roachpb.TenantName) (Subscription, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error
Expand Down Expand Up @@ -148,7 +148,7 @@ type Subscription interface {

// NewStreamClient creates a new stream client based on the stream address.
func NewStreamClient(
ctx context.Context, streamAddress streamingccl.StreamAddress, db isql.DB,
ctx context.Context, streamAddress streamingccl.StreamAddress, db isql.DB, forSpanConfigs bool,
) (Client, error) {
var streamClient Client
streamURL, err := streamAddress.URL()
Expand All @@ -160,6 +160,9 @@ func NewStreamClient(
case "postgres", "postgresql":
// The canonical PostgreSQL URL scheme is "postgresql", however our
// own client commands also accept "postgres".
if forSpanConfigs {
return NewSpanConfigStreamClient(streamURL)
}
return NewPartitionedStreamClient(ctx, streamURL)
case "external":
if db == nil {
Expand All @@ -169,7 +172,7 @@ func NewStreamClient(
if err != nil {
return nil, err
}
return NewStreamClient(ctx, addr, db)
return NewStreamClient(ctx, addr, db, forSpanConfigs)
case RandomGenScheme:
streamClient, err = newRandomStreamClient(streamURL)
if err != nil {
Expand Down Expand Up @@ -206,7 +209,7 @@ func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client
var combinedError error = nil
for _, address := range streamAddresses {
streamAddress := streamingccl.StreamAddress(address)
client, err := NewStreamClient(ctx, streamAddress, nil)
client, err := NewStreamClient(ctx, streamAddress, nil, false)
if err == nil {
err = client.Dial(ctx)
if err == nil {
Expand Down
Loading

0 comments on commit 441118e

Please sign in to comment.