Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gocelery is apparently reading but dropping tasks #94

Open
JJ opened this issue Mar 6, 2019 · 2 comments
Open

gocelery is apparently reading but dropping tasks #94

JJ opened this issue Mar 6, 2019 · 2 comments

Comments

@JJ
Copy link

JJ commented Mar 6, 2019

I have created this example, taken from the (1 day old) README:

package main
// Estructura de https://github.com/gocelery/gocelery

import (
	"os"
	"fmt"
	"time"
	"github.com/gocelery/gocelery"
)

// Celery Task
var comandos = make(map[string]int)
func registra(comando string) string {
	fmt.Println( comando )
	comandos[comando]++
	fmt.Println( comandos )
	return comando
}

func main() {
	// Crea el broker y el backend
	fmt.Println(os.Getenv("RMQ_PASS"))
	url := fmt.Sprintf("amqp://platzi:%s@localhost/platzi",os.Getenv("RMQ_PASS"))
	fmt.Println(url)
	celeryBroker := gocelery.NewAMQPCeleryBroker(url)
	celeryBackend := gocelery.NewAMQPCeleryBackend(url)

	// Usa dos workers
	celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 1)

	// Registra la función
	celeryClient.Register("worker.registra", registra)

	// Arranca el worker
	fmt.Println( "Arranca el worker" )
	go celeryClient.StartWorker()

	// Espera y para 
	time.Sleep(120 * time.Second)
	celeryClient.StopWorker()
}

I'm using this Python client:

import os
from celery import Celery
from dotenv import load_dotenv

load_dotenv()
app = Celery('registra-con-go',
             broker='amqp://platzi:{}@localhost/platzi'.format(os.environ.get('RMQ_PASS')))

if __name__ == '__main__':
    ordenes =['uno','uno','uno','dos','dos', 'tres']
    for i in ordenes:
        print( "Envía ", i )
        app.send_task("worker.registra", [i])

The client is sending to the "celery" queue, and apparently that queue is being emptied by the worker. However, the function "registra" is not called. Any idea?

JJ added a commit to JJ/slack-bot-platzi that referenced this issue Mar 6, 2019
@yoonsio yoonsio added the bug label Mar 6, 2019
@yoonsio yoonsio added this to To do in v1 release with 3.x legacy support via automation Mar 6, 2019
@yoonsio yoonsio moved this from To do to In progress in v1 release with 3.x legacy support Mar 6, 2019
@m3nu
Copy link

m3nu commented Jul 20, 2019

@JJ, I was just scratching my head over the same issue. After adding some debug code, I noticed that unmarshalling failed near amqp_broker.go:195 due to different fields.

Then I noticed that I didn't have CELERY_TASK_PROTOCOL=1 in my Python client. I don't see it in your script either. Maybe worth checking.

@kobayashi
Copy link
Contributor

kobayashi commented Nov 13, 2020

This is is not a bug. As @m3nu mentioned, the python client have to set at least CELERY_TASK_PROTOCOL=1 in client side config to catch the job by gocelery worker. README describes about it in detail.

So simply add the config options, here is a simpler example.

# client.py

import os
from celery import Celery

app = Celery('registra-con-go', broker='amqp://guest@127.0.0.1//')

class conf:
    CELERY_TASK_SERIALIZER='json'
    CELERY_ACCEPT_CONTENT=['json']
    CELERY_RESULT_SERIALIZER='json'
    CELERY_ENABLE_UTC=True
    CELERY_TASK_PROTOCOL=1

app.config_from_object(conf)


if __name__ == '__main__':
    ordenes =['uno','uno','uno','dos','dos', 'tres']
    for i in ordenes:
        print( "Envía ", i )
        app.send_task("worker.registra", [i])

Then the result must be

$ go run main.go
amqp://guest@127.0.0.1
Arranca el worker
uno
map[uno:1]
uno
map[uno:2]
uno
map[uno:3]
dos
map[dos:1 uno:3]
dos
map[dos:2 uno:3]
tres
map[dos:2 tres:1 uno:3]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
Development

No branches or pull requests

4 participants