feat: Add single mode in job #64
Conversation
Hey @farwydi, thanks for contributing. Let me see if I got it right: is this to prevent "overrunning" a job upon itself?
Yes sir, similar to #63
Oh, cool! I believe this could make good use of singleflight, have you considered using it?
It'd be nice if there were a good way to provide feedback when a job is scheduled to run but is still running. In resource-bound cases it could be a good indicator of resource starvation. I'm not sure of the best way to do this inside of a library, though.
@farwydi did you take a look at a possible singleflight solution? I believe it'd be the way to go here
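For context, a minimal, self-contained sketch (not code from this PR) of how golang.org/x/sync/singleflight collapses concurrent calls that share a key:

	package main

	import (
		"fmt"
		"sync"
		"time"

		"golang.org/x/sync/singleflight"
	)

	func main() {
		var g singleflight.Group
		var wg sync.WaitGroup
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				// All three goroutines share the "job" key, so fn executes
				// only once; the other callers block and get the same result.
				v, _, shared := g.Do("job", func() (interface{}, error) {
					time.Sleep(100 * time.Millisecond)
					return "done", nil
				})
				fmt.Println(v, shared)
			}()
		}
		wg.Wait()
	}

Only one invocation of fn runs at a time per key; concurrent callers with the same key wait for it and share its result, which is what makes it a fit for preventing a job from overrunning itself.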
Force-pushed from e7189b1 to 3e17a67
I didn't really understand the meaning of TestSwap 🐰
This mode prevents a double start.
@Streppel @dustin-decker I think this option will help us add more subtle settings with different behavior options in the future. What do you think?
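Based on the API exercised in the test further down in this thread, usage would look roughly like this (a sketch; gocron.NewScheduler is assumed as the exported form of the in-package NewScheduler used in the tests):

	s := gocron.NewScheduler(time.UTC)
	job, _ := s.Every(1).Second().Do(func() {
		// A long-running task: without SingletonMode, the next tick
		// could start a second instance while this one is still running.
		time.Sleep(3 * time.Second)
	})
	job.SingletonMode() // collapse overlapping runs of this job
	s.StartAsync()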
Please alter the fragile allocation; otherwise this looks good to me.
scheduler_test.go
Outdated
-	var jobsBefore []Job
-	for _, p := range jb {
-		jobsBefore = append(jobsBefore, *p)
+	jobsBefore := make([]*Job, 2)
Setting this to 2 instead of len(jb) is fragile.
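A less fragile version would size the slice from the input, e.g. (my sketch, not code from the PR):

	jobsBefore := make([]*Job, 0, len(jb))
	for _, p := range jb {
		jobsBefore = append(jobsBefore, p)
	}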
	j.runCount++
	switch j.runConfig.mode {
	case SingletonMode:
		_, j.err, _ = j.limiter.Do("main", func() (interface{}, error) {
Just asking for the sake of it, but is this singleton mode related to the specific job in question? Because the way it is written here it will affect other singleton-mode jobs as well, as they will all wait on the "main" key. If that's not expected, we could change this key to something job-specific in order to identify each particular job.
Also, I'm sorry for taking so long to respond, I've been super busy nowadays :( will try to check in here more often
Each job has its own limiter (a singleflight.Group):
	func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
		g.mu.Lock()
		if g.m == nil {
			g.m = make(map[string]*call)
		}
		// here
		if c, ok := g.m[key]; ok {
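In other words, the "main" key is scoped to each job's own Group, so singleton-mode jobs never block one another. A minimal sketch of that layout (hypothetical names, not the PR's exact code):

	type job struct {
		limiter singleflight.Group // one Group per job
		run     func()
	}

	func (j *job) start() {
		// "main" only needs to be unique within this job's own Group:
		// concurrent starts of the same job collapse into one call,
		// while other jobs' limiters are entirely separate.
		j.limiter.Do("main", func() (interface{}, error) {
			j.run()
			return nil, nil
		})
	}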
Cool @farwydi! I'd just say to update the README file as well
Commit 0b641e9
@@ -20,6 +22,23 @@ func TestTags(t *testing.T) {
	assert.ElementsMatch(t, j.Tags(), []string{"tags", "tag", "some"})
}

func TestSingletonMode(t *testing.T) {
This test itself is passing, and passes in a single test run, but when run in aggregate, the next test is failing. It seems the scheduler does in fact send out a job that is then waiting on the singleflight to allow it to run, and that goroutine isn't properly stopped/canceled on s.Stop(). Looking into this.
Indeed. Adding a sleep to the end of the test:

	  s.Stop()
	+ time.Sleep(2 * time.Second)

confirms that a job is run after the scheduler is stopped.
So, it appears that because the scheduler schedules another job and sends it to singleflight.Do(), which includes a wait group, the jobs aren't currently told about the stoppage even though the scheduler is stopped.
I'm interested in how people think this is best solved. I found a solution by adding a context with cancel into the job func itself:
	func TestSingletonMode(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		s := NewScheduler(time.UTC)
		var trigger int32
		job, _ := s.Every(1).Second().Do(func() {
			select {
			case <-ctx.Done():
				// Scheduler stopped: the canceled context lets the queued
				// singleflight run bail out instead of executing again.
				return
			default:
				if atomic.LoadInt32(&trigger) == 1 {
					t.Fatal("Restart should not occur")
				}
				atomic.AddInt32(&trigger, 1)
				fmt.Println("I am a long task")
				time.Sleep(3 * time.Second)
			}
		})
		job.SingletonMode()

		s.StartAsync()
		time.Sleep(2 * time.Second)
		cancel()
		s.Stop()
		time.Sleep(2 * time.Second)
	}
If we go this route, we're essentially saying that you, the caller, are responsible for making sure your job doesn't run again if you stop the scheduler.
I don't know that there is a way to build it into the library, because we can't force jobs to take in a context that we can cancel, or even ensure that the job stops when it's cancelled.
This has been merged in #123
What does this do?
This mode prevents a double start: a job will not be started again while a previous run is still in progress.