Channels not closing #9

Closed
testower opened this issue May 5, 2014 · 21 comments

Comments

@testower

testower commented May 5, 2014

I have a problem with channels not closing. When using node-amqp I had the same issue, but resolved it with this queue option:

closeChannelOnUnsubscribe: true

Then, when my socket.io client disconnected, I would unsubscribe its consumerTag from the queue, and the channel would close:

socket.on('disconnect', function () {
    if (consumerTag) {
        my_queue.unsubscribe(consumerTag);
    }
});

Would there be an equivalent in amqp-coffee?

I'm trying all sorts of things like queue.channel.close(), consumer.cancel() and queue.delete(), but none seem to close the channels. I can see in the RabbitMQ dashboard that they remain open with no consumer.

@testower
Author

testower commented May 8, 2014

@Davidmba perhaps you can chime in?

@barshow
Contributor

barshow commented May 8, 2014

I'm sorry about that. The day you made the post I pushed a new version out with a test and documentation. I must have just not commented. On the latest version you can use consumer.close()

@testower
Author

testower commented May 8, 2014

Thanks! It seems though that it may not be working as expected. At the least, I seem to be getting more channels than expected.

What am I doing wrong?

             var consumer;
             connection.queue({
                queue: '/* unique_queue_name_based_on_session_id */',
                exclusive: true,
                autoDelete: true,
            }, function (err, my_queue) {
                my_queue.declare(function (err, res) {
                    connection.consume(res.queue,
                    {
                        prefetchCount: 5
                    }, function (message) {
                        message.ack();

                        /* publish data on user's socket.io connection */

                    }, function (_consumer) {
                        consumer = _consumer;
                    }).on('error', function (err) {
                        console.error(new Date() +": amqp consume error: " + JSON.stringify(err));
                    });

                    socket.on('disconnect', function () {
                        if (consumer !== undefined) {
                            consumer.close();
                        }
                        socket.leave(sessionID);
                    });
                });
            });

@barshow
Contributor

barshow commented May 8, 2014

Well, you could argue that the API should be changed a bit, but it doesn't return the consumer in the callback; the callback just lets you know the consumer was set up. The consumer is the return value of connection.consume().

So:

             var consumer;
             connection.queue({
                queue: '/* unique_queue_name_based_on_session_id */',
                exclusive: true,
                autoDelete: true,
            }, function (err, my_queue) {
                my_queue.declare(function (err, res) {
                    /* setup your consumer object here. */
                    consumer = connection.consume(res.queue,
                    {
                        prefetchCount: 5
                    }, function (message) {
                        message.ack();

                        /* publish data on user's socket.io connection */

                    }).on('error', function (err) {
                        console.error(new Date() +": amqp consume error: " + JSON.stringify(err));
                    });

                    socket.on('disconnect', function () {
                        if (consumer !== undefined) {
                            consumer.close();
                        }
                        socket.leave(sessionID);
                    });
                });
            });
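
The difference between the two snippets can be shown without a broker. The sketch below uses a hypothetical stand-in, `fakeConsume`, that mimics only the behaviour described in this comment: the consumer is the return value, and the setup callback receives no consumer. It is an illustration under that assumption, not amqp-coffee's implementation.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for connection.consume(), for illustration only.
// It mimics the behaviour described above: the consumer object is the
// return value, and the final callback only signals that setup finished.
function fakeConsume(queueName, options, onMessage, onSetup) {
  const consumer = new EventEmitter();
  consumer.closed = false;
  consumer.close = function () { consumer.closed = true; };
  if (typeof onSetup === 'function') {
    onSetup(); // note: no consumer is passed here
  }
  return consumer;
}

// Pattern from the first snippet: take the consumer from the callback.
let fromCallback;
fakeConsume('q', { prefetchCount: 5 }, function () {}, function (_consumer) {
  fromCallback = _consumer; // stays undefined
});

// Pattern from the second snippet: take the consumer from the return value.
const fromReturn = fakeConsume('q', { prefetchCount: 5 }, function () {})
  .on('error', function (err) { console.error(err); });

// Simulated disconnect handler, same guard as in the snippets above.
function onDisconnect(consumer) {
  if (consumer !== undefined) {
    consumer.close();
  }
}
onDisconnect(fromCallback); // no-op: there is nothing to close
onDisconnect(fromReturn);   // closes the consumer
```

With the callback pattern the guard `consumer !== undefined` silently skips the close, which would explain channels being left open on disconnect.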

@testower
Author

testower commented May 8, 2014

Aha! Thank you, that seems to have fixed it. I do seem to get two channels per queue, though, and one of them disappears by itself after a while (even before closing). Is this some kind of utility channel that amqp-coffee uses for initializing stuff?

@barshow
Contributor

barshow commented May 8, 2014

So that's specific to this driver: we set up a channel for doing declares, deletes and bindings. It closes itself after it hasn't been used in a while and will reopen on demand.

@barshow
Contributor

barshow commented May 8, 2014

If you declare a bunch of queues all at once, you should only get one of those temporary channels.
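
The lifecycle described here (open on demand, shared by operations issued together, closed after an idle period) can be modelled roughly as follows. `TemporaryChannel`, its `idleMs` parameter and the injected clock are hypothetical names made up for this sketch; the driver's real internals may look quite different.

```javascript
// Hypothetical model of a utility channel that opens on demand and is
// closed once it has been idle for `idleMs`. The clock is injected so the
// behaviour can be demonstrated without real timers.
class TemporaryChannel {
  constructor(idleMs, now) {
    this.idleMs = idleMs;
    this.now = now || Date.now;
    this.state = 'closed';
    this.lastUsed = 0;
    this.timesOpened = 0;
  }

  // Run a declare/bind/delete-style operation, opening the channel if needed.
  use(operation) {
    if (this.state !== 'open') {
      this.state = 'open';
      this.timesOpened += 1;
    }
    this.lastUsed = this.now();
    return operation();
  }

  // Close the channel if it has been unused for at least idleMs.
  closeIfIdle() {
    if (this.state === 'open' && this.now() - this.lastUsed >= this.idleMs) {
      this.state = 'closed';
    }
    return this.state;
  }
}

// Drive the model with a fake clock.
let fakeTime = 0;
const channel = new TemporaryChannel(100, function () { return fakeTime; });

// Several declares issued together share one channel.
channel.use(function () {});
channel.use(function () {});
channel.use(function () {});
const openedForBatch = channel.timesOpened; // 1

fakeTime = 50;
const stateWhileFresh = channel.closeIfIdle(); // 'open'
fakeTime = 150;
const stateAfterIdle = channel.closeIfIdle(); // 'closed'

channel.use(function () {}); // reopens on demand
const openedTotal = channel.timesOpened; // 2
```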

@testower
Author

testower commented May 8, 2014

Got it, thanks a lot. It works really well now that I have this sorted :-)

@testower testower closed this as completed May 8, 2014
@testower testower reopened this May 13, 2014
@testower
Author

When doing unbind operations, amqp-coffee seems to add a new channel for each unbind, and they won't close. Is it a bug or am I doing something wrong?

my_queue.unbind(/* exchange */, /* routing key */);

@barshow
Contributor

barshow commented May 13, 2014

I cannot reproduce that. In the tests the channels do close. I just modified one of the tests to prove that; it's included below. Are you doing a lot of unbinds per queue? How are you getting the my_queue object? Are there potentially multiple things happening at once?

This is the test that works. It declares a queue, then sets up and tears down two bindings.

  it 'test we can unbind a queue 2885', (done)->
    this.timeout(1000000)
    amqp = null
    queue = null
    queueName = uuid()
    async.series [
      (next)->
        amqp = new AMQP {host:'localhost'}, (e, r)->
          should.not.exist e
          next()

      (next)->
        amqp.queue {queue:queueName}, (e, q)->
          should.not.exist e
          should.exist q
          queue = q
          next()

      (next)->
        queue.declare {passive:false}, (e,r)->
          should.not.exist e
          next()

      (next)->
        queue.bind "amq.direct", "testing", (e,r)->
          should.not.exist e
          next()

      (next)->
        queue.bind "amq.direct", "testing2", (e,r)->
          should.not.exist e
          next()

      (next)->
        queue.unbind "amq.direct", "testing", (e,r)->
          should.not.exist e
          next()

      (next)->
        queue.unbind "amq.direct", "testing2", (e,r)->
          should.not.exist e
          next()
      (next)->
        _.delay ->
          openChannels = 0
          for channelNumber,channel of amqp.channels
            openChannels++ if channel.state is 'open'
          openChannels.should.eql 2
          next()
        , 10

      (next)->
        _.delay ->
          openChannels = 0
          for channelNumber,channel of amqp.channels
            openChannels++ if channel.state is 'open'
          openChannels.should.eql 1
          next()
        , 500
    ], done
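
For readers working in JavaScript rather than CoffeeScript, the channel-counting loop used twice in this test can be pulled into a small helper. The sketch assumes only what the test itself relies on: the connection exposes a `channels` map keyed by channel number, and each channel has a `state` string. `countOpenChannels` and `fakeConnection` are names made up for this example.

```javascript
// Count channels whose state is 'open'. Assumes, as in the test above, that
// the connection exposes a `channels` map keyed by channel number and that
// each channel has a `state` string.
function countOpenChannels(connection) {
  return Object.keys(connection.channels).filter(function (channelNumber) {
    return connection.channels[channelNumber].state === 'open';
  }).length;
}

// Stand-in connection object for illustration (not a real AMQP connection).
const fakeConnection = {
  channels: {
    0: { state: 'open' },
    1: { state: 'open' },
    2: { state: 'closed' }
  }
};

console.log(countOpenChannels(fakeConnection)); // prints 2
```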

@testower
Author

Sorry for taking so long to get back to you.

Yes, I'm doing a lot of unbinds, possibly in parallel, and on the same queue. I'm getting the queue the same way as in my code above.

This is basically what I'm doing, somewhat simplified: I have several types of objects with different routing keys and such, so there would be more than just one set of socket-event-to-binding/unbinding handlers.

     var consumer;
     connection.queue({
        queue: '/* unique_queue_name_based_on_session_id */',
        exclusive: true,
        autoDelete: true,
    }, function (err, my_queue) {
        my_queue.declare(function (err, res) {
            /* setup your consumer object here. */
            consumer = connection.consume(res.queue,
            {
                prefetchCount: 5
            }, function (message) {
                message.ack();

                /* publish data on user's socket.io connection */

            }).on('error', function (err) {
                console.error(new Date() +": amqp consume error: " + JSON.stringify(err));
            });

            socket.on('subscribe', function (data) {
                my_queue.bind('my_exchange', 'aRoutingPrefix.' + data.id);
            })

            socket.on('unsubscribe', function (data) {
                my_queue.unbind('my_exchange', 'aRoutingPrefix.' + data.id);
            })

            socket.on('disconnect', function () {
                if (consumer !== undefined) {
                    consumer.close();
                }
                socket.leave(sessionID);
            });
        });
    });

@barshow
Contributor

barshow commented May 22, 2014

I've tried to reproduce this and can't get an extra channel to stay open.
Can you run your app with DEBUG=amqp* AMQP=3 and post the amqp log messages here, or email barshow@dropbox.com if you would rather? It would be useful to know the channel number that you think should be closed; you can see that in the RabbitMQ web console.

@testower
Author

Thank you so much for following up on this!

I will try and test this over the weekend and get back to you with the results.

@testower
Author

Of course, now I'm having trouble replicating the issue locally. I'll keep you posted.

EDIT: Actually, the problem seems to occur when trying to unbind a binding that does not exist. What is the protocol on this? Should a client never attempt to do that, and thus keep its own bookkeeping on bindings?

  amqp:Connection 19 < channelOpen {} +186ms
  amqp:AMQPParser 19 > method channelOpenOk {"reserved1":""} +187ms
  amqp:Connection 19 < queueUnbind {"queue":"node_session_6poBsdB1CJ6idZeyWCvmEt1A","exchange":"device_status","routingKey":"device.8466","arguments":{}} +186ms
  amqp:AMQPParser 19 > method channelClose {"replyCode":404,"replyText":"NOT_FOUND - no binding device.8466 between exchange 'device_status' in vhost '/' and queue 'node_session_6poBsdB1CJ6idZeyWCvmEt1A' in vhost '/'","classId":50,"methodId":50} +285ms
  amqp:Channel Channel closed by server {"replyCode":404,"replyText":"NOT_FOUND - no binding device.8466 between exchange 'device_status' in vhost '/' and queue 'node_session_6poBsdB1CJ6idZeyWCvmEt1A' in vhost '/'","classId":50,"methodId":50} +472ms
  amqp:Channel Channel reassign +0ms

EDIT: To clarify: I've verified that channel 19 in this case stays open (for what seems to be indefinitely).

rabbitmq management
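
If a client did keep its own bookkeeping, one way to do it might look like the sketch below. `BindingTracker` is a hypothetical helper, not part of amqp-coffee; it assumes only that the wrapped queue object has `bind(exchange, routingKey, callback)` and `unbind(exchange, routingKey, callback)` methods, as used earlier in this thread. The queue here is a stand-in that records calls instead of talking to a broker.

```javascript
// Wraps a queue-like object (anything with bind/unbind methods) and records
// which bindings this client created, so unbind is only sent for known ones.
// BindingTracker is a hypothetical helper, not part of amqp-coffee.
class BindingTracker {
  constructor(queue) {
    this.queue = queue;
    this.bindings = new Set();
  }

  // Build a collision-free key from the exchange and routing key.
  key(exchange, routingKey) {
    return JSON.stringify([exchange, routingKey]);
  }

  bind(exchange, routingKey, callback) {
    this.bindings.add(this.key(exchange, routingKey));
    this.queue.bind(exchange, routingKey, callback);
  }

  // Returns true if an unbind was sent, false if the binding was unknown.
  unbind(exchange, routingKey, callback) {
    if (!this.bindings.delete(this.key(exchange, routingKey))) {
      return false;
    }
    this.queue.unbind(exchange, routingKey, callback);
    return true;
  }
}

// Stand-in queue that records calls instead of contacting a broker.
const calls = [];
const fakeQueue = {
  bind(exchange, routingKey) { calls.push(['bind', exchange, routingKey]); },
  unbind(exchange, routingKey) { calls.push(['unbind', exchange, routingKey]); }
};

const tracked = new BindingTracker(fakeQueue);
tracked.bind('device_status', 'device.8466');
const first = tracked.unbind('device_status', 'device.8466');  // sent
const second = tracked.unbind('device_status', 'device.8466'); // skipped
```

Note that this sketch records a binding as soon as `bind` is called, before the broker has confirmed it. Updating the set inside the bind callback would be stricter.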

@testower
Author

testower commented Jun 7, 2014

I've been unable to write a failing test for the above. Do you agree that the following should test what I described in the previous post?

it 'test no extra channels after duplicate unbind', (done)->
    amqp = null
    queue = null
    queueName = uuid()
    channel = null
    async.series [
      (next)->
        amqp = new AMQP {host:'localhost'}, (e,r)->
          should.not.exist e
          next()

      (next)->
        channel = amqp.queue {queue:queueName}, (e, q)->
          should.not.exist e
          should.exist q
          queue = q
          next()

      (next)->
        queue.declare {passive:false}, (e,r)->
          should.not.exist e
          next()

      (next)->
        queue.bind "amq.direct", "testing", (e,r)->
          should.not.exist e
          next()

      (next)->
        queue.unbind "amq.direct", "testing", (e,r)->
          should.not.exist e
          next()

      (next)->
        queue.unbind "amqp.direct", "testing", (e,r)->
          should.exist e
          next()

      (next)->
        _.delay ->
          openChannels = 0
          for channelNumber,channel of amqp.channels
            openChannels++ if channel.state is 'open'
          openChannels.should.eql 1
          next()
        , 10

      (next)->
        _.delay ->
          openChannels = 0
          for channelNumber,channel of amqp.channels
            openChannels++ if channel.state is 'open'
          openChannels.should.eql 1
          next()
        , 500
    ], done

@barshow
Contributor

barshow commented Jun 9, 2014

What version of RabbitMQ are you running? Unbinds are now supposed to be idempotent. http://www.rabbitmq.com/specification.html

I would also be curious to see what is directly after

amqp:Channel Channel reassign +0ms

because whatever that action is, it triggered the control channel to reopen.

@testower
Author

The rabbitmq version on the server where I experience this problem is 3.0.4. I too suspected that this could be the problem, and I'm lobbying my team for an upgrade. Running the test above locally, where I have the newest version, gives me no errors. I'll report back with the info you requested.

@barshow
Contributor

barshow commented Jun 11, 2014

3.0.4 has some differences from the more current versions. However, I still can't get channels to stay open. https://github.com/dropbox/amqp-coffee/tree/version/0.1.15 has some fixes if you want to try that. It's not on npm yet, but you can link to GitHub directly.

@testower
Author

We have finally upgraded to 3.3.3 and initial testing indicates the problem has disappeared. So I'll close this issue for now and reopen it if the problem shows up again. Thanks a lot for helping me debug this!

@trickpattyFH20

trickpattyFH20 commented Sep 26, 2016

@testower @barshow just reading through this issue and curious about scaling this pattern.

Is it an OK / good practice to start a new consumer on the connect event for each websocket, then store that consumer somewhere in memory and finally call .close() on that consumer on the socket.disconnect event? Something just feels weird about having so many consumers

not sure what the right pattern is, just exploring different options
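
The pattern being asked about (one consumer per websocket, kept in memory, closed on disconnect) could be sketched as follows. The registry functions and the fake consumer are hypothetical names for illustration; the only thing assumed about a real consumer is the `close()` method mentioned earlier in this thread.

```javascript
// Hypothetical in-memory registry mapping a socket id to its consumer.
const consumersBySocket = new Map();

// Remember the consumer created for a socket on its connect event.
function registerConsumer(socketId, consumer) {
  consumersBySocket.set(socketId, consumer);
}

// Close and forget the consumer for a socket. Returns true if one existed.
function releaseConsumer(socketId) {
  const consumer = consumersBySocket.get(socketId);
  if (consumer === undefined) {
    return false;
  }
  consumersBySocket.delete(socketId);
  consumer.close();
  return true;
}

// Stand-in consumer for illustration; a real one would come from the driver.
function makeFakeConsumer() {
  return {
    closed: false,
    close() { this.closed = true; }
  };
}

const fakeConsumer = makeFakeConsumer();
registerConsumer('socket-1', fakeConsumer);       // on connect
const released = releaseConsumer('socket-1');     // on disconnect
const releasedTwice = releaseConsumer('socket-1'); // already gone
```

Deleting the entry on release keeps the map from growing as sockets come and go, and makes a repeated disconnect a harmless no-op.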

@barshow
Contributor

barshow commented Sep 26, 2016

It depends on whether you're using a shared queue for each consumer or setting up and tearing down queues. Setting up and tearing down queues is expensive and should only be done when necessary. If you're just connecting a consumer to an already existing queue, that is a fairly lightweight operation, so without knowing more about your infra and what you're doing, I would say it's OK.

Let me know if you want a more specific answer :)


3 participants