Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions pkg/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ type CronStore struct {
type JobHandler func(job *CronJob) (string, error)

type CronService struct {
storePath string
store *CronStore
onJob JobHandler
opts RuntimeOptions
running map[string]struct{}
mu sync.RWMutex
runner *lifecycle.LoopRunner
storePath string
store *CronStore
onJob JobHandler
opts RuntimeOptions
running map[string]struct{}
lastSaveError string
mu sync.RWMutex
runner *lifecycle.LoopRunner
}

func NewCronService(storePath string, onJob JobHandler) *CronService {
Expand Down Expand Up @@ -244,7 +245,11 @@ func (cs *CronService) checkJobs() {

cs.mu.Lock()
defer cs.mu.Unlock()
_ = cs.saveStore()
if err := cs.saveStore(); err != nil {
cs.lastSaveError = err.Error()
} else {
cs.lastSaveError = ""
}
}

func (cs *CronService) executeJobByID(jobID string) bool {
Expand Down Expand Up @@ -503,7 +508,11 @@ func (cs *CronService) RemoveJob(jobID string) bool {
func (cs *CronService) removeJobUnsafe(jobID string) bool {
removed := cs.removeJobByIDUnsafe(jobID)
if removed {
_ = cs.saveStore()
if err := cs.saveStore(); err != nil {
cs.lastSaveError = err.Error()
} else {
cs.lastSaveError = ""
}
}
return removed
}
Expand Down Expand Up @@ -537,7 +546,7 @@ func (cs *CronService) nextSleepDuration(now time.Time) time.Duration {
return cs.opts.RunLoopMaxSleep
}

sleep := time.Until(time.UnixMilli(*nextWake))
sleep := time.UnixMilli(*nextWake).Sub(now)
if sleep < cs.opts.RunLoopMinSleep {
return cs.opts.RunLoopMinSleep
}
Expand Down Expand Up @@ -630,7 +639,11 @@ func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob {
job.State.NextRunAtMS = nil
}

cs.saveStore()
if err := cs.saveStore(); err != nil {
cs.lastSaveError = err.Error()
} else {
cs.lastSaveError = ""
}
return job
}
}
Expand Down Expand Up @@ -688,6 +701,7 @@ func (cs *CronService) Status() map[string]interface{} {
"totalFailures": totalFailures,
"latestDelayMs": latestDelayMS,
"latestDurationMs": latestDurationMS,
"lastSaveError": cs.lastSaveError,
}
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/cron/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,31 @@ func TestNextSleepDuration(t *testing.T) {
}
}

func TestNextSleepDuration_UsesProvidedNow(t *testing.T) {
cs := &CronService{
opts: RuntimeOptions{
RunLoopMinSleep: 1 * time.Second,
RunLoopMaxSleep: 30 * time.Second,
},
running: map[string]struct{}{},
store: &CronStore{Jobs: []CronJob{}},
}

now := time.UnixMilli(10_000)
next := int64(15_000)
cs.store.Jobs = []CronJob{{
ID: "1",
Enabled: true,
State: CronJobState{
NextRunAtMS: &next,
},
}}

if got := cs.nextSleepDuration(now); got != 5*time.Second {
t.Fatalf("expected 5s sleep from provided now, got %s", got)
}
}

func TestCheckJobs_NoConcurrentRunForSameJob(t *testing.T) {
var running int32
var maxRunning int32
Expand Down