Consumer#close never resolves despite no messages being handled #3

Closed
jsbrn opened this issue Oct 1, 2022 · 6 comments

@jsbrn

jsbrn commented Oct 1, 2022

@cody-greene When I call consumer.close() and no messages are being handled, the Promise never resolves and my shutdown sequence hangs indefinitely. Are there other conditions that have to be met before a consumer can shut down?

@cody-greene
Owner

Are there any connection errors happening while the consumer is closing? If the client is unable to establish a connection (for example, if the password is incorrect), then you may have to wait for the acquireTimeout to elapse before the close fn resolves. You can try setting a lower value for the acquireTimeout (default 20,000 milliseconds). Otherwise, a concise code example that replicates this issue would really help here.

Something like:

const {Connection} = require('rabbitmq-client')

const rabbit = new Connection({
  url: process.env.RABBITMQ_URL,
  acquireTimeout: 1000
})

rabbit.on('error', (err) => {
  console.log('rabbitmq error', err)
})
rabbit.on('connection', () => {
  console.log('rabbitmq connected')
})

const consumer = rabbit.createConsumer({
  queue: 'my-queue'
})

consumer.on('error', (err) => {
  console.log('consumer error', err)
})
consumer.on('ready', () => {
  console.log('consumer ready')
})

process.on('SIGINT', async () => {
  console.log('shutting down')
  await consumer.close()
  await rabbit.close()
})

@cody-greene
Owner

Did this help? If there is a bug in this library then I would like to know more about it.

@jsbrn
Author

jsbrn commented Oct 3, 2022

const rabbit = new Connection({
    url: "amqp://admin:" + process.env.RABBITMQ_PASSWORD + "@rabbit:5672",
    // wait 1 to 5 seconds between connection retries
    retryLow: 1000,
    retryHigh: 5000,
    acquireTimeout: 2000,
});

rabbit.on("error", (err) => {
    // connection refused, etc
    console.error(err);
});

rabbit.on("connection", async () => {
    console.log("The connection is successfully (re)established");
});

const playerOrdersConsumer = rabbit.createConsumer(
    {
        queue: PLAYER_ORDERS_QUEUE,
        queueOptions: { exclusive: false },
        // very important, tells each consumer to only consume x messages at a time!
        qos: { prefetchCount: 3, global: false },
    },
    async (msg: AsyncMessage) => {
        await handleOrderMessage(msg);
    }
);

playerOrdersConsumer.on("ready", () => {
    console.log("Player orders consumer is ready!");
});

playerOrdersConsumer.on("error", (err: any) => {
    console.log("Player orders consumer error", err);
});

async function handleOrderMessage(message: AsyncMessage) {
    try {
        const order = message.body as QueuedOrder;
        const company = await CompanyService.getCompany(order.guildId, order.channelId);
        if (company != null) {
            await JobService.waitForJobOpening(company.guildId, company.channelId);
            await JobService.startJob(company.guildId, company.channelId, JobType.PROCESS_ORDERS);
            if (order.userId === process.env.BOT_ID) {
                await OrderService.simulateOrder(company);
            } else {
                if (await OrderService.validateQueuedOrder(order, company)) {
                    await OrderService.placeOrder(order.userId, order.channelId, order.guildId, order.price, order.amount);
                }
            }
            await OrderService.processQueueFor(company);
            await JobService.stopJob(company.guildId, company.channelId, JobType.PROCESS_ORDERS);
        }
    } catch (error) {
        console.log("Error processing order from queue", message.body, error);
    }
}

async function close() {
    // close all consumers after consuming the prefetched messages
    await playerOrdersConsumer.close();
    // disconnect from rabbit
    await rabbit.close();
}

Hi @cody-greene, sorry for the delay. Here is a rough code sample of my project. I know it is not a connection issue because rabbit connects just fine, and I am only attempting to shut down my project's process. My acquireTimeout is very low yet I still have the problem of indefinitely waiting for a consumer to close.

There are no messages in the queue when this problem occurs.

edit: I have put logs in the consumer handler that tell me when it is entered and exited, so I know that the handler function is never called. The hang can't be caused by an error processing a message on my part, so it could be a bug in the framework.

@cody-greene
Owner

I'm not able to replicate your issue with the above sample. Perhaps the secret lies in where you're calling that combined close() function: in an event handler, in a setTimeout, etc. At this point I'd be extremely grateful if you could narrow down the problem with a small test file. Perhaps you can create a pull request with a failing version of this test?

// node-rabbitmq-client/test/issue-3.ts
import test from 'tape'
import Connection from '../src'
import {expectEvent, sleep} from './util'

const RABBITMQ_URL = process.env.RABBITMQ_URL

//https://github.com/cody-greene/node-rabbitmq-client/issues/3
test/*.only*/('issue-3 Consumer#close never resolves despite no messages being handled', async (t) => {
  t.timeoutAfter(5000) // test fails after X milliseconds

  const rabbit = new Connection({
    url: RABBITMQ_URL
  })

  const queue = '__test_2c1f26cda1290327__'

  const consumer = rabbit.createConsumer(
    {
      queue: queue
    }, 
    () => { /* no-op */ }
  )

  await expectEvent(consumer, 'ready')
  t.pass('consumer is ready')

  await consumer.close()
  t.pass('consumer closed')

  await rabbit.close()
  t.pass('connection closed')
})
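
Short of a full failing test, one way to narrow down which call stalls is to race each close step against a timer and log how long it takes. The sketch below is only an illustration: withTimeout and closeInOrder are hypothetical helpers, not part of rabbitmq-client, and the 5000 ms default is an arbitrary choice.

```javascript
// Hypothetical helper (not part of rabbitmq-client): reject if `promise`
// has not settled within `ms` milliseconds, naming the step that stalled.
function withTimeout(promise, ms, label) {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms}ms`)),
      ms
    )
  })
  // Clear the timer either way so it never keeps the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}

// Hypothetical helper: run the close steps one after another and log how
// long each one took. `steps` is an array of [label, fn] pairs, where each
// fn returns a Promise.
async function closeInOrder(steps, ms = 5000) {
  for (const [label, fn] of steps) {
    const started = Date.now()
    await withTimeout(fn(), ms, label)
    console.log(`${label} resolved in ${Date.now() - started}ms`)
  }
}
```

From a shutdown handler this could be called as closeInOrder([['consumer.close', () => consumer.close()], ['rabbit.close', () => rabbit.close()]]). If one step never settles, the rejection names it, which at least tells us whether the consumer or the connection is the one hanging.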

@cody-greene
Owner

Have you been able to isolate the issue here? I'm inclined to close this after another 24 hours without more information.

@jsbrn
Author

jsbrn commented Oct 17, 2022

I haven't. I suspect it's something that is related to my own configuration. The application that my code belongs to runs in a docker container. Rabbit is also in a container. I'm using Docker Swarm. If I find a solution I will update you here.
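
One container-related detail that may be worth checking, though nothing in this thread confirms it is connected to the hang: Docker stops a container by sending SIGTERM by default (followed by SIGKILL once the grace period runs out), whereas the sample earlier in this thread only listens for SIGINT. A minimal sketch, assuming a hypothetical onShutdownSignal helper, that reacts to both signals and runs the shutdown routine only once:

```javascript
// Hypothetical helper: run `shutdown` once, on the first SIGINT or SIGTERM.
// `proc` defaults to the real process object and can be swapped for testing.
function onShutdownSignal(shutdown, proc = process) {
  let started = false
  const handler = async (signal) => {
    if (started) return // ignore repeated signals while shutting down
    started = true
    console.log(`received ${signal}, shutting down`)
    try {
      await shutdown()
    } catch (err) {
      console.error('shutdown failed', err)
      proc.exitCode = 1
    }
  }
  for (const signal of ['SIGINT', 'SIGTERM']) {
    proc.on(signal, () => handler(signal))
  }
}
```

With the code above, the combined close() function would be registered as onShutdownSignal(close).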

@jsbrn jsbrn closed this as completed Oct 17, 2022