Go Client/Server for Celery Distributed Task Queue
Having being involved in a number of projects migrating server from python to go, I have realized Go can help improve performance of existing python web applications. Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.
You can also use this library as pure go distributed task queue.
Now supporting both Redis and AMQP!!
- Redis (broker/backend)
- AMQP (broker/backend) - does not allow concurrent use of channels
Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
Run Celery Worker implemented in Go
// example/worker/main.go
// Celery Task
func add(a int, b int) int {
return a + b
}
func main() {
// create broker and backend
celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", 0, "pass", gocelery.BrokerQueueName("queue-busy"))
celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", 0, "pass")
// use AMQP instead
// celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
// celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")
// Configure with 2 celery workers
celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 2)
// worker.add name reflects "add" task method found in "worker.py"
celeryClient.Register("worker.add", add)
// Start Worker - blocking method
go celeryClient.StartWorker()
// Wait 30 seconds and stop all workers
time.Sleep(30 * time.Second)
celeryClient.StopWorker()
}
go run example/worker/main.go
You can use custom struct instead to hold shared structures.
type MyStruct struct {
MyInt int
}
func (so *MyStruct) add(a int, b int) int {
return a + b + so.MyInt
}
// code omitted ...
ms := &MyStruct{10}
celeryClient.Register("worker.add", ms.add)
// code omitted ...
Submit Task from Python Client
# example/test.py
from celery import Celery
app = Celery('tasks',
broker='redis://localhost:6379',
backend='redis://localhost:6379'
)
@app.task
def add(x, y):
return x + y
if __name__ == '__main__':
# submit celery task to be executed in Go workers
ar = add.apply_async((5456, 2878), serializer='json')
print(ar.get())
python example/test.py
Run Celery Worker implemented in Python
# example/worker.py
from celery import Celery
app = Celery('tasks',
broker='redis://localhost:6379',
backend='redis://localhost:6379'
)
@app.task
def add(x, y):
return x + y
cd example
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle
Submit Task from Go Client
func main() {
// create broker and backend
celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "")
celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "")
// use AMQP instead
// celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
// celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")
// create client
celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 0)
// send task
asyncResult, err := celeryClient.Delay("worker.add", 3, 5)
if err != nil {
panic(err)
}
// check if result is ready
isReady, _ := asyncResult.Ready()
fmt.Printf("ready status %v\n", isReady)
// get result with 5s timeout
res, err = asyncResult.Get(5 * time.Second)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(res)
}
}
go run example/client/main.go
{
"expires": null,
"utc": true,
"args": [5456, 2878],
"chord": null,
"callbacks": null,
"errbacks": null,
"taskset": null,
"id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
"retries": 0,
"task": "worker.add",
"timelimit": [null, null],
"eta": null,
"kwargs": {}
}
You are more than welcome to make any contributions. Please create Pull Request for any changes.
The gocelery is offered under MIT license.