Get response from all worker pulls #34

Closed
JennyMet opened this issue Oct 4, 2020 · 3 comments

@JennyMet

JennyMet commented Oct 4, 2020

Hi,

I want to run multiple requests and get a single response (a struct or something similar) once they are all done.
For example, if I have 10 requests and the longest takes 10 seconds, then after 10 seconds I want to get all the responses, each with an ID or something similar, so I know which requests succeeded and which failed. Is that possible?

thanks

@oze4

oze4 commented Oct 5, 2020

I am not sure if this is the best approach, but wrapping each "job" in a closure and using a custom response type solves this:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

// Response holds job response
type Response struct {
	ID    int
	Error error
	Data  interface{}
}

// wrapJob sends the result of our job to our results channel
func wrapJob(rc chan Response, f func() Response) func() {
	return func() { rc <- f() }
}

var jobs = []func() Response{
	func() Response {
		// do whatever
		// ...
		time.Sleep(time.Second * 3)
		return Response{ID: 1, Data: map[string]string{"hello": "mars"}}
	},
	func() Response {
		// do whatever
		// ...
		time.Sleep(time.Second)
		resp := Response{ID: 2}
		err := errors.New("some error") // simulate error
		if err != nil {
			resp.Error = err
			return resp
		}
		resp.Data = "some result that you may or may not see. depends if there is an error or not."
		return resp
	},
}

func main() {
	wp := workerpool.New(10)
	results := make(chan Response, len(jobs))

	for _, job := range jobs {
		wp.Submit(wrapJob(results, job))
	}

	wp.StopWait()

	close(results)

	for result := range results {
		if result.Error != nil {
			fmt.Printf("Found job with an error : ID '%d' : Error '%s'\n", result.ID, result.Error.Error())
		} else {
			fmt.Printf("Success : ID '%d' : Data '%+v'\n", result.ID, result.Data)
		}
	}
}
```

Which returns:

```
// Found job with an error : ID '2' : Error 'some error'
// Success : ID '1' : Data 'map[hello:mars]'
```

@gammazero
Owner

@JennyMet Yes, that is certainly doable. A simple way to do what you want is to submit a function to workerpool that calls your job function with an id and a channel to put results on.

```go
wp.Submit(func() {
    myFunc(id, resultChan)
})
```

After all the jobs are submitted, you can use select to wait on the result channel or for a timeout. Keep collecting results from the channel until you have all the results you expect, or until the timeout fires.

```go
timeout := time.After(10 * time.Second)
for i := 0; i < jobCount; i++ {
    select {
    case r := <-resultChan:
        handleResult(r)
    case <-timeout:
        return fmt.Errorf("timed out waiting for %d results", jobCount-i)
    }
}
```

Here is some code that expands on the idea presented by @oze4. It shows a way to separate the generic mechanics of running and waiting for jobs into a package of their own, apart from your code that defines what a job does:
https://gist.github.com/gammazero/bb543631f25a73d1a3f6c8e1a6970eb8

@JennyMet
Author

JennyMet commented Oct 7, 2020

@gammazero @oze4 - Thank you!
