Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ go_library(
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
5 changes: 3 additions & 2 deletions pkg/backup/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)
Expand Down Expand Up @@ -215,7 +216,7 @@ func restoreWithRetry(
// dying), so if we receive a retryable error, re-plan and retry the restore.
retryOpts, progThreshold := getRetryOptionsAndProgressThreshold(execCtx)
logRate := restoreRetryLogRate.Get(&execCtx.ExecCfg().Settings.SV)
logThrottler := util.Every(logRate)
logThrottler := util.EveryMono(logRate)
var (
res roachpb.RowCount
err error
Expand Down Expand Up @@ -255,7 +256,7 @@ func restoreWithRetry(

log.Dev.Warningf(ctx, "encountered retryable error: %+v", err)

if logThrottler.ShouldProcess(timeutil.Now()) {
if logThrottler.ShouldProcess(crtime.NowMono()) {
// We throttle the logging of errors to the jobs messages table to avoid
// flooding the table during the hot loop of a retry.
if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -318,7 +319,7 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
Alloc: &c.a,
Spec: &spec,
TraceKV: c.rfArgs.traceKV,
TraceKVEvery: &util.EveryN{N: c.rfArgs.traceKVLogFrequency},
TraceKVEvery: &util.EveryN[crtime.Mono]{N: c.rfArgs.traceKVLogFrequency},
},
); err != nil {
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ const (
)

// slowLogEveryN rate-limits the logging of slow spans
var slowLogEveryN = log.Every(slowSpanMaxFrequency)
var slowLogEveryN = util.Every(slowSpanMaxFrequency)

// jobState encapsulates changefeed job state.
type jobState struct {
Expand Down Expand Up @@ -2432,7 +2432,7 @@ type saveRateConfig struct {
// duration it takes to save progress.
type saveRateLimiter struct {
config saveRateConfig
warnEveryN util.EveryN
warnEveryN util.EveryN[time.Time]

clock timeutil.TimeSource

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/roachtestutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func SetDefaultAdminUIPort(c cluster.Cluster, opts *install.StartOpts) {
// recently a given log message has been emitted so that it can determine
// whether it's worth logging again.
type EveryN struct {
util.EveryN
util.EveryN[time.Time]
}

// Every is a convenience constructor for an EveryN object that allows a log
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
Expand Down Expand Up @@ -147,7 +148,7 @@ type raftLogQueue struct {
*baseQueue
db *kv.DB

logSnapshots util.EveryN
logSnapshots util.EveryN[crtime.Mono]
}

var _ queueImpl = &raftLogQueue{}
Expand All @@ -162,7 +163,7 @@ var _ queueImpl = &raftLogQueue{}
func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue {
rlq := &raftLogQueue{
db: db,
logSnapshots: util.Every(10 * time.Second),
logSnapshots: util.EveryMono(10 * time.Second),
}
rlq.baseQueue = newBaseQueue(
"raftlog", rlq, store,
Expand Down Expand Up @@ -689,7 +690,7 @@ func (rlq *raftLogQueue) process(
return false, nil
}

if n := decision.NumNewRaftSnapshots(); log.V(1) || n > 0 && rlq.logSnapshots.ShouldProcess(timeutil.Now()) {
if n := decision.NumNewRaftSnapshots(); log.V(1) || n > 0 && rlq.logSnapshots.ShouldProcess(crtime.NowMono()) {
log.KvExec.Infof(ctx, "%v", redact.Safe(decision.String()))
log.KvDistribution.Infof(ctx, "%v", redact.Safe(decision.String()))
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/kr/pretty"
Expand Down Expand Up @@ -1056,7 +1057,7 @@ type Replica struct {
// information and without explicit throttling some replicas will offer once
// per applied Raft command, which is silly and also clogs up the queues'
// semaphores.
splitQueueThrottle, mergeQueueThrottle util.EveryN
splitQueueThrottle, mergeQueueThrottle util.EveryN[crtime.Mono]

// loadBasedSplitter keeps information about load-based splitting.
loadBasedSplitter split.Decider
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
Expand Down Expand Up @@ -688,7 +689,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// Record the number of keys written to the replica.
b.r.loadStats.RecordWriteKeys(float64(b.ab.numMutations))

now := timeutil.Now()
now := crtime.NowMono()
if needsSplitBySize && r.splitQueueThrottle.ShouldProcess(now) {
r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ func newUninitializedReplicaWithoutRaftGroup(store *Store, id roachpb.FullReplic
store.TestingKnobs().DisableSyncLogWriteToss,
}

r.splitQueueThrottle = util.Every(splitQueueThrottleDuration)
r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration)
r.splitQueueThrottle = util.EveryMono(splitQueueThrottleDuration)
r.mergeQueueThrottle = util.EveryMono(mergeQueueThrottleDuration)

onTrip := func() {
telemetry.Inc(telemetryTripAsync)
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigjob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
],
)
9 changes: 5 additions & 4 deletions pkg/spanconfig/spanconfigjob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -101,14 +102,14 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro
settingValues := &execCtx.ExecCfg().Settings.SV
persistCheckpointsMu := struct {
syncutil.Mutex
util.EveryN
util.EveryN[crtime.Mono]
}{}
persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues))
persistCheckpointsMu.EveryN = util.EveryMono(ReconciliationJobCheckpointInterval.Get(settingValues))

ReconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) {
persistCheckpointsMu.Lock()
defer persistCheckpointsMu.Unlock()
persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues))
persistCheckpointsMu.EveryN = util.EveryMono(ReconciliationJobCheckpointInterval.Get(settingValues))
})

checkpointingDisabled := false
Expand Down Expand Up @@ -152,7 +153,7 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro
shouldPersistCheckpoint := func() bool {
persistCheckpointsMu.Lock()
defer persistCheckpointsMu.Unlock()
return persistCheckpointsMu.ShouldProcess(timeutil.Now())
return persistCheckpointsMu.ShouldProcess(crtime.NowMono())
}()

if !shouldPersistCheckpoint {
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -151,7 +151,7 @@ func (s *SQLWatcher) watch(
}
defer ptsRF.Close()

checkpointNoops := util.Every(s.checkpointNoopsEvery)
checkpointNoops := util.EveryMono(s.checkpointNoopsEvery)
for {
select {
case <-ctx.Done():
Expand All @@ -166,7 +166,7 @@ func (s *SQLWatcher) watch(
return err
}
if len(sqlUpdates) == 0 &&
(!checkpointNoops.ShouldProcess(timeutil.Now()) || s.knobs.SQLWatcherSkipNoopCheckpoints) {
(!checkpointNoops.ShouldProcess(crtime.NowMono()) || s.knobs.SQLWatcherSkipNoopCheckpoints) {
continue
}
if err := handler(ctx, sqlUpdates, combinedFrontierTS); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/row/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/unique",
"//pkg/util/uuid",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
Expand Down Expand Up @@ -339,7 +340,7 @@ type FetcherInitArgs struct {
TraceKV bool
// TraceKVEvery controls how often KVs are sampled for logging with traceKV
// enabled.
TraceKVEvery *util.EveryN
TraceKVEvery *util.EveryN[crtime.Mono]
ForceProductionKVBatchSize bool
// SpansCanOverlap indicates whether the spans in a given batch can overlap
// with one another. If it is true, spans that correspond to the same row must
Expand Down Expand Up @@ -1196,7 +1197,7 @@ func (rf *Fetcher) NextRow(ctx context.Context) (row rowenc.EncDatumRow, spanID
// log.EveryN will always print under verbosity level 2.
// The caller may choose to set it to avoid logging
// too many rows. If unset, we log every KV.
if rf.args.TraceKV && (rf.args.TraceKVEvery == nil || rf.args.TraceKVEvery.ShouldProcess(timeutil.Now())) {
if rf.args.TraceKV && (rf.args.TraceKVEvery == nil || rf.args.TraceKVEvery.ShouldProcess(crtime.NowMono())) {
log.VEventf(ctx, TraceKVVerbosity, "fetched: %s -> %s", prettyKey, prettyVal)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ go_library(
"//pkg/util/tracing/tracingpb",
"//pkg/util/vector",
"@com_github_axiomhq_hyperloglog//:hyperloglog",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/rowexec/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -245,14 +245,14 @@ func (s *sampleAggregator) mainLoop(
}

var rowsProcessed uint64
progressUpdates := util.Every(SampleAggregatorProgressInterval)
progressUpdates := util.EveryMono(SampleAggregatorProgressInterval)
var da tree.DatumAlloc
for {
row, meta := s.input.Next()
if meta != nil {
if meta.SamplerProgress != nil {
rowsProcessed += meta.SamplerProgress.RowsProcessed
if progressUpdates.ShouldProcess(timeutil.Now()) {
if progressUpdates.ShouldProcess(crtime.NowMono()) {
// Periodically report fraction progressed and check that the job has
// not been paused or canceled.
var fractionCompleted float32
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/unsafesql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/log/severity",
"//pkg/util/timeutil",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_redact//:redact",
],
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/unsafesql/unsafesql.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -58,14 +58,14 @@ func CheckInternalsAccess(
// If an override is set, allow access to this virtual table.
if sd.AllowUnsafeInternals {
// Log this access to the SENSITIVE_ACCESS channel since the override condition bypassed normal access controls.
if accessedLogLimiter.ShouldProcess(timeutil.Now()) {
if accessedLogLimiter.ShouldProcess(crtime.NowMono()) {
log.StructuredEvent(ctx, severity.WARNING, &eventpb.UnsafeInternalsAccessed{Query: q})
}
return nil
}

// Log this access to the SENSITIVE_ACCESS channel to show where failing internals accesses are happening.
if deniedLogLimiter.ShouldProcess(timeutil.Now()) {
if deniedLogLimiter.ShouldProcess(crtime.NowMono()) {
log.StructuredEvent(ctx, severity.WARNING, &eventpb.UnsafeInternalsDenied{Query: q})
}
return sqlerrors.ErrUnsafeTableAccess
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
deps = [
"//pkg/util/netutil/addr",
"//pkg/util/syncutil",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
] + select({
Expand Down Expand Up @@ -56,6 +57,7 @@ go_test(
deps = [
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
1 change: 1 addition & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
Loading
Loading