-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
c2c: plumb forSpanConfigs arg through streamclient.Create() #106792
c2c: plumb forSpanConfigs arg through streamclient.Create() #106792
Conversation
2843d22
to
78100c7
Compare
This patch fixes a papercut where we were storing pointers to spans in the producer job proto. This isn't a correctness bug, as protocol buffers will dereference pointers to objects during unmarshalling, but rather, it seems dangerous to work with slices of pointers to spans. Release note: None
78100c7
to
f346995
Compare
@@ -249,7 +249,7 @@ INSERT INTO d.t2 VALUES (2); | |||
h.SysSQL.Exec(t, ` | |||
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'; | |||
`) | |||
rps, err = client.Create(ctx, testTenantName) | |||
rps, err = client.Create(ctx, testTenantName, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test with forSpanConfigs=true in this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to in a follow up PR, once I actually teach the partitioned_stream_client to process span config updates.
) (streampb.ReplicationProducerSpec, error) { | ||
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create") | ||
defer sp.Finish() | ||
|
||
p.mu.Lock() | ||
defer p.mu.Unlock() | ||
var rawReplicationProducerSpec []byte | ||
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName) | ||
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1,$2)`, | ||
tenantName, forSpanConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it allowed to use forSpanConfig=true with a tenantName other than system?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. this query now fails if spanConfig==true and the tenanName is not system
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, I thought so, in that case, what do you think about splitting the API to have 2 funcs, one for span configs and a "normal" one? so that you would not need the boolean. something like:
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)
CreateForSpanConfigs(ctx context.Context) (streampb.ReplicationProducerSpec, error)
I don't think this is needed all over.. you'll decide where it makes more sense. overall I think it would be nicer than a bool, and callers will not be able to mix span configs and non-system tenant. another benefit might be that if we'll have crdb_internal.start_replication_stream_for_span_configs
we'll get separate metrics for the 2 flavors (once we have metrics..).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We chatted offline and decided that it's fine to keep the API as is-- there's only one caller of this interface, the stream ingestion job, and it will only ever call Create()
with and without the bool. If we need to add another stream flavor, it would be nicer to add some sort of opts struct to Create()
. @stevendanna when you get a chance, please do chime in here if you have opinions on how I'm changing the api.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lidorcarmel after thinking about this a bit more, I realized we need to persist the app tenant ID asap on the producer side. I think I need to modify this PR to do the following:
- add a new field to the
SpanConfigsForTenantID
to the producer job record which persists the replicating app tenant id whose span config updates we want. TheTenantID
field will continue to be the system tenant id. - modify the
startReplicationProducerJob()
, to persist theSpanConfigsForTenantID
field if necessary.
From the client's perspective, we can either add:
CreateForSpanConfigs(ctx context.Context, appTenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)
or modify the current impl in the PR to:
Create(ctx context.Context, appTenant roachpb.TenantName, forSpanConfigs bool) (streampb.ReplicationProducerSpec, error)
i.e. the client always passes in the appTenant name, never a system tenant.
We can discuss this on Monday.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lidorcarmel thanks for the quick review!
) (streampb.ReplicationProducerSpec, error) { | ||
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create") | ||
defer sp.Finish() | ||
|
||
p.mu.Lock() | ||
defer p.mu.Unlock() | ||
var rawReplicationProducerSpec []byte | ||
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName) | ||
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1,$2)`, | ||
tenantName, forSpanConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. this query now fails if spanConfig==true and the tenanName is not system
@@ -249,7 +249,7 @@ INSERT INTO d.t2 VALUES (2); | |||
h.SysSQL.Exec(t, ` | |||
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'; | |||
`) | |||
rps, err = client.Create(ctx, testTenantName) | |||
rps, err = client.Create(ctx, testTenantName, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to in a follow up PR, once I actually teach the partitioned_stream_client to process span config updates.
73ee93a
to
298eca7
Compare
298eca7
to
e285254
Compare
This patch plumbs the forSpanConfigs arg through the streamclient.Create() interface and modifies producer job creation to return a specification for streaming the spanConfiguration table if that arg is passed. A future PR will plumb this info through the StreamPartition spec. It's worth noting that we don't necessarily need this plumbing: if we see that tenant Id is for the system tenant, then we could automatically set up a span config stream. I think this extra plumbing is worth it because it makes everything a bit easier to read. Informs cockroachdb#106823 Release note: None
e285254
to
94ac2eb
Compare
unrelated essential ci flake |
closing in favor of #107077 |
unrelated flakes |
This patch plumbs the forSpanConfigs arg through the streamclient.Create()
interface and modifies producer job creation to return a specification for
streaming the spanConfiguration table if that arg is passed.
A future PR will plumb this info through the StreamPartition spec.
It's worth noting that we don't necessarily need this plumbing: if we see that
tenant Id is for the system tenant, then we could automatically set up a span
config stream. I think this extra plumbing is worth it because it makes
everything a bit easier to read.
UPDATE: I think there is a better API which I describe here
Informs #106823
Release note: None