Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't test processor function which enqueues another job #51

Open
bploetz opened this issue Jan 4, 2021 · 4 comments
Open

Can't test processor function which enqueues another job #51

bploetz opened this issue Jan 4, 2021 · 4 comments

Comments

@bploetz
Copy link

bploetz commented Jan 4, 2021

I have a job processing function that needs to enqueue another job when it's done with it's work. As noted in the FAQ I'm using the helper.With() function to get a handle to a faktory.Client from the pool to push the new job. For example:

func (p *MyProcessor) Process(ctx context.Context, args ...interface{}) error {
    helper := worker.HelperFor(ctx)

    // .... do my work ....

    // enqueue a job for post-processing
    ppjerr := helper.With(func(cl *faktory.Client) error {
        jobArgument := make(map[string]interface{})
        jobArgument["id"] = theID
        jobArgs := []interface{}{jobArgument}
        postProcessingJob := faktory.NewJob("post-processing", jobArgs...)
        return cl.Push(postProcessingJob)
    })

    if ppjerr != nil {
        fmt.Printf("post processing job error: %+v\n", ppjerr)
        return ppjerr
    }
}

In the unit tests for MyProcessor, I'm using the worker.NewTestExecutor(pool) function
to send test jobs to MyProcessor. For example (test cruft removed):

// test setup
pool, _ := faktory.NewPool(2)
var executor worker.PerformExecutor =  worker.NewTestExecutor(pool)
var jobArgument map[string]interface{} = make(map[string]interface{})
jobArgument["id"] = "1234567890"
jobArgs := []interface{}{jobArgument}
var job *faktory.Job = faktory.NewJob("MyProcessor", jobArgs...)

// run processor
err := executor.Execute(job, myProcessor.Process)

// test assertions....

The enqueueing of the second job during the test is returning an error because it appears as though it's attempting to connect to Faktory to push the job, despite the fact I'm using the test executor.

post processing job job error: dial tcp [::1]:7419: connect: connection refused

How exactly do I unit test processor functions which enqueue another job without having it actually try to connect to Faktory? The structure of the code makes it awkward (impossible?) to stub/mock the faktory.Client needed to push the job.

@mperham
Copy link
Contributor

mperham commented Jan 4, 2021

Do you know of any good examples of Go network libraries/APIs which have built in support for stubbing/mocking/handling this type of usecase? It would really help inform me about what's possible here.

So I'm trying to think through the various possibilities:

  1. Faktory is a network service. Stubbing out calls to it is kind of like having some app code which uses the Redis API and stubbing that out. It doesn't make a lot of sense to "unit test" code which is inherently network-reliant.
  2. Providing a fake pool of faktory.Clients which can collect operations (rather than send them to the network) and assert what happened seems like it could be quite powerful, e.g.:
counts, err := executor.ExecuteWithCounts(job, myProcessor.Process)
assert.EqualValues(t, 1, len(counts.PushedJobs()))

@bploetz
Copy link
Author

bploetz commented Jan 4, 2021

There may be more elegant ways to do it, but your 2) possibility is exactly what we do to stub out interacting with Kafka in our unit tests. We use github.com/segmentio/kafka-go to produce events to Kafka.

First we define a generic message writing interface like so:

package events

import (
	"context"
	"os"

	kafka "github.com/segmentio/kafka-go"
)

// MessageWriter interface to allow us to mock the kafka integration in unit tests
type MessageWriter interface {
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

Our Faktory job processing structs, which in addition to processing jobs from Faktory, also produce events to Kafka. They have fields which hold these events.MessageWriter like so:

package processors

type ExampleProcessor struct {
	ExampleMessageProducer events.MessageWriter
}

func (p *ExampleProcessor) Process(ctx context.Context, args ...interface{}) error {
	// ....do some work to process the job from Faktory....
	
	// produce an event to Kafka
	kerr := p.ExampleMessageProducer.WriteMessages(context.Background(), kafka.Message{Key: []byte(someID), Value: []byte(somePayload)})
	if kerr != nil {
		return fmt.Errorf("error producing message to kafka: %+v", kerr)
	}
	
	return nil
}

In our unit tests, we declare up a mock producer which just collects the messages sent to it (and has a flag to simulate errors).

type MockProducer struct {
	Messages    []kafka.Message
	returnError bool
}

func (p *MockProducer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
	p.Messages = append(p.Messages, msgs...)
	if p.returnError {
		return fmt.Errorf("an error occurred")
	} else {
		return nil
	}
}

And then set these mock producers on our Faktory job processing structs:

var mockExampleMessageProducer MockProducer = MockProducer{}
var ep processors.ExampleProcessor = processors.ExampleProcessor{ExampleMessageProducer: mockExampleMessageProducer}
pool, _ := faktory.NewPool(2)
var executor worker.PerformExecutor = worker.NewTestExecutor(pool)

In each test example we can assert that a message was produced to Kafka like so:

// test error handling
mockExampleMessageProducer.returnError = true
err := executor.Execute(job, ep.Process)
Expect(err).Should(HaveOccurred())
Expect(err).To(MatchError("error producing message to kafka: an error occurred"))

// test happy path
err := executor.Execute(job, ep.Process)
Expect(len(mockExampleMessageProducer.Messages)).To(Equal(1))
Expect(string(mockExampleMessageProducer.Messages[0].Value)).To(MatchRegexp(`...whatever....`))
Expect(string(mockExampleMessageProducer.Messages[0].Key)).NotTo(BeNil())

Compare that to the real setup where we use actual kafka message writers which match the same MessageWriter interface:

mgr := worker.NewManager()
var exampleMessageProducer = kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{os.Getenv("KAFKA_BROKER_LIST")},
	Topic:   "some.topic",
})
ep = processors.ExampleProcessor{ExampleMessageProducer: exampleMessageProducer}
mgr.Register("example", ep.Process)

I copied all of that from real code and changed the names to protect the innocent, so if something doesn't match up sorry about that, but hopefully you get the idea.

@mperham
Copy link
Contributor

mperham commented Jan 5, 2021

The quickest solution is refactoring and abstracting. With this route, I think you can solve your problem.

func (p *MyProcessor) Process(ctx context.Context, args ...interface{}) error {

    // enqueue a job for post-processing

    // this code doesn't need to be in the With block
    jobArgument := make(map[string]interface{})
    jobArgument["id"] = theID
    jobArgs := []interface{}{jobArgument}
    postProcessingJob := faktory.NewJob("post-processing", jobArgs...)

    // create a new wrapper abstraction which understands if we are in test mode
    // and provides a Push(Job) method.
    myWrapper := testWrapper(worker.HelperFor(ctx))
    ppjerr := myWrapper.Push(postProcessingJob)

You fill in the testWrapper(Helper) func. With this, I think you can abstract away the actual network call.

I'm still pondering how to add test mode. Right now I'm thinking it would be a boolean on TestExecutor which would capture Helper interface calls rather than call the underlying Client. It wouldn't abstract With as that makes no sense but it would add a new Push API and abstract that.

@mperham
Copy link
Contributor

mperham commented Jan 7, 2021

See #52 for initial prototype.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants