Skip to content

Commit

Permalink
Ability to register for events associated with jobs starting immediat…
Browse files Browse the repository at this point in the history
…ely (#632)

* Ability to register for events associated with jobs starting immediately

This commit add some functions to support register events for job which
is on creating and run immediately.

Because of there is RegisterEventListeners in Scheduler receiver
interface (which will register event for all exsiting jobs), I don't
change the function.

* add TestScheduler Register Event Tests

* lint and fix typo

---------

Co-authored-by: Your Name <you@example.com>
  • Loading branch information
drwpls and Your Name committed Dec 12, 2023
1 parent f5461a9 commit 30921b7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
40 changes: 40 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,46 @@ func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) {
}
}

// BeforeJobRuns registers an event listener that is called before a job runs.
func (s *Scheduler) BeforeJobRuns(eventListenerFunc func(jobName string)) *Scheduler {
job := s.getCurrentJob()
job.mu.Lock()
defer job.mu.Unlock()
job.eventListeners.beforeJobRuns = eventListenerFunc

return s
}

// AfterJobRuns registers an event listener that is called after a job runs.
func (s *Scheduler) AfterJobRuns(eventListenerFunc func(jobName string)) *Scheduler {
job := s.getCurrentJob()
job.mu.Lock()
defer job.mu.Unlock()
job.eventListeners.afterJobRuns = eventListenerFunc

return s
}

// WhenJobStarts registers an event listener that is called when a job starts.
func (s *Scheduler) WhenJobReturnsError(eventListenerFunc func(jobName string, err error)) *Scheduler {
job := s.getCurrentJob()
job.mu.Lock()
defer job.mu.Unlock()
job.eventListeners.onError = eventListenerFunc

return s
}

// WhenJobStarts registers an event listener that is called when a job starts.
func (s *Scheduler) WhenJobReturnsNoError(eventListenerFunc func(jobName string)) *Scheduler {
job := s.getCurrentJob()
job.mu.Lock()
defer job.mu.Unlock()
job.eventListeners.noError = eventListenerFunc

return s
}

func (s *Scheduler) PauseJobExecution(shouldPause bool) {
s.executor.skipExecution.Store(shouldPause)
}
57 changes: 57 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2659,6 +2659,63 @@ func TestScheduler_ChainOrder(t *testing.T) {
require.Len(t, s.jobs, 2)
}

func TestScheduler_Register_Event(t *testing.T) {
userDefinedError := errors.New("user defined error")
testCases := []struct {
description string
jobFunc func() error
expected []uint8
expectedError error
}{
{"event order: no error", func() error { return nil }, []uint8{1, 2, 3, 4}, nil},
{"event order: on error", func() error { return userDefinedError }, []uint8{1, 2, 3, 4}, userDefinedError},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
s := NewScheduler(time.UTC)
order := make(chan uint8, 10)
done := make(chan struct{})
var expectedErr error
s = s.BeforeJobRuns(func(jobName string) {
order <- 1
})
s = s.WhenJobReturnsError(func(jobName string, err error) {
order <- 3
expectedErr = err
})
s = s.WhenJobReturnsNoError(func(jobName string) {
order <- 3
})
s = s.AfterJobRuns(func(jobName string) {
order <- 4
done <- struct{}{}
})

_, err := s.Day().Every(1).StartImmediately().Do(func() error {
order <- 2
return tc.jobFunc()
})

require.NoError(t, err)
s.StartAsync()
select {
case <-done:
assert.Equal(t, tc.expectedError, expectedErr)
assert.Equal(t, len(tc.expected), len(order))
if len(tc.expected) == len(order) {
for i := 0; i < len(tc.expected); i++ {
assert.Equal(t, tc.expected[i], <-order)
}
}
s.Clear()
case <-time.After(1 * time.Second):
t.Fatal("timeout")
}
})
}
}

var _ Locker = (*locker)(nil)

type locker struct {
Expand Down

0 comments on commit 30921b7

Please sign in to comment.