Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to get execution time of each job & run each job with a timeout? #35

Closed
oze4 opened this issue Oct 6, 2020 · 0 comments
Closed

Comments

@oze4
Copy link

oze4 commented Oct 6, 2020

Putting this here for visibility...

Example of how to record "execution duration" + run each job with an "auto cancelling" timeout.

I understand it is not mint code, so please feel free to suggest changes, or supply your own improved version.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

// JobResult carries the outcome of a single job.
type JobResult struct {
	// Data is the payload produced by the job; it may hold any value.
	Data interface{}

	// The remaining fields are unexported so that code outside this
	// package can read them only through the accessor methods below.
	err     error         // error recorded for the job, nil when there is none
	runtime time.Duration // execution duration recorded for the job
	name    string        // name of the job this result belongs to
}

// Name reports the name stored on the result. The field is unexported, so a
// consumer outside this package cannot overwrite it.
func (r *JobResult) Name() string { return r.name }

// Runtime reports the execution duration stored on the result.
func (r *JobResult) Runtime() time.Duration { return r.runtime }

// Error reports the error stored on the result, or nil if none was set.
func (r *JobResult) Error() error { return r.err }

// SetError stores e as the result's error, replacing any previous value.
func (r *JobResult) SetError(e error) { r.err = e }

// Job describes one unit of work: a name plus the function that performs it.
type Job struct {
	// Name identifies the job.
	Name string
	// Task is the function that does the work and returns its JobResult.
	Task func() JobResult
}

// wrapJob adapts job into a plain func() that can be handed to a worker pool.
//
// When the returned function is called it runs job.Task and sends exactly one
// JobResult on resultsChan before returning:
//
//   - if the task returns within timeout, the task's own result, with its
//     name and runtime fields filled in;
//   - otherwise a result whose error is context.DeadlineExceeded.
//
// The timeout is measured from the moment the returned function starts
// executing, not from the call to wrapJob, so time a job spends queued does
// not count against it.
//
// A task that overruns its timeout cannot be interrupted: it keeps running in
// a background goroutine until it returns, and its late result is discarded.
//
// The send on resultsChan blocks if the channel has no free capacity.
func wrapJob(timeout time.Duration, resultsChan chan JobResult, job Job) func() {
	return func() {
		// Create the deadline here, at execution time, so queueing delay
		// before a worker picks the job up is not charged to the job.
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		start := time.Now()

		// The task reports through a private channel. Capacity 1 lets the
		// goroutine always complete its send and exit, even when nobody is
		// listening any more because the deadline already passed.
		finished := make(chan JobResult, 1)
		go func() {
			result := job.Task()

			// Set name & execution time after job completion.
			result.runtime = time.Since(start)
			result.name = job.Name
			finished <- result
		}()

		// Only this function ever sends on resultsChan, and it does so
		// before returning. That guarantees one result per job and no send
		// after the caller has closed the channel.
		select {
		case result := <-finished:
			resultsChan <- result
		case <-ctx.Done():
			// cancel is only deferred, so Done can fire here only because
			// the deadline expired.
			resultsChan <- JobResult{
				err:     context.DeadlineExceeded,
				name:    job.Name,
				runtime: time.Since(start),
			}
		}
	}
}

// jobs is the sample workload submitted by main: one task that sleeps for
// three seconds and one that sleeps for 300 milliseconds.
var jobs = []Job{
	{
		Name: "job1",
		Task: func() JobResult {
			// Deliberately slow: sleeps well past the one-second timeout
			// that main applies, so this job is expected to be reported
			// as an error. Any work could go here instead.
			time.Sleep(3 * time.Second)

			// The name does not need to be set on the result here.
			payload := map[string]string{"Whatever": "You want"}
			return JobResult{Data: payload}
		},
	},
	{
		Name: "job2",
		Task: func() JobResult {
			// Quick enough to finish before the timeout.
			time.Sleep(300 * time.Millisecond)
			return JobResult{Data: "i am a result"}
		},
	},
}

// main submits every entry of jobs to a worker pool through wrapJob, waits
// for the pool to finish, and prints one line per result received.
func main() {
	// A single timeout shared by all jobs (it could also be chosen per job).
	timeout := 1 * time.Second

	// Results channel, buffered with room for one result per job.
	results := make(chan JobResult, len(jobs))

	// Worker pool with a fixed number of workers.
	const workers = 10
	wp := workerpool.New(workers)

	// Hand each job to the pool, wrapped so it reports on the results channel.
	for i := range jobs {
		wp.Submit(wrapJob(timeout, results, jobs[i]))
	}

	// Wait for the submitted jobs, then close the channel so the range
	// loop below terminates once the buffered results are drained.
	wp.StopWait()
	close(results)

	// Report every result; always check the error before using the data.
	for res := range results {
		format, payload := "[%dms] : '%s' : JobSuccess : %s\n", res.Data
		if err := res.Error(); err != nil {
			format = "[%dms] : '%s' : JobError : %s\n"
			payload = err
		}

		elapsedMs := int(res.Runtime() / time.Millisecond)
		fmt.Printf(format, elapsedMs, res.Name(), payload)
	}
}
// Output (timings are approximate):
// [303ms] : 'job2' : JobSuccess : i am a result
// [1001ms] : 'job1' : JobError : context deadline exceeded
@oze4 oze4 closed this as completed Oct 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant