Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
125369: streamproducer: add emit_wait and produce_wait to vtable r=dt a=dt

Release note: none.
Epic: none.

125411: streamingccl: cleanup unit test terminology r=stevendanna a=stevendanna

Mixing source/target with serverA/serverB was a bit confusing.

Epic: none
Release note: None

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
  • Loading branch information
3 people committed Jun 10, 2024
3 parents 591eec9 + 6fbb40c + 4b0f3d8 commit c761d4d
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 32 deletions.
54 changes: 27 additions & 27 deletions pkg/ccl/streamingccl/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
serverB := testcluster.StartTestCluster(t, 1, clusterArgs)
defer serverB.Stopper().Stop(ctx)

source := sqlutils.MakeSQLRunner(serverA.Server(0).ApplicationLayer().SQLConn(t))
target := sqlutils.MakeSQLRunner(serverB.Server(0).ApplicationLayer().SQLConn(t))
serverASQL := sqlutils.MakeSQLRunner(serverA.Server(0).ApplicationLayer().SQLConn(t))
serverBSQL := sqlutils.MakeSQLRunner(serverB.Server(0).ApplicationLayer().SQLConn(t))

for _, s := range []string{
"SET CLUSTER SETTING kv.rangefeed.enabled = true",
Expand All @@ -65,17 +65,17 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
"SET CLUSTER SETTING logical_replication.consumer.minimum_flush_interval = '10ms'",
"SET CLUSTER SETTING logical_replication.consumer.timestamp_granularity = '100ms'",
} {
source.Exec(t, s)
target.Exec(t, s)
serverASQL.Exec(t, s)
serverBSQL.Exec(t, s)
}

source.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
target.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
source.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
target.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
serverASQL.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
serverBSQL.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
serverASQL.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
serverBSQL.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")

source.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
target.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')")
serverASQL.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
serverBSQL.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')")

serverAURL, cleanup := sqlutils.PGUrl(t, serverA.Server(0).ApplicationLayer().SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
Expand All @@ -86,31 +86,31 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
jobAID jobspb.JobID
jobBID jobspb.JobID
)
target.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverAURL.String(), `ARRAY['tab']`)).Scan(&jobAID)
source.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverBURL.String(), `ARRAY['tab']`)).Scan(&jobBID)
serverASQL.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverBURL.String(), `ARRAY['tab']`)).Scan(&jobAID)
serverBSQL.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverAURL.String(), `ARRAY['tab']`)).Scan(&jobBID)

now := serverA.Server(0).Clock().Now()
t.Logf("waiting for replication job %d", jobAID)
WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), target, jobAID)
WaitUntilReplicatedTime(t, now, serverASQL, jobAID)
t.Logf("waiting for replication job %d", jobBID)
WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), source, jobBID)
WaitUntilReplicatedTime(t, now, serverBSQL, jobBID)

source.Exec(t, "INSERT INTO tab VALUES (2, 'potato')")
target.Exec(t, "INSERT INTO tab VALUES (3, 'celeriac')")
source.Exec(t, "UPSERT INTO tab VALUES (1, 'hello, again')")
target.Exec(t, "UPSERT INTO tab VALUES (1, 'goodbye, again')")
serverASQL.Exec(t, "INSERT INTO tab VALUES (2, 'potato')")
serverBSQL.Exec(t, "INSERT INTO tab VALUES (3, 'celeriac')")
serverASQL.Exec(t, "UPSERT INTO tab VALUES (1, 'hello, again')")
serverBSQL.Exec(t, "UPSERT INTO tab VALUES (1, 'goodbye, again')")

WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), target, jobAID)
WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), source, jobBID)
now = serverA.Server(0).Clock().Now()
WaitUntilReplicatedTime(t, now, serverASQL, jobAID)
WaitUntilReplicatedTime(t, now, serverBSQL, jobBID)

target.CheckQueryResults(t, "SELECT * from tab", [][]string{
expectedRows := [][]string{
{"1", "goodbye, again"},
{"2", "potato"},
{"3", "celeriac"},
})
source.CheckQueryResults(t, "SELECT * from tab", [][]string{
{"1", "goodbye, again"},
{"2", "potato"},
{"3", "celeriac"},
})
}
serverBSQL.CheckQueryResults(t, "SELECT * from tab", expectedRows)
serverASQL.CheckQueryResults(t, "SELECT * from tab", expectedRows)
}

