
Tasks stuck in ready queue #99

Closed
pdamir opened this issue Jan 15, 2021 · 7 comments

pdamir commented Jan 15, 2021

Hello! I've recently started running into problems and I honestly don't know where to look anymore. Our ElastiCache Redis node has been seeing increased memory usage, even reaching 100% and crashing a few services, due to tasks stuck in the unacked/ready state, such as:

found so far 'rmq::connection::automation.order-JpP1pZ::queue::[automation.order]::unacked' with 102513 items
found so far 'rmq::connection::personification.lastviewed-Crt5kY::queue::[personification.lastviewed]::ready' with 415308 items

It seems that every time the k8s pods of the workers that handle these queues are restarted (manually or after a new build), tasks keep piling up on some queues while others are normally processed.

The cleaner is run every time a pod is started: we see the first print statement below, but never the print statement for the error.

func Cleaner() {
	defer PanicRecovery()
	fmt.Println("running cleaner")
	cleaner := rmq.NewCleaner(connection)

	for range time.Tick(time.Second) {
		if err := cleaner.Clean(); err != nil {
			fmt.Println("error cleaning", err)
		}
	}
}

I can add more functions from our code if needed, but at this point any advice or pointers would be very much appreciated.

wellle commented Jan 15, 2021

You need to set up a cleaner to return unacked deliveries of stale connections back to the ready list. See https://github.com/adjust/rmq#cleaner

pdamir commented Jan 15, 2021

I believe it is set up correctly; we have the same setup on many other services and they behave fine. This is how we have it set up on all services:

// main.go
const queue = "automation.order"

func main() {
	// make sure we close off all open connections before shutting down
	// start consuming from the queue
	go func(q string) {
		redis.StartConsuming(q)
	}(queue)

	log.NewEvent(l.ServerStart, nil).Info("server started", "port", "5000")
	if err := http.ListenAndServe(":5000", nil); err != nil {
		log.NewEvent(l.ServerStart, nil).Error("could not start the server", "err", err)
	}
}
// redis.go
// StartConsuming opens the rmq connection and queue if needed, starts the consumers, and kicks off the returner and cleaner goroutines
func StartConsuming(queue string) {
	defer PanicRecovery()
	if connection == nil {
		connection = rmq.OpenConnection("automation.order", "tcp", os.Getenv("REDIS"), 0)
	}

	if q == nil {
		q = connection.OpenQueue(queue)
	}

	q.StartConsuming(unackedLimit, 500*time.Millisecond)
	for i := 0; i < numConsumers; i++ {
		c := consumer.NewConsumer(i)
		q.AddConsumer(c.Name, c)
	}

	// start returning rejected deliveries from the queue
	go func(q string) {
		StartReturning(q)
	}(queue)

	// start the cleaner as well
	go func() {
		Cleaner()
	}()

	select {}
}
// redis.go
// NewConsumer will create a new instance of the consumer object that will consume redis queue messages
func NewConsumer(tag int) *Consumer {
	return &Consumer{
		Name: fmt.Sprintf("consumer.%s.%d", name, tag),
	}
}
// redis.go
// StartReturning periodically returns rejected deliveries back to the ready list
func StartReturning(queue string) {
	defer PanicRecovery()
	if connection == nil {
		connection = rmq.OpenConnection("automation.order", "tcp", os.Getenv("REDIS"), 0)
	}

	if q == nil {
		q = connection.OpenQueue(queue)
	}

	for range time.Tick(time.Second * 10) {
		q.ReturnAllRejected()
	}
}

I'm not sure if this is the proper way of calling the cleaner, or whether we have some other issue. But also, why would ready tasks not be consumed and just pile up there?
Could it be that when a pod is restarted it creates a new connection, and the queues on the old connection somehow just stay there and don't get processed?

wellle commented Jan 15, 2021

Could it be that many of your old pods are still running? So that those connections would still be active?

Also, could you start a queue handler and show me the overview? See https://github.com/adjust/rmq/blob/master/example/handler/main.go
If it contains confidential information you can also send it to me via email.
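
For reference, a minimal sketch of such an overview handler, adapted from the linked example and assuming the same v1-style rmq API as the rest of the code in this thread (newer rmq versions additionally return errors from these calls). The function name RegisterOverviewHandler is hypothetical; it reuses the package-level connection from redis.go and the default mux that main.go already serves on :5000.

// redis.go (hypothetical addition)
// RegisterOverviewHandler exposes the rmq stats overview page on /overview.
func RegisterOverviewHandler() {
	http.HandleFunc("/overview", func(w http.ResponseWriter, r *http.Request) {
		queues := connection.GetOpenQueues()     // names of all queues rmq has seen
		stats := connection.CollectStats(queues) // ready/unacked/consumer counts per queue and connection
		fmt.Fprint(w, stats.GetHtml(r.FormValue("layout"), r.FormValue("refresh")))
	})
}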

pdamir commented Jan 18, 2021

Hi, since we had a memory usage issue over the weekend, I had to remove the ::ready queue that was causing problems.
Meanwhile, our automation.order queue had grown to over 5000 items, and I discovered an issue in our code where we didn't reject or acknowledge tasks; once I fixed that, the whole queue was processed.
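
For illustration, the fix presumably comes down to making sure every delivery ends in an Ack or a Reject. Here is a minimal sketch of such a Consume method, assuming the Consumer type from the code above and a hypothetical handleOrder processing step; the return values of Ack/Reject differ between rmq versions, so they are discarded here.

// consumer.go (hypothetical sketch)
// Consume is called by rmq for each delivery. Every delivery must end in
// Ack(), Reject() or Push(); otherwise it stays in the connection's unacked
// list until the cleaner recovers it once the connection goes stale.
func (c *Consumer) Consume(delivery rmq.Delivery) {
	if err := handleOrder(delivery.Payload()); err != nil {
		delivery.Reject() // failed: moves to the rejected list, returned later by StartReturning
		return
	}
	delivery.Ack() // done: removes the delivery from the unacked list
}

// handleOrder is a hypothetical stand-in for the actual processing logic.
func handleOrder(payload string) error {
	_ = payload
	return nil
}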

One issue I noticed: after rebuilding the service for the first time, the cleaner function returned the error shown below, which did not appear on subsequent pod restarts, and the cleaner then worked as intended.
[screenshot: cleaner error]

This one, however, still exists and is not changing; by the looks of it, this connection is no longer used and the pod is not running.
[screenshot: stale connection]

This is the stats overview:
[screenshots: stats overview]

If you don't mind, I will keep this issue open and post more logs/findings regarding the queue we had to remove completely over the weekend to avoid memory issues, since that particular queue is intentionally bombarded and grows to 500,000 items over 24 hours.

pdamir closed this as completed Jan 27, 2021

wellle commented Jan 27, 2021

@pdamir: Did you get it solved?

pdamir commented Jan 27, 2021

Our main issue was due to our calls to the Elasticsearch update method, which failed and caused the tasks to be rejected; we've managed to fix that and there are no more issues. We also noticed a single older queue with 19 ready items which never get consumed, but when we re-add those items, a new queue is created and they are consumed there. We have simply removed the older queue key from Redis manually.
Thank you for your help! Much appreciated
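
As an aside, rmq can also purge such an abandoned queue without touching Redis keys by hand. A minimal sketch, assuming the same v1-style API as the code above and that PurgeReady is available in the rmq version used here (its return value differs between versions, so it is discarded):

// purge.go (hypothetical)
// purgeOldQueue drops all ready deliveries of an abandoned queue via rmq.
func purgeOldQueue(connection rmq.Connection, name string) {
	oldQueue := connection.OpenQueue(name)
	oldQueue.PurgeReady() // see also PurgeRejected for the rejected list
}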

wellle commented Jan 29, 2021

Glad to hear, thanks for the update!
