feat(disposition): support undeliverable-here in modified outcomes #9

Closed

mbroadst

Previously, specifying undeliverable-here as true in a modified
outcome simply resulted in the message being rejected. This patch
adds tracking to the outgoing link management in order to not
redeliver messages to links that indicate that messages are not
deliverable there.
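
To make the idea concrete, here is a minimal sketch of per-link tracking. It is not the code in this patch: `LinkSketch` and its methods are hypothetical names, and a plain `std::set` of sequence numbers stands in for whatever structure the broker actually uses. The point is only that each outgoing link remembers which messages it has declared undeliverable, and is skipped for those messages while other links remain eligible.

```cpp
#include <cstdint>
#include <set>

// Hypothetical stand-in for one outgoing link; not the qpid-cpp class.
class LinkSketch {
  public:
    // Called when the peer settles a delivery with
    // modified { undeliverable-here = true }.
    void markUndeliverableHere(std::uint64_t sequence) {
        undeliverable.insert(sequence);
    }

    // Consulted before a queued message is offered to this link.
    bool accepts(std::uint64_t sequence) const {
        return undeliverable.count(sequence) == 0;
    }

    // Called once the message has left the queue, so the set
    // does not grow without bound.
    void forget(std::uint64_t sequence) {
        undeliverable.erase(sequence);
    }

  private:
    std::set<std::uint64_t> undeliverable;
};
```

With two such links on one queue, marking a message on the first leaves it acceptable to the second, which is the behaviour the test case in this thread checks for.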


mbroadst commented Jun 23, 2016

@grs here is the amqp10-based test case:

'use strict';
const amqp = require('amqp10'),
      Policy = amqp.Policy,
      expect = require('chai').expect,
      config = { address: 'localhost' };

let test = {};
describe('modified', () => {
  afterEach(() => test.client.disconnect().then(() => { delete test.client; }));

  it('should respect undeliverable here', function(done) {
    test.client = new amqp.Client(Policy.Utils.RenewOnSettle(1, 1, Policy.Default));
    // signal completion via `done` only; also returning the promise makes
    // newer mocha versions fail with "Resolution method is overspecified"
    test.client.connect(config.address)
      .then(() => Promise.all([
        test.client.createSender('test.disposition.queue'),
        test.client.createReceiver('test.disposition.queue')]))
      .spread((sender, receiver) => {
        let received = 0;
        receiver.on('message', msg => {
          // the first link must see the message exactly once
          expect(received).to.eql(0);
          received++;
          receiver.modify(msg, { undeliverableHere: true });

          // a second link on the same queue should now receive the message
          test.client.createReceiver('test.disposition.queue')
            .then(receiver2 => {
              receiver2.on('message', () => {
                expect(received).to.eql(1);
                done();
              });
            })
            .catch(done);
        });

        return sender.send('testing');
      })
      .catch(done);
  });
});

NOTE: this requires that a local queue named test.disposition.queue be created.

@@ -79,6 +79,9 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,

void OutgoingFromQueue::init()
{
    // observe queue for changes to track undeliverable messages
    queue->getObservers().add(boost::shared_ptr<QueueObserver>(shared_from_this()));

Member

If possible, I'd really like to make the tracking lazy, only activating if actually requested/needed, and only then registering the queue observer and tracking the messages.

Author (mbroadst, Jun 23, 2016)

Sure, that sounds reasonable. Do you think it's enough to just register it during handle below, with a boolean flag on the class?

(Actually, maybe just making the SequenceSet an optional would remove the need for the boolean, and marginally reduce memory consumption?)
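
A rough sketch of the optional-based variant, to show how the optional can double as the "tracking is active" flag. Everything here is an assumption for illustration: `LazyTrackerSketch` is a hypothetical name, `std::optional` (C++17) stands in for whichever optional type the codebase would use, and a `std::set` stands in for SequenceSet.

```cpp
#include <cstdint>
#include <optional>
#include <set>

// Hypothetical illustration of lazy tracking; not the qpid-cpp code.
class LazyTrackerSketch {
  public:
    // Records the message as undeliverable on this link. Returns true
    // only on the call that activates tracking, which is the moment
    // the caller would register the queue observer.
    bool markUndeliverableHere(std::uint64_t sequence) {
        const bool firstUse = !undeliverable.has_value();
        if (firstUse) undeliverable.emplace();  // allocate the set on demand
        undeliverable->insert(sequence);
        return firstUse;
    }

    // With tracking inactive, every message is acceptable and no
    // lookup is performed.
    bool accepts(std::uint64_t sequence) const {
        return !undeliverable || undeliverable->count(sequence) == 0;
    }

    // The engaged optional replaces a separate boolean flag.
    bool tracking() const { return undeliverable.has_value(); }

  private:
    std::optional<std::set<std::uint64_t>> undeliverable;
};
```

Links that never send a modified outcome with undeliverable-here set never allocate the set and never need an observer.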

Member

Either of those works. One other point is that we need to deregister the observer when the consumer ends.

Author

I wondered about that. From other examples of QueueObservers in the codebase I didn't see anything that explicitly removed itself from the observer list, so I assumed (dangerously/lazily 😄) it was cleaned up automatically somewhere.

Author

One more question here: is detached the right place to stop observation?

Member

Yes.
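
For readers following along, a minimal sketch of why the explicit deregistration matters when the observer list holds shared pointers: until the consumer removes itself, the queue keeps it alive. The types below (`QueueSketch`, `ObserverSketch`, `ConsumerSketch`) are hypothetical stand-ins written with the standard library, not the qpid-cpp classes, and the `registered` flag is an assumption about how double registration might be avoided.

```cpp
#include <algorithm>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical observer interface.
struct ObserverSketch {
    virtual ~ObserverSketch() = default;
};

// Hypothetical queue that owns shared references to its observers.
class QueueSketch {
  public:
    void add(const std::shared_ptr<ObserverSketch>& o) { observers.push_back(o); }
    void remove(const std::shared_ptr<ObserverSketch>& o) {
        observers.erase(std::remove(observers.begin(), observers.end(), o),
                        observers.end());
    }
    std::size_t observerCount() const { return observers.size(); }

  private:
    std::vector<std::shared_ptr<ObserverSketch>> observers;
};

// Hypothetical consumer that registers lazily and deregisters on detach.
// Instances must be owned by a std::shared_ptr for shared_from_this().
class ConsumerSketch : public ObserverSketch,
                       public std::enable_shared_from_this<ConsumerSketch> {
  public:
    explicit ConsumerSketch(QueueSketch& q) : queue(q) {}

    // Registers with the queue at most once.
    void startTracking() {
        if (!registered) {
            queue.add(shared_from_this());
            registered = true;
        }
    }

    // Drops the queue's reference so the consumer can be destroyed.
    void detached() {
        if (registered) {
            queue.remove(shared_from_this());
            registered = false;
        }
    }

  private:
    QueueSketch& queue;
    bool registered = false;
};
```

In this sketch, skipping `detached()` would leave the queue holding the last reference to a consumer whose link is already gone.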


mbroadst commented Jun 24, 2016

@grs okay I think that should take care of your comments from yesterday.

PS: I @mentioned you in the most recent commit to this repo, which breaks the build (at least for me locally) by duplicating a friend class directive in the main Queue header.


grs commented Jun 24, 2016

Looks good, thanks! Can you attach this as a patch to a JIRA? As qpid-cpp is still in svn I don't think I can merge the PR directly.


mbroadst commented Jun 24, 2016

@grs sure thing, I'll do you one better: https://patch-diff.githubusercontent.com/raw/apache/qpid/pull/9.patch


grs commented Jun 27, 2016

Actually, I've confirmed a patch isn't necessary. I can use your PR branch. I'll give this a test today and, all going well, merge it in.


grs commented Jun 27, 2016

@mbroadst this is now committed. I somehow messed up the intended merge commit (I think perhaps on git svn rebase), so I'm afraid I neither marked the JIRA for this (https://issues.apache.org/jira/browse/QPID-7324) nor kept your name in the commit log. Thanks very much for the contribution though!

@mbroadst
Author

@grs no worries, it's the functionality I'm after, not the attribution 😄

@mbroadst mbroadst closed this Jun 27, 2016
@mbroadst mbroadst deleted the support-undeliverable-here branch June 27, 2016 20:51