func WaitUntilReplicatedTime(
Expand Down
14 changes: 14 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type eventStream struct {
lastCheckpointTime time.Time
lastCheckpointLen int

lastPolled time.Time

debug streampb.DebugProducerStatus
}

Expand Down Expand Up @@ -111,6 +113,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
return errors.AssertionFailedf("expected to be started once")
}

s.lastPolled = timeutil.Now()

sourceTenantID, err := s.validateProducerJobAndSpec(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -215,6 +219,12 @@ func (s *eventStream) setErr(err error) bool {

// Next implements eval.ValueGenerator interface.
func (s *eventStream) Next(ctx context.Context) (bool, error) {
emitWait := int64(timeutil.Since(s.lastPolled))

s.debug.Flushes.LastEmitWaitNanos.Store(emitWait)
s.debug.Flushes.EmitWaitNanos.Add(emitWait)
s.lastPolled = timeutil.Now()

select {
case <-ctx.Done():
return false, ctx.Err()
Expand All @@ -226,6 +236,10 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) {
case err := <-s.errCh:
return false, err
default:
produceWait := int64(timeutil.Since(s.lastPolled))
s.debug.Flushes.ProduceWaitNanos.Add(produceWait)
s.debug.Flushes.LastProduceWaitNanos.Store(produceWait)
s.lastPolled = timeutil.Now()
return true, nil
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cli/zip_table_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ var zipInternalTablesPerCluster = DebugZipTableRegistry{
"batches",
"checkpoints",
"megabytes",
"produce_wait",
"emit_wait",
"last_produce_wait",
"last_emit_wait",
"last_checkpoint",
"rf_checkpoints",
"rf_advances",
Expand Down
3 changes: 2 additions & 1 deletion pkg/repstream/streampb/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type DebugProducerStatus struct {
ResolvedMicros atomic.Int64
}
Flushes struct {
Batches, Checkpoints, Bytes atomic.Int64
Batches, Checkpoints, Bytes, EmitWaitNanos, ProduceWaitNanos atomic.Int64
LastProduceWaitNanos, LastEmitWaitNanos atomic.Int64
}
LastCheckpoint struct {
Micros atomic.Int64
Expand Down
41 changes: 38 additions & 3 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
Expand Down Expand Up @@ -9028,6 +9029,11 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
megabytes FLOAT,
last_checkpoint INTERVAL,
produce_wait INTERVAL,
emit_wait INTERVAL,
last_produce_wait INTERVAL,
last_emit_wait INTERVAL,
rf_checkpoints INT,
rf_advances INT,
rf_last_advance INTERVAL,
Expand All @@ -9052,25 +9058,54 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
return tree.NewDInterval(duration.Age(now, t), types.DefaultIntervalTypeMetadata)
}

// Transform `.0000000000` to `.0` to shorten/de-noise HLCs.
shortenLogical := func(d *tree.DDecimal) *tree.DDecimal {
var tmp apd.Decimal
d.Modf(nil, &tmp)
if tmp.IsZero() {
if _, err := tree.DecimalCtx.Quantize(&tmp, &d.Decimal, -1); err == nil {
d.Decimal = tmp
}
}
return d
}

for _, s := range sm.DebugGetProducerStatuses(ctx) {
resolved := time.UnixMicro(s.RF.ResolvedMicros.Load())
resolvedDatum := tree.DNull
if resolved.Unix() != 0 {
resolvedDatum = eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()})
resolvedDatum = shortenLogical(eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()}))
}

if err := addRow(
tree.NewDInt(tree.DInt(s.StreamID)),
tree.NewDString(fmt.Sprintf("%d[%d]", s.Spec.ConsumerNode, s.Spec.ConsumerProc)),
tree.NewDInt(tree.DInt(len(s.Spec.Spans))),
eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp),
eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp)),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp)),

tree.NewDInt(tree.DInt(s.Flushes.Batches.Load())),
tree.NewDInt(tree.DInt(s.Flushes.Checkpoints.Load())),
tree.NewDFloat(tree.DFloat(math.Round(float64(s.Flushes.Bytes.Load())/float64(1<<18))/4)),
age(time.UnixMicro(s.LastCheckpoint.Micros.Load())),

tree.NewDInterval(
duration.MakeDuration(s.Flushes.ProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.EmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastEmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),

tree.NewDInt(tree.DInt(s.RF.Checkpoints.Load())),
tree.NewDInt(tree.DInt(s.RF.Advances.Load())),
age(time.UnixMicro(s.RF.LastAdvanceMicros.Load())),
Expand Down
Loading

0 comments on commit c761d4d

Please sign in to comment.