Error in consumer is not fetching data from Redis queue after working for a while #79

Closed
toni-moreno opened this issue Mar 30, 2020 · 5 comments

Comments

@toni-moreno

toni-moreno commented Mar 30, 2020

Hello, and thank you to everybody working on or contributing to this great project.

I'm trying to use rmq to build a distributed job load system. I have a master building jobs and, right now, only one agent running them.

The master has processed buildIDs 1259 to 1270, but the agent has consumed and acked only 1259, 1260, 1261, 1262, 1263, and 1266. All the others are still in the queue, as you can see in my "Redis Desktop Manager" screenshot:

[Screenshot: Redis Desktop Manager view of the queue]

I have configured the agent to poll the queue once per second:

	redisClient = redis.NewClient(&redis.Options{
		Network:  "tcp",
		Addr:     conf.RedisEndpoint,
		DB:       0,
		Password: conf.RedisPassword,
	})
	// Verify the connection before opening any queues.
	_, err := redisClient.Ping().Result()
	if err != nil {
		log.Errorf("Cannot connect to redis: %s", err)
		return err
	}

	con := rmq.OpenConnectionWithRedisClient("sya", redisClient)
	jobReqQ = con.OpenQueue("job_request")
	jobRespQ = con.OpenQueue("job_responses")

	// Prefetch limit of 10 deliveries, polling once per second.
	jobReqQ.StartConsuming(10, time.Second)
	jobReqQ.AddConsumer("consumer-"+conf.NodeID, NewAgentConsumer())

I've restarted the agent, but nothing happens: the data is still in the queue and is no longer fetched (the producer, the "master", is stopped right now).

  • How can I get the old data from the queue?
  • Why is it still there, and why can't I fetch and ack it?

Thank you in advance

@wellle
Member

wellle commented Mar 30, 2020

Hey! Are you calling delivery.Ack() or delivery.Reject() in your Consume() function? Because as long as you don't, rmq thinks you're still consuming those deliveries and won't call Consume() on new ones.
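The behavior described in this comment can be pictured with a toy in-memory model. This is only a sketch under stated assumptions, not rmq's implementation: `toyQueue`, `fetch`, and `ack` are hypothetical names, payloads are assumed to be unique, and the limit plays the role of the first argument to StartConsuming(). Once the number of unacked deliveries reaches the limit, nothing new is handed out until something is acked.

```go
package main

import "fmt"

// toyQueue is a hypothetical in-memory stand-in for a queue with a
// prefetch limit. It is NOT how rmq is implemented.
type toyQueue struct {
	ready         []string        // payloads waiting to be delivered
	unacked       map[string]bool // delivered but not yet acked (payloads assumed unique)
	prefetchLimit int
}

func newToyQueue(limit int, payloads ...string) *toyQueue {
	return &toyQueue{ready: payloads, unacked: map[string]bool{}, prefetchLimit: limit}
}

// fetch hands out ready payloads until the number of unacked
// deliveries reaches the prefetch limit.
func (q *toyQueue) fetch() []string {
	var out []string
	for len(q.ready) > 0 && len(q.unacked) < q.prefetchLimit {
		p := q.ready[0]
		q.ready = q.ready[1:]
		q.unacked[p] = true
		out = append(out, p)
	}
	return out
}

// ack frees the slot held by a delivered payload.
func (q *toyQueue) ack(p string) { delete(q.unacked, p) }

func main() {
	q := newToyQueue(2, "1259", "1260", "1261")
	fmt.Println("first fetch:", q.fetch())  // fills both slots
	fmt.Println("second fetch:", q.fetch()) // nothing: both slots still unacked
	q.ack("1259")
	fmt.Println("after ack:", q.fetch()) // one slot is free again
}
```

In this model a consumer that never acks stalls after `prefetchLimit` deliveries, which is why the question about Ack() and Reject() is the first thing to check.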

@toni-moreno
Author

Yes, I'm acking:

type AgentConsumer struct {
}

func NewAgentConsumer() *AgentConsumer {
	return &AgentConsumer{}
}

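func (consumer *AgentConsumer) Consume(delivery rmq.Delivery) {
	msg := delivery.Payload()
	// Ack right away, before the job is handed to the internal queue.
	delivery.Ack()
	log.Debugf("Got message ...")
	// Hand the payload to the agent's internal job queue.
	qin.Append(&JobQItem{
		data: []byte(msg),
	})
}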

The agent has already consumed 6 messages... I can't understand why the rest are not consumed. Any idea?

@toni-moreno
Author

Hi @wellle, the qin.Append() call in my Consume() function adds the job message to a finite internal queue, and it can block Consume() if our worker threads have not finished processing their jobs. Is there any problem with that?

Anyway, there are no jobs in the internal queue when I restart the agent, and restarting doesn't fix the problem.

Is there any way to inspect the stats of the remote Redis queues and debug the consuming process?

Thank you very much

@toni-moreno
Author

I'm sorry @wellle, the problem seems to be the "Redis Desktop Manager" tool, which was stealing the messages from me (it seems I cannot view messages with it without modifying the data). Once I stopped this tool, all messages were consumed by the agents.

I'm sorry again!

@wellle
Member

wellle commented Mar 30, 2020

That’s good to know, thanks for the update.
