henrod/task-queue


Sidekiq-compatible background workers in Go.

  • handles retries
  • responds to Unix signals to safely wait for jobs to finish before exiting
  • well tested
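
The library can be fetched with the standard Go tooling (the module path is taken from the imports in the examples below):

go get github.com/Henrod/task-queue/taskqueue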

An application using this library is typically split into two independent parts: a set of producers (usually an API where operations are requested) and a set of consumers (the background workers).

The consumer app is responsible for connecting to Redis and waiting for operations queued by the producer, as in the following example:

package main

import (
	"context"
	// "fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
	
	"github.com/Henrod/task-queue/taskqueue"
	"github.com/google/uuid"
	"github.com/sirupsen/logrus"
)

func myJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
	// err := doSomethingWithYourMessage
	// if err != nil {
	// 	return fmt.Errorf("error while running my job func: %w", err)
	// }
	return nil // if no error happened during the job execution
}

func myOtherJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
	// err := doSomethingWithYourMessage
	// if err != nil {
	// 	return fmt.Errorf("error while running my other job func: %w", err)
	// }
	return nil // if no error happened during the job execution
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go handleStop(cancel)

	firstQueueKey := "dummy-queue"
	firstWorkerID := "my-worker-ID"
	firstQueueOptions := &taskqueue.Options{
		QueueKey:         firstQueueKey,
		Namespace:        "my-namespace",
		StorageAddress:   "localhost:6379",
		WorkerID:         firstWorkerID,
		MaxRetries:       3, // message will be discarded after 3 retries
		OperationTimeout: 2 * time.Minute,
	}
	firstTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(firstQueueOptions), firstQueueOptions)
	if err != nil {
		panic(err)
	}
	firstLogger := logrus.New().WithFields(logrus.Fields{
		"operation": "consumer",
		"queueKey": firstQueueKey,
		"workerID": firstWorkerID,
	})
	firstLogger.Info("consuming tasks from first queue")
	firstTaskQueue.Consume(
		ctx,
		func(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
			firstLogger.Printf("consumed task %s from first queue with payload: %v\n", taskID, payload)
			return myJobFunc(ctx, taskID, payload)
		},
	)

	secondQueueKey := "other-dummy-queue"
	secondWorkerID := "my-other-worker-ID"
	secondQueueOptions := &taskqueue.Options{
		QueueKey:         secondQueueKey,
		Namespace:        "my-namespace",
		StorageAddress:   "localhost:6379",
		WorkerID:         secondWorkerID,
		MaxRetries:       -1, // unlimited max retries
		OperationTimeout: 1 * time.Minute,
	}
	secondTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(secondQueueOptions), secondQueueOptions)
	if err != nil {
		panic(err)
	}
	secondLogger := logrus.New().WithFields(logrus.Fields{
		"operation": "consumer",
		"queueKey": secondQueueKey,
		"worker": secondWorkerID,
	})
	secondLogger.Info("consuming tasks from second queue")
	secondTaskQueue.Consume(
		ctx,
		func(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
			secondLogger.Printf("consumed task %s from second queue with payload: %v\n", taskID, payload)
			return myOtherJobFunc(ctx, taskID, payload)
		},
	)
}

func handleStop(cancel context.CancelFunc) {
	logger := logrus.New()
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	<-sigs
	logger.Info("received termination signal, waiting for operations to finish")
	cancel()
}
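
If Consume blocks until its context is cancelled (the graceful-shutdown setup above suggests so, but verify against the library's actual semantics), the sequential calls in main would keep the second queue from being consumed; under that assumption, each call can run in its own goroutine instead. A minimal sketch, adding "sync" to the imports:

// Replace the two sequential Consume calls in main with
// concurrent consumers; Wait returns only after both have
// stopped, preserving the graceful-shutdown behavior.
var wg sync.WaitGroup
wg.Add(2)
go func() {
	defer wg.Done()
	firstTaskQueue.Consume(ctx, myJobFunc)
}()
go func() {
	defer wg.Done()
	secondTaskQueue.Consume(ctx, myOtherJobFunc)
}()
wg.Wait()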

The producer app is responsible for connecting to Redis and enqueueing operations for the consumer to process, as in the following example:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Henrod/task-queue/taskqueue"
	"github.com/sirupsen/logrus"
)

type Payload struct {
	SomeKey string
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go handleStop(cancel)

	firstQueueKey := "dummy-queue"
	firstQueueOptions := &taskqueue.Options{
		QueueKey:         firstQueueKey,
		Namespace:        "my-namespace",
		StorageAddress:   "localhost:6379",
	}
	firstTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(firstQueueOptions), firstQueueOptions)
	if err != nil {
		panic(err)
	}
	firstLogger := logrus.New().WithFields(logrus.Fields{
		"operation": "consumer",
		"queueKey": firstQueueKey,
	})
	firstLogger.Info("producing task in first queue")
	firstQueueTaskID, err := firstTaskQueue.ProduceAt(ctx, &Payload{
		SomeKey: "some-value",
	}, time.Now())
	if err != nil {
		firstLogger.WithError(err).Error("failed to enqueue task to first queue")
	} else {
		firstLogger.Infof("enqueued task %s in first queue", firstQueueTaskID)
	}

	secondQueueKey := "other-dummy-queue"
	secondQueueOptions := &taskqueue.Options{
		QueueKey:         secondQueueKey,
		Namespace:        "my-namespace",
		StorageAddress:   "localhost:6379",
	}
	secondTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(secondQueueOptions), secondQueueOptions)
	if err != nil {
		panic(err)
	}
	secondLogger := logrus.New().WithFields(logrus.Fields{
		"operation": "consumer",
		"queueKey": secondQueueKey,
	})
	secondLogger.Info("producing task in second queue")
	secondQueueTaskID, err := secondTaskQueue.ProduceAt(ctx, &Payload{
		SomeKey: "some-value",
	}, time.Now())
	if err != nil {
		secondLogger.WithError(err).Error("failed to enqueue task to second queue")
	} else {
		secondLogger.Infof("enqueued task %s in second queue", secondQueueTaskID)
	}
}

func handleStop(cancel context.CancelFunc) {
	logger := logrus.New()
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	<-sigs
	logger.Info("received termination signal, waiting for operations to finish")
	cancel()
}
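
Because ProduceAt takes an explicit timestamp, passing a future time should schedule the task for later pickup (the method name suggests this, but verify against the implementation). A minimal sketch reusing firstTaskQueue and firstLogger from the example above:

// Hypothetical delayed enqueue: schedule a task roughly five
// minutes from now, assuming ProduceAt treats its time argument
// as the earliest moment a consumer may pick the task up.
delayedTaskID, err := firstTaskQueue.ProduceAt(ctx, &Payload{
	SomeKey: "some-value",
}, time.Now().Add(5*time.Minute))
if err != nil {
	firstLogger.WithError(err).Error("failed to schedule delayed task")
} else {
	firstLogger.Infof("scheduled task %s to run in about five minutes", delayedTaskID)
}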

To be implemented:

  • support for custom middleware (a wrapper-based approximation is sketched below)
  • stats on what jobs are currently running
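
Until built-in middleware lands, a similar effect is already possible by wrapping the job function, since Consume accepts a plain function value. A minimal sketch reusing the consumer example's imports (the withLogging helper is illustrative, not part of the library):

// withLogging decorates any job function with timing logs,
// approximating middleware by plain function composition.
func withLogging(logger *logrus.Entry, next func(context.Context, uuid.UUID, interface{}) error) func(context.Context, uuid.UUID, interface{}) error {
	return func(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
		start := time.Now()
		err := next(ctx, taskID, payload)
		logger.WithField("duration", time.Since(start)).Infof("task %s finished", taskID)
		return err
	}
}

// Usage: firstTaskQueue.Consume(ctx, withLogging(firstLogger, myJobFunc))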

Future implementation possibilities:

  • customizable concurrency per queue (an approximation is sketched below)
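
In the meantime, per-queue concurrency can likely be approximated by running several workers against the same QueueKey, each through its own TaskQueue with a distinct WorkerID (the WorkerID option suggests multiple workers per queue are expected, but verify this against the implementation). A sketch, reusing ctx and myJobFunc from the consumer example and adding "fmt" to its imports:

// Spawn four workers that all consume "dummy-queue".
for i := 0; i < 4; i++ {
	opts := &taskqueue.Options{
		QueueKey:         "dummy-queue",
		Namespace:        "my-namespace",
		StorageAddress:   "localhost:6379",
		WorkerID:         fmt.Sprintf("my-worker-%d", i), // unique per worker
		MaxRetries:       3,
		OperationTimeout: 2 * time.Minute,
	}
	tq, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(opts), opts)
	if err != nil {
		panic(err)
	}
	go tq.Consume(ctx, myJobFunc) // one consumer goroutine per worker
}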

Initial development sponsored by Customer.io
