125162: streamingccl: send heartbeat time to source cluster r=azhu-crl a=azhu-crl

Previously, the persisted replicated time was used in heartbeats
sent to the source cluster. With that scheme, the failback
timestamp could be left unprotected when a cutover time had been
set that was lower than the persisted replicated time. In that
case, we could fail to replicate data from the cutover time
onward, because the data to be replicated may already have been
garbage collected: the protected timestamp followed the higher
persisted replicated timestamp instead of the cutover timestamp.

This PR fixes the issue by taking the minimum of the persisted
replicated time and the cutover time as the heartbeat time. The
heartbeat time is now used in heartbeats sent to the source
cluster instead of the persisted replicated time. This ensures
that data still to be replicated is safe from garbage collection
at and above the cutover time.
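
The selection rule is just a minimum over two timestamps. A minimal Go sketch of that rule, mirroring the frontier-processor change in the diff further below (the function name is illustrative, not code from this PR):

```go
package sketch

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// heartbeatTimestamp picks the timestamp to send in heartbeats: the minimum
// of the persisted replicated time and the cutover time (when one is set).
func heartbeatTimestamp(persistedReplicatedTime, cutoverTime hlc.Timestamp) hlc.Timestamp {
	if cutoverTime.IsEmpty() || persistedReplicatedTime.Less(cutoverTime) {
		// No cutover requested yet, or the cutover is still ahead of the
		// persisted frontier: the persisted replicated time is safe to send.
		return persistedReplicatedTime
	}
	// The cutover time is at or below the persisted replicated time: send the
	// cutover time so data needed for failback stays protected from GC.
	return cutoverTime
}
```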

Informs: #117984
Epic: none

Release note: none

125357: sql,changefeedccl: miscellaneous cleanup of server shutdown r=yuzefovich a=yuzefovich

**stats: refresh stats cache entries in async tasks**

Previously, we would use "vanilla" goroutines when refreshing stats entries (triggered by the rangefeed), but those goroutines did not respect the stopper's signal to shut down. That could cause the "short-living monitors are not stopped" assertion to fire on server shutdown in tests. This commit prevents that by using the stopper to start async tasks for the entry refresh (which has additional benefits, such as recovering from panics).
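
Roughly the pattern in question, as a hedged sketch (the task name and refresh callback are illustrative, not the actual stats-refresher code); the same pattern applies to the plan hook change below:

```go
package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// refreshEntryAsync runs the refresh as a stopper-managed task instead of a
// bare goroutine, so the server can refuse new work once it starts draining
// and can wait for in-flight tasks before tearing down monitors.
func refreshEntryAsync(ctx context.Context, stopper *stop.Stopper, refresh func(context.Context)) {
	if err := stopper.RunAsyncTask(ctx, "refresh-stats-cache-entry", refresh); err != nil {
		// The stopper is quiescing; skipping the refresh during shutdown is fine.
		return
	}
}
```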

Fixes: #125329.

**sql: use async task for plan hook goroutine**

This commit is similar in spirit to the previous one: it replaces the "vanilla" goroutine with an async task connected to the stopper for the hookFn plan node. This allows server shutdown to be synchronized with cancellation of the hook task.

Fixes: #125139.

**changefeedccl: create separate root changefeed memory monitor**

This commit introduces a "root" changefeed memory monitor that will be the parent for all CDC DistSQL flows. The primary motivation behind this change is to disconnect the CDC flow from the planner's monitor (which is the "txn" monitor of the connection that started the changefeed). This is needed because that connection can be closed _before_ the CDC flow is cleaned up, which triggers the "short-living non-stopped monitor" assertion. An additional benefit of this change is improved observability, since we can now easily track memory usage across all changefeeds.
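
A rough sketch of the new parent/child relationship, reusing the calls that appear in the diffs below (`NewMonitorInheritWithLimit`, `StartNoReserved`, `DistSQLSrv.ChangefeedMonitor`); the helper name and `memLimit` parameter are illustrative, not code from this PR:

```go
package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// makeFlowMonitor attaches a per-flow monitor to the shared changefeed root
// monitor rather than the connection-scoped planner monitor, so closing the
// connection that created the changefeed cannot orphan the flow's monitor.
func makeFlowMonitor(
	ctx context.Context, changefeedRootMon *mon.BytesMonitor, memLimit int64,
) *mon.BytesMonitor {
	flowMon := mon.NewMonitorInheritWithLimit("changefeed-flow", memLimit, changefeedRootMon, false /* longLiving */)
	flowMon.StartNoReserved(ctx, changefeedRootMon)
	return flowMon
}
```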

Release note: None

125455: sql: add support for EXPLAIN ANALYZE in SQL shell and a builtin r=yuzefovich a=yuzefovich

This commit fixes a possible crash that could happen whenever the internal executor is used to run an EXPLAIN ANALYZE statement (the crash happened because we'd attempt to call `ResetStmtType` on `streamingCommandResult`, which is unimplemented). This commit makes the method a no-op since we don't care about the stmt type. This affects the UI SQL shell as well as the `crdb_internal.execute_internally` builtin.

It additionally audits all methods of `streamingCommandResult` that panic. `AddBatch` is ok to leave unimplemented given that `SupportsBatch` always returns `false`. Another panic is removed in `BufferedResultsLen` (used under read committed), where a no-op is acceptable. This commit also moves the code around a bit for a better layout.

Epic: None

Release note (sql change): EXPLAIN ANALYZE statements are now supported when executed via the UI SQL shell.

125471: rowcontainer: prevent using key encoding for tuples r=yuzefovich a=yuzefovich

This commit fixes a bug that could produce incorrect results or decoding errors when we spilled tuples to disk in the row-by-row engine and used the key encoding for them. The problem is that the tuple key encoding is currently faulty; for example, it can make `encoding.PeekLength` behave incorrectly, which later corrupts the datum for the next column. We now avoid this problematic behavior whenever we're spilling to disk and need to sort by a tuple type. As a concrete example from the reproduction test (sketched in code after this list):
- we have the tuple `(NULL, NULL, NULL)` followed by an interval datum
- when encoding it we get `[0, 0, 0, 22, ...]`, since each NULL within the tuple gets its own NULL marker
- when decoding the spilled key-value pair, we see that the first encoded value has the NULL marker, so we only skip 1 byte
- this leaves `[0, 0, 22, ...]` to be decoded as an interval, resulting in datum corruption.
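
A toy, self-contained illustration of that byte-level walkthrough (this is not the real `pkg/util/encoding` API; `0x00` stands in for the NULL marker and `0x16` for the interval marker):

```go
package main

import "fmt"

func main() {
	// Key encoding of the tuple (NULL, NULL, NULL) followed by an interval:
	// each NULL inside the tuple emits its own marker, and nothing records
	// how many elements the tuple contains.
	key := []byte{0x00, 0x00, 0x00, 0x16 /* interval payload would follow */}

	// A PeekLength-style decoder sees the leading NULL marker, assumes the
	// first column is a single NULL datum, and skips exactly one byte...
	rest := key[1:]

	// ...leaving these bytes to be decoded as the interval column, which
	// corrupts the datum.
	fmt.Printf("bytes misread as the interval column: %#v\n", rest)
}
```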

The faulty key encoding has existed since the dawn of time (added in df53f5b). The good news is that we cannot store tuples in tables, so fixing the encoding would only affect the in-memory representation. The bad news is the tuple comparison rules, which AFAICT prohibit us from adding the number of tuple elements or the total encoding length to the encoding prefix.

Fixes: #125367.

The bug only affects the row-by-row engine when spilling to disk, which is a non-default configuration, so the release note is omitted. (The vectorized engine is unaffected since it doesn't use the row containers and uses value encoding for non-native datums when spilling to disk.)

Release note: None

Co-authored-by: Anne Zhu <anne.zhu@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
3 people committed Jun 12, 2024
5 parents b2ba0bf + 8192a06 + 68a3285 + 0b643f4 + cf339f7 commit f5e65c5
Showing 34 changed files with 310 additions and 142 deletions.
11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -315,12 +315,15 @@ func startDistChangefeed(

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// CDC DistSQL flows are long-living, so we want to mark the flow memory
// monitor accordingly.
planCtx.MarkFlowMonitorAsLongLiving = true
// Make sure to use special changefeed monitor going forward as the
// parent monitor for the DistSQL infrastructure. This is needed to
// prevent a race between the connection that started the changefeed
// closing (which closes the current planner's monitor) and changefeed
// DistSQL flow being cleaned up.
planCtx.OverridePlannerMon = execCfg.DistSQLSrv.ChangefeedMonitor
// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
// p is the physical plan, recv is the distsqlreceiver
// p is the physical plan, recv is the DistSQLReceiver.
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)
return resultRows.Err()
}
11 changes: 1 addition & 10 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -193,9 +193,6 @@ func newChangeAggregatorProcessor(
}()

memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changeagg-mem")
// CDC DistSQL flows are long-living, so we mark the memory monitors
// accordingly.
memMonitor.MarkLongLiving()
ca := &changeAggregator{
flowCtx: flowCtx,
spec: spec,
@@ -409,10 +406,7 @@ func (ca *changeAggregator) startKVFeed(
opts changefeedbase.StatementOptions,
) (kvevent.Reader, chan struct{}, chan error, error) {
cfg := ca.flowCtx.Cfg
// CDC DistSQL flows are long-living, so we mark the memory monitors
// accordingly.
const longLiving = true
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", memLimit, parentMemMon, longLiving)
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", memLimit, parentMemMon, false /* longLiving */)
kvFeedMemMon.StartNoReserved(ctx, parentMemMon)
buf := kvevent.NewThrottlingBuffer(
kvevent.NewMemBuffer(kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics),
@@ -1141,9 +1135,6 @@ func newChangeFrontierProcessor(
post *execinfrapb.PostProcessSpec,
) (execinfra.Processor, error) {
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changefntr-mem")
// CDC DistSQL flows are long-living, so we mark the memory monitors
// accordingly.
memMonitor.MarkLongLiving()
sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
if err != nil {
return nil, err
36 changes: 36 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
@@ -1237,6 +1237,42 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
return m
}

var (
metaMemMaxBytes = metric.Metadata{
Name: "sql.mem.changefeed.max",
Help: "Maximum memory usage across all changefeeds",
Measurement: "Memory",
Unit: metric.Unit_BYTES,
}
metaMemCurBytes = metric.Metadata{
Name: "sql.mem.changefeed.current",
Help: "Current memory usage across all changefeeds",
Measurement: "Memory",
Unit: metric.Unit_BYTES,
}
)

// See pkg/sql/mem_metrics.go
// log10int64times1000 = log10(math.MaxInt64) * 1000, rounded up somewhat
const log10int64times1000 = 19 * 1000

// MakeMemoryMetrics instantiates the metrics holder for memory monitors of
// changefeeds.
func MakeMemoryMetrics(
histogramWindow time.Duration,
) (curCount *metric.Gauge, maxHist metric.IHistogram) {
curCount = metric.NewGauge(metaMemCurBytes)
maxHist = metric.NewHistogram(metric.HistogramOptions{
Metadata: metaMemMaxBytes,
Duration: histogramWindow,
MaxVal: log10int64times1000,
SigFigs: 3,
BucketConfig: metric.MemoryUsage64MBBuckets,
})
return curCount, maxHist
}

func init() {
jobs.MakeChangefeedMetricsHook = MakeMetrics
jobs.MakeChangefeedMemoryMetricsHook = MakeMemoryMetrics
}
9 changes: 7 additions & 2 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -263,13 +263,16 @@ ORDER BY created DESC LIMIT 1`, c.Args.DestTenantName)
// stop eventually. If the provided cutover time is the zero value, cutover to
// the latest replicated time.
func (c *TenantStreamingClusters) Cutover(
producerJobID, ingestionJobID int, cutoverTime time.Time, async bool,
) {
ctx context.Context, producerJobID, ingestionJobID int, cutoverTime time.Time, async bool,
) string {
// Cut over the ingestion job and the job will stop eventually.
var cutoverStr string
if cutoverTime.IsZero() {
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
c.Args.DestTenantName).Scan(&cutoverStr)
cutoverOutput := DecimalTimeToHLC(c.T, cutoverStr)
protectedTimestamp := replicationutils.TestingGetPTSFromReplicationJob(c.T, ctx, c.SrcSysSQL, c.SrcSysServer, producerJobID)
require.LessOrEqual(c.T, protectedTimestamp.GoTime(), cutoverOutput.GoTime())
} else {
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
@@ -281,6 +284,8 @@ func (c *TenantStreamingClusters) Cutover(
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitForPostCutoverRetentionJob()
}

return cutoverStr
}

// StartStreamReplication producer job ID and ingestion job ID.
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/replicationutils/BUILD.bazel
@@ -9,12 +9,16 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/isql",
"//pkg/storage",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
27 changes: 27 additions & 0 deletions pkg/ccl/streamingccl/replicationutils/utils.go
@@ -17,12 +17,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -277,3 +281,26 @@ func TestingGetStreamIngestionStatsFromReplicationJob(
require.NoError(t, err)
return stats
}

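// TestingGetPTSFromReplicationJob returns the timestamp of the protected
// timestamp record associated with the given replication producer job.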
func TestingGetPTSFromReplicationJob(
t *testing.T,
ctx context.Context,
sqlRunner *sqlutils.SQLRunner,
srv serverutils.ApplicationLayerInterface,
producerJobID int,
) hlc.Timestamp {
payload := jobutils.GetJobPayload(t, sqlRunner, jobspb.JobID(producerJobID))
details := payload.GetStreamReplication()
ptsRecordID := details.ProtectedTimestampRecordID
ptsProvider := srv.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider

var ptsRecord *ptpb.Record
err := srv.InternalDB().(descs.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var err error
ptsRecord, err = ptsProvider.WithTxn(txn).GetRecord(ctx, ptsRecordID)
return err
})
require.NoError(t, err)

return ptsRecord.Timestamp
}
@@ -83,9 +83,8 @@ func TestAlterTenantCompleteToLatest(t *testing.T) {
targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID))

var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
args.DestTenantName).Scan(&cutoverStr)
var emptyCutoverTime time.Time
cutoverStr := c.Cutover(ctx, producerJobID, ingestionJobID, emptyCutoverTime, false)

cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime())
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -198,7 +198,7 @@ func TestDataDriven(t *testing.T) {
}
timestamp, _, err := tree.ParseDTimestamp(nil, cutoverTime, time.Microsecond)
require.NoError(t, err)
ds.replicationClusters.Cutover(ds.producerJobID, ds.ingestionJobID, timestamp.Time, async)
ds.replicationClusters.Cutover(ctx, ds.producerJobID, ds.ingestionJobID, timestamp.Time, async)
return ""

case "exec-sql":
14 changes: 7 additions & 7 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -246,7 +246,7 @@ func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) {

// Check dest has caught up the previous updates.
srcTime := c.SrcCluster.Server(0).Clock().Now()
c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime())

// Ingestion happened one more time after resuming the ingestion job.
@@ -331,7 +331,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {

cutoverTime := c.DestSysServer.Clock().Now()
c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID))
c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())

// Clients should never be started prior to a checkpointed timestamp
@@ -566,7 +566,7 @@ INSERT INTO d.t_for_import (i) VALUES (1);
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))

cutoverTime := c.SrcSysServer.Clock().Now()
c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
}

@@ -720,7 +720,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))

cutoverTime := c.DestSysServer.Clock().Now()
c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
require.GreaterOrEqual(t, counts["physical_replication.cutover"], int32(1))
c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
@@ -1055,7 +1055,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
c.DestSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", replicationJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
var emptyCutoverTime time.Time
c.Cutover(producerJobID, replicationJobID, emptyCutoverTime, false)
c.Cutover(ctx, producerJobID, replicationJobID, emptyCutoverTime, false)
c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION EXPIRATION WINDOW ='100ms'`, c.Args.SrcTenantName))
}

@@ -1404,7 +1404,7 @@ func TestStreamingMismatchedMRDatabase(t *testing.T) {

c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
srcTime := c.SrcCluster.Server(0).Clock().Now()
c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)

defer c.StartDestTenant(ctx, nil, 0)()

@@ -1479,7 +1479,7 @@ func TestStreamingZoneConfigsMismatchedRegions(t *testing.T) {

c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
srcTime := c.SrcCluster.Server(0).Clock().Now()
c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)

defer c.StartDestTenant(ctx, nil, 0)()

@@ -70,6 +70,9 @@ type streamIngestionFrontier struct {
// persistedReplicatedTime stores the highwater mark of
// progress that is persisted in the job record.
persistedReplicatedTime hlc.Timestamp
// heartbeatTime is the earliest timestamp for which the
// source cluster can begin garbage collection.
heartbeatTime hlc.Timestamp

lastPartitionUpdate time.Time
lastFrontierDump time.Time
@@ -204,10 +207,10 @@ func (sf *streamIngestionFrontier) Next() (
case <-sf.Ctx().Done():
sf.MoveToDrainingAndLogError(sf.Ctx().Err())
return nil, sf.DrainHelper()
// Send the latest persisted replicated time in the heartbeat to the source cluster
// Send the latest heartbeat time in the heartbeat to the source cluster
// as even with retries we will never request an earlier row than it, and
// the source cluster is free to clean up earlier data.
case sf.heartbeatSender.FrontierUpdates <- sf.persistedReplicatedTime:
case sf.heartbeatSender.FrontierUpdates <- sf.heartbeatTime:
// If heartbeatSender has error, it means remote has error, we want to
// stop the processor.
case <-sf.heartbeatSender.StoppedChan:
@@ -315,6 +318,7 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
})

replicatedTime := f.Frontier()
var cutoverTime hlc.Timestamp

sf.lastPartitionUpdate = timeutil.Now()
log.VInfof(ctx, 2, "persisting replicated time of %s", replicatedTime)
@@ -329,6 +333,8 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest
streamProgress.Checkpoint.ResolvedSpans = frontierResolvedSpans

cutoverTime = streamProgress.CutoverTime

// Keep the recorded replicatedTime empty until some advancement has been made
if sf.replicatedTimeAtStart.Less(replicatedTime) {
streamProgress.ReplicatedTime = replicatedTime
@@ -368,8 +374,8 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {

// If we have a CutoverTime set, keep the protected
// timestamp at or below the cutover time.
if !streamProgress.CutoverTime.IsEmpty() && streamProgress.CutoverTime.Less(newProtectAbove) {
newProtectAbove = streamProgress.CutoverTime
if !cutoverTime.IsEmpty() && cutoverTime.Less(newProtectAbove) {
newProtectAbove = cutoverTime
}

if record.Timestamp.Less(newProtectAbove) {
@@ -382,6 +388,12 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
}
sf.metrics.JobProgressUpdates.Inc(1)
sf.persistedReplicatedTime = f.Frontier()

if cutoverTime.IsEmpty() || sf.persistedReplicatedTime.Less(cutoverTime) {
sf.heartbeatTime = sf.persistedReplicatedTime
} else {
sf.heartbeatTime = cutoverTime
}
sf.metrics.ReplicatedTimeSeconds.Update(sf.persistedReplicatedTime.GoTime().Unix())
return nil
}
@@ -386,7 +386,7 @@ func TestReplicationJobResumptionStartTime(t *testing.T) {
canContinue <- struct{}{}
srcTime = c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID))
c.Cutover(producerJobID, replicationJobID, srcTime.GoTime(), false)
c.Cutover(ctx, producerJobID, replicationJobID, srcTime.GoTime(), false)
jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(replicationJobID))
}

@@ -592,7 +592,7 @@ func TestCutoverCheckpointing(t *testing.T) {
// Ensure there are no remaining cutover spans before cutover begins.
require.Equal(t, len(getCutoverRemainingSpans()), 0)

c.Cutover(producerJobIDInt, replicationJobIDInt, cutoverTime.GoTime(), true)
c.Cutover(ctx, producerJobIDInt, replicationJobIDInt, cutoverTime.GoTime(), true)
<-progressUpdated

c.DestSysSQL.Exec(t, `PAUSE JOB $1`, &replicationJobID)
4 changes: 4 additions & 0 deletions pkg/jobs/metrics.go
@@ -297,6 +297,10 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
// ccl code.
var MakeChangefeedMetricsHook func(time.Duration) metric.Struct

// MakeChangefeedMemoryMetricsHook allows for registration of changefeed memory
// metrics from ccl code.
var MakeChangefeedMemoryMetricsHook func(time.Duration) (curCount *metric.Gauge, maxHist metric.IHistogram)

// MakeStreamIngestMetricsHook allows for registration of streaming metrics from
// ccl code.
var MakeStreamIngestMetricsHook func(duration time.Duration) metric.Struct
(The diffs for the remaining changed files are not shown.)
