Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,25 +487,7 @@ func main() {
monitor := reservations.NewMonitor(multiclusterClient)
metrics.Registry.MustRegister(&monitor)
commitmentsConfig := conf.GetConfigOrDie[commitments.Config]()
commitmentsDefaults := commitments.DefaultConfig()
if commitmentsConfig.RequeueIntervalActive == 0 {
commitmentsConfig.RequeueIntervalActive = commitmentsDefaults.RequeueIntervalActive
}
if commitmentsConfig.RequeueIntervalRetry == 0 {
commitmentsConfig.RequeueIntervalRetry = commitmentsDefaults.RequeueIntervalRetry
}
if commitmentsConfig.PipelineDefault == "" {
commitmentsConfig.PipelineDefault = commitmentsDefaults.PipelineDefault
}
if commitmentsConfig.SchedulerURL == "" {
commitmentsConfig.SchedulerURL = commitmentsDefaults.SchedulerURL
}
if commitmentsConfig.ChangeAPIWatchReservationsTimeout == 0 {
commitmentsConfig.ChangeAPIWatchReservationsTimeout = commitmentsDefaults.ChangeAPIWatchReservationsTimeout
}
if commitmentsConfig.ChangeAPIWatchReservationsPollInterval == 0 {
commitmentsConfig.ChangeAPIWatchReservationsPollInterval = commitmentsDefaults.ChangeAPIWatchReservationsPollInterval
}
commitmentsConfig.ApplyDefaults()

if err := (&commitments.CommitmentReservationController{
Client: multiclusterClient,
Expand Down Expand Up @@ -673,10 +655,7 @@ func main() {
must.Succeed(metrics.Registry.Register(syncerMonitor))
syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor)
syncerConfig := conf.GetConfigOrDie[commitments.SyncerConfig]()
syncerDefaults := commitments.DefaultSyncerConfig()
if syncerConfig.SyncInterval == 0 {
syncerConfig.SyncInterval = syncerDefaults.SyncInterval
}
syncerConfig.ApplyDefaults()
if err := (&task.Runner{
Client: multiclusterClient,
Interval: syncerConfig.SyncInterval,
Expand Down
25 changes: 25 additions & 0 deletions internal/scheduling/reservations/commitments/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,31 @@ type Config struct {
EnableReportCapacityAPI bool `json:"committedResourceEnableReportCapacityAPI"`
}

// ApplyDefaults fills in any unset values with defaults.
func (c *Config) ApplyDefaults() {
defaults := DefaultConfig()
if c.RequeueIntervalActive == 0 {
c.RequeueIntervalActive = defaults.RequeueIntervalActive
}
if c.RequeueIntervalRetry == 0 {
c.RequeueIntervalRetry = defaults.RequeueIntervalRetry
}
if c.PipelineDefault == "" {
c.PipelineDefault = defaults.PipelineDefault
}
if c.SchedulerURL == "" {
c.SchedulerURL = defaults.SchedulerURL
}
if c.ChangeAPIWatchReservationsTimeout == 0 {
c.ChangeAPIWatchReservationsTimeout = defaults.ChangeAPIWatchReservationsTimeout
}
if c.ChangeAPIWatchReservationsPollInterval == 0 {
c.ChangeAPIWatchReservationsPollInterval = defaults.ChangeAPIWatchReservationsPollInterval
}
// Note: EnableChangeCommitmentsAPI, EnableReportUsageAPI, EnableReportCapacityAPI
// are booleans where false is a valid value, so we don't apply defaults for them
}

func DefaultConfig() Config {
return Config{
RequeueIntervalActive: 5 * time.Minute,
Expand Down
14 changes: 13 additions & 1 deletion internal/scheduling/reservations/commitments/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,24 @@ func DefaultSyncerConfig() SyncerConfig {
}
}

// ApplyDefaults fills in any unset values with defaults.
func (c *SyncerConfig) ApplyDefaults() {
defaults := DefaultSyncerConfig()
if c.SyncInterval == 0 {
c.SyncInterval = defaults.SyncInterval
}
// Note: KeystoneSecretRef and SSOSecretRef are not defaulted as they require explicit configuration
}

type Syncer struct {
// Client to fetch commitments from Limes
CommitmentsClient
// Kubernetes client for CRD operations
client.Client
// Monitor for metrics
monitor *SyncerMonitor
// SyncInterval is stored for logging purposes (actual interval managed by task.Runner)
syncInterval time.Duration
}

func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor) *Syncer {
Expand All @@ -54,6 +65,7 @@ func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor) *Syncer {
}

func (s *Syncer) Init(ctx context.Context, config SyncerConfig) error {
s.syncInterval = config.SyncInterval
if err := s.CommitmentsClient.Init(ctx, s.Client, config); err != nil {
return err
}
Expand Down Expand Up @@ -191,7 +203,7 @@ func (s *Syncer) SyncReservations(ctx context.Context) error {
ctx = WithNewGlobalRequestID(ctx)
logger := LoggerFromContext(ctx).WithValues("component", "syncer", "runID", runID)

logger.Info("starting commitment sync with sync interval", "interval", DefaultSyncerConfig().SyncInterval)
logger.Info("starting commitment sync")

// Record sync run
if s.monitor != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/task/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,26 @@ func (r *Runner) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result,
return ctrl.Result{}, nil
}

// MinInterval is the minimum allowed interval for task runners to prevent panics from time.NewTicker(0).
const MinInterval = 1 * time.Millisecond

// Start starts the task runner, which will send events at the specified interval.
func (r *Runner) Start(ctx context.Context) error {
log := log.FromContext(ctx)
log.Info("starting task runner", "name", r.Name, "interval", r.Interval)

ticker := time.NewTicker(r.Interval)
// Safety: ensure interval is at least MinInterval to prevent tight loops or panics
interval := r.Interval
if interval < MinInterval {
log.Info("task runner interval too low, using minimum",
"name", r.Name,
"configuredInterval", r.Interval,
"minInterval", MinInterval)
interval = MinInterval
}

log.Info("starting task runner", "name", r.Name, "interval", interval)

ticker := time.NewTicker(interval)
defer ticker.Stop()
defer close(r.eventCh)

Expand Down
27 changes: 27 additions & 0 deletions pkg/task/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,33 @@ func TestRunner_SetupWithManager_EventChannelCreation(t *testing.T) {
}
}

func TestRunner_Start_ZeroInterval_UsesMinimum(t *testing.T) {
runner := &Runner{
Name: "test-task",
Interval: 0, // Zero interval should use MinInterval (1ms)
eventCh: make(chan event.GenericEvent, 10),
}

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

// Run Start in goroutine since it blocks until context is cancelled
go func() {
// This should NOT panic even with 0 interval
if err := runner.Start(ctx); err != nil {
t.Logf("Start returned error: %v (expected on context cancellation)", err)
}
}()

// Wait for initial event with generous timeout to avoid flakiness on slow machines
select {
case <-runner.eventCh:
// Success - got initial event without panic
case <-time.After(1 * time.Second):
t.Error("Expected to receive initial event")
}
}

func TestRunner_EventStructure(t *testing.T) {
runner := &Runner{
Name: "test-task",
Expand Down
Loading