Stop job while running #40

Closed · oze4 opened this issue Oct 20, 2020 · 20 comments
oze4 commented Oct 20, 2020

Is there a way to stop jobs in mid-execution?

I switched workerpoolxt to use context but the job still runs even though the context has been cancelled... I'm not really sure how to fix this, or if it is even possible.

I have created a POC that reproduces this issue (code below) which you can also view/run on The Go Playground

Any help would be greatly appreciated!!

  • Current output
0 from job a
1 from job a
Job 'a' should have stopped here
2 from job a
3 from job a
4 from job a
[{a context deadline exceeded <nil>} {b <nil> from b}]
  • Expected output
0 from job a
1 from job a
Job 'a' should have stopped here
[{a context deadline exceeded <nil>} {b <nil> from b}]

POC Code:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

func main() {
	runner := newRunner(context.Background(), 10)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	runner.do(job{
		Name:    "a",
		Context: ctx,
		Task: func() jobResult {
			for i := 0; i < 10000; i++ {
				time.Sleep(time.Second * 1)
				fmt.Println(i, "from job a")
			}
			return jobResult{Data: "from a"}
		},
	})

	runner.do(job{
		Name: "b",
		Task: func() jobResult {
			time.Sleep(6 * time.Second)
			return jobResult{Data: "from b"}
		},
	})

	results := runner.getjobResults()
	fmt.Println(results)
}

type runner struct {
	*workerpool.WorkerPool
	defaultCtx context.Context
	kill       chan struct{}
	result     chan jobResult
	results    []jobResult
}

// processResults collects job results until the result channel is closed,
// then waits on the kill channel so getjobResults knows collection is done.
func (r *runner) processResults() {
	for res := range r.result {
		r.results = append(r.results, res)
	}
	<-r.kill
}

func newRunner(ctx context.Context, numRunners int) *runner {
	r := &runner{
		WorkerPool: workerpool.New(numRunners),
		kill:       make(chan struct{}),
		result:     make(chan jobResult),
		defaultCtx: ctx,
	}
	go r.processResults()
	return r
}

func (r *runner) do(j job) {
	r.Submit(r.wrap(&j))
}

func (r *runner) getjobResults() []jobResult {
	r.StopWait()
	close(r.result)
	r.kill <- struct{}{}
	return r.results
}

// wrap turns a job into a plain func() for the pool. It runs the job's
// task in its own goroutine and waits for either the result or the
// context; on cancellation the task goroutine keeps running in the
// background, which is the problem this POC demonstrates.
func (r *runner) wrap(job *job) func() {
	return func() {
		if job.Context == nil {
			job.Context = r.defaultCtx
		}
		job.childContext, job.done = context.WithCancel(job.Context)
		job.result = make(chan jobResult)
		go job.Run()
		r.result <- job.getResult()
	}
}

type job struct {
	Name         string
	Task         func() jobResult
	Context      context.Context
	result       chan jobResult
	childContext context.Context
	stopped      chan struct{}
	done         context.CancelFunc
}

// Run executes the task and sends its result. If getResult has already
// given up (context canceled), this send blocks forever and the goroutine
// leaks.
func (j *job) Run() {
	result := j.Task()
	result.name = j.Name
	j.result <- result
	j.done()
}

// getResult waits for the task to finish or for the job's context to be
// canceled, whichever happens first.
func (j *job) getResult() jobResult {
	select {
	case r := <-j.result:
		return r
	case <-j.childContext.Done():
		fmt.Printf("Job '%s' should have stopped here\n", j.Name)
		return jobResult{name: j.Name, Error: j.childContext.Err()}
	}
}

type jobResult struct {
	name  string
	Error error
	Data  interface{}
}
oze4 commented Oct 21, 2020

See here for an excellent response!

oze4 closed this as completed Oct 21, 2020
gammazero (Owner) commented:

Also, here is the fixed version on go playground

oze4 commented Oct 21, 2020

@gammazero I think this is great! Your code definitely makes way more sense (it gets rid of my crappy defaults, and your use of context is cleaner), but the issue is still happening...

From what I make of it, it just boils down to the fact that we can't reach into a goroutine and stop it from running (from the outside). We have to provide code inside the goroutine with a mechanism to exit (in the Job.Task func).

Either that or the foundation of my design is flawed.

Here is the same playground, showing the error I have been facing, but with job "b" using a different context than job "a".

Let's say I wanted job "b" to keep going and not cancel due to a separate, unaffiliated, job.

oze4 commented Oct 21, 2020

Just one of many like it.

It's possible I am way off here, but I take that to mean: once I give workerpool a func() (the Job.Task, essentially), it will run said func() until one of two things happens:

  1. the "wp" or "main" thread I created the workerpool object on ends
  2. the func() eventually runs to completion

So, the context is canceled, but how on earth would workerpool know to kill that goroutine?

It makes sense that something like this is impossible (without providing code inside Job.Task a way to stop itself)... how could we just reach over to X goroutine and kill it from Y goroutine?

I could be wrong, but that's what I've made of this after hours and hours on Google lol

BredSt commented Oct 21, 2020

@gammazero - Could you please advise?

gammazero (Owner) commented:

Once a goroutine is started, there is no way to kill it unless there is something inside that goroutine that is looking for a signal, waiting on a context or channel, etc. In the case of workerpool, and workerpoolxt, the pool is providing the goroutine and calling someone else's function in that goroutine. There is no telling if that function will ever return and allow your goroutine to finish or run another task. The only thing that workerpoolxt can do is to run the task function in another goroutine and wait for the task to finish, for a timer to expire, or for some other signal to give up waiting (such as the context being canceled). When this signal to give up happens, the worker goroutine can return and report an error, but the task goroutine is still running somewhere in the background and may or may not ever finish. All you have done is given up waiting for it to do so.

The danger of letting this happen is that you could build up a number of abandoned goroutines that are still running, which defeats the purpose of workerpool (to limit the amount of concurrency). A way to deal with this is to wait for the task function to finish, and when the signal to give up waiting happens, return an error on a result channel and then go back to waiting (possibly forever) on the task function. This way a pool goroutine is still in use and not available to run more tasks until the task actually completes.

Another idea is to expose the result channel directly to the caller. This way they can wait for a result to show up for as much or as little as they want, and can wait in a select along with many other channels. They can give up waiting whenever they want, and even go back and check the result channel later if they want. The caller can certainly just put their own result channel into their task function, but the added value of workerpoolxt is that when the result appears, they can also get other information, like task execution time, from the result.

oze4 commented Oct 21, 2020

@gammazero thank you! So I was on the right track and I did understand things correctly.

I think forcing all jobs to cancel if one times out is the easiest, most straightforward way to handle it (like your playground showed).

The only qualm I have is, if every job will be using the same context, as the fixed playground showed, wouldn't it make more sense to provide the context within the constructor?

oze4 commented Oct 21, 2020

@gammazero this is what I mean by just providing the context via the constructor.

The end result is identical to passing a ctx along with each Job. On second thought... I am way off here... because passing the context along with each job will at least give each job a chance at running for X time (in the case of using timeouts). Otherwise it's like saying "sucks to be job 200 because if it doesn't start within X time it won't run at all".

Please correct me if I'm wrong, but providing context at the "pool" level versus at the "job" level is not an apples:apples comparison (like I thought). Your idea of providing context with each job, albeit the same context, is actually structurally sound.

It's the difference between:

oze4 commented Oct 21, 2020

Or does it make more sense to do something like you did with pacer?

Like you said, treat jobs as their own thing? So a job just runs on a workerpool (job.run(wp))? I have been attempting to separate Job from WorkerPoolXT over the past week or so, and I'm beginning to think WorkerPoolXT is the limiting factor here.

In getting rid of WorkerPoolXT, there wouldn't be as much abstraction for consumers, but in this case that isn't necessarily a bad thing; less abstraction is almost what's needed.

Thoughts? What would you do?

gammazero commented Oct 22, 2020

@oze4 Yes, something like pacer is what I think would work best. Pacer is concerned with spacing job execution in time. Workerpool is concerned with limiting concurrency. These two things do not depend on each other, but can be used together.

pacedTask := pacer.Pace(func() {
    fmt.Println("Hello World")
})

// I can run a paced task using a workerpool
wp := workerpool.New(5)
wp.Submit(pacedTask)

// Or, run it using a goroutine
go pacedTask()

I think it would be nice to do something similar with a monitored task. Also, it would be nice if the caller could choose to have the jobs send back responses on separate channels, or on the same channel.

wp3 := workerpool.New(3)
resultsChan := make(chan Result)

// Create a small job
smallJob := NewJob("smallfry", resultsChan, func() (interface{}, error) {
    fmt.Println("Hello World")
    return nil, nil
})
// Run as many small jobs as we want concurrently
go smallJob.Run()

// Create a big job that will timeout after an hour, or can be canceled.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
defer cancel()
bigJob := NewJob("bigfish", resultsChan, func() (interface{}, error) {
    mp, err := findUnknownMersennePrime(ctx)
    if err != nil {
        return nil, err
    }
    fmt.Println("found one:", mp)
    return mp, nil
})
// Never run more than 3 big jobs at once
wp3.Submit(bigJob.Run)

And then you can wait for all the jobs the same way, no matter how they were run.

timeout := time.After(time.Minute)
waitLoop:
for i := 0; i < 2; i++ {
    select {
    case result := <-resultsChan:
        if result.Err() != nil {
            return fmt.Errorf("job %q failed: %v", result.Name(), result.Err())
        }
        fmt.Println("job finished:", result.Name())
    case <-timeout:
        fmt.Println("tired of waiting - going to do some other stuff, will check back later")
        break waitLoop // a plain break would only exit the select
    }
}
...

The caller could also have chosen to use different channels for the different jobs.

This is only my opinion on one way to implement a flexible API, and is certainly not the only way to do this. It really depends on what kind of problem you are trying to solve. Here is another approach that emulates JS Promises to give more ideas: https://github.com/chebyrash/promise

oze4 commented Oct 22, 2020

@gammazero this is so clean!...and makes much more sense. Everything about it just feels way better.

You've prob been like "wtf is this guy doing?" haha

I've learned a ton from you/this package. Thank you.

oze4 commented Oct 29, 2020

@gammazero I wanted to get your thoughts on something, whenever you have some free time.

I discovered that the fixed Playground you provided is also broken. This is the same playground (with extra logging), and this is the same playground (no extra logging, but it shows that all you have to do is sleep at the end and it breaks).

I also found this gist from #34 which also has the same issue, as this playground shows.

I have been troubleshooting this, but I can't seem to find a way to stop a func once submitted.

Is it even possible to stop a func from running after it has been submitted?

oze4 commented Oct 30, 2020

Should workerpool accept a func with a ctx param? I have no idea if that is correct, but from what I've been learning, that may be one solution?

gammazero commented Oct 30, 2020 via email

oze4 commented Oct 30, 2020

@gammazero Understood.

I guess what I meant was, can workerpool expose a way for me to stop it? Similar to how promise uses reject/resolve?

Like a SubmitWithContext(func(ctx context.Context)) method? And internally listen for ctx.Done in order to exit early if needed?

oze4 commented Oct 30, 2020

Something like this is what I had in mind (from the perspective of the caller):

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
    defer cancel()

    // Calling done will kill the goroutine
    wp.SubmitDone(func(done func()) {
        go func(c context.Context, done func()) {
            <-c.Done()
            done() // context canceled or timed out: kill the goroutine
        }(ctx, done)

        // do whatever

        done() // finished normally: kill the goroutine
    })

I apologize if this is a stupid question/idea.

Edit: I don't want to continue to do dumb stuff, so if this is dumb, you can tell me it's dumb. Otherwise I'll keep being dumb and it'll be all your fault 😉

Edit 2: I got this idea from playing around with Promise. This Playground (code below) is what I was doing (not sure if this is wrong as well)...

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/chebyrash/promise"
)

type someStruct struct {
	Foo string
}

// getresults waits for the context to be canceled, then rejects the promise.
func getresults(c context.Context, reject func(error)) {
	<-c.Done()
	reject(c.Err())
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	jobA := promise.New(func(resolve func(interface{}), reject func(error)) {
		go getresults(ctx, reject)

		time.Sleep(time.Second)

		if true {
			someval := someStruct{Foo: "bar"}
			resolve(someval)
		} else {
			reject(errors.New("false"))
		}

	}).Then(func(somevalGetsPassedHere interface{}) interface{} {
		// Do something with the value before returning if you want
		//
		// Type assertion
		finalsomeval := somevalGetsPassedHere.(someStruct)
		fmt.Println("Shows how we can access/manipulate fields BEFORE returning : ", finalsomeval.Foo)
		return finalsomeval
	}).Catch(func(err error) error {
		// Do something with the error before returning if you want
		fmt.Println("-There was an error-")
		return err
	})

	// Blocks
	r, e := jobA.Await()
	if e != nil {
		fmt.Println("Error : ", e.Error())
		os.Exit(1)
	}

	result, ok := r.(someStruct)
	if !ok {
		fmt.Println("type assertion failed")
		os.Exit(1)
	}

	fmt.Println(result)
	os.Exit(0)
}

gammazero commented Oct 31, 2020 via email

oze4 commented Oct 31, 2020

I promise I'm not trying to be a pain in the butt lol... I can't seem to wrap my head around one thing, though...

Since the goroutine which my func() will run in is created by workerpool, is it not possible for workerpool to then provide an "escape hatch" for the caller to end that goroutine?

In my head, from the caller's perspective, it would look like:

(workerpool provides a done param much like Promise does with resolve - the only difference is we don't care about capturing a return value, but capturing a response could easily be abstracted)

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
    defer cancel()

    // Calling done will kill the goroutine
    wp := workerpool.New(10)
    wp.SubmitDone(func(done func()) {
        go func(c context.Context, done func()) {
            <-c.Done()
            done() // context canceled or timed out: kill the goroutine
        }(ctx, done)

        // do whatever

        done() // finished normally: kill the goroutine
    })

I plan on continuing to test code/etc until I understand, though. Not sure if this makes it worse, but I've been testing different ideas and I'm not sure if my implementation is wrong or if it's just not possible (hence these questions lol).

Have a good weekend - don't feel obligated to respond! Thanks for everything....again..

gammazero commented Oct 31, 2020 via email

oze4 commented Oct 31, 2020

Disregard my (now deleted) comment. I need to do more testing/learning for myself before bugging you. I apologize. Have a good weekend!
