Add example that includes receipt #2

Closed
flowchartsman opened this issue Apr 24, 2015 · 18 comments

Comments

@flowchartsman

It would be nice to see an example that deals with sending (and waiting for) a receipt. I've been having some trouble getting this to work in my own code, and it would be nice to have a working example.

@gmallard
Owner

You are looking for receipts for a SEND, yes?

@flowchartsman
Author

For the ACK. I should be more specific: I am attempting to use ActiveMQ with prefetch to pull messages from a few different destinations to refill a broker's internal queue.

My workflow is:

  1. connect
  2. sub to queue q.1 with prefetch X where X is how many messages I need
  3. process X messages
  4. ack the last message
  5. unsubscribe
  6. repeat 2-5 with queues q.2, q.3, etc.

The problem I'm running into is that the unsubscribe right after the ack is causing messages to not get acked sometimes, and some of the later subscribes are getting nothing at all.

I tried to solve this by requesting a receipt for the ack and then consuming messages off of the subscription until the receipt was received, but receipts are not sent with a "subscription" header, so the library does not send them to the channel I'm listening on.

I don't even know if this approach will work, but I have no way to wait for the receipt, so I can't test.

@gmallard
Owner

Interesting scenario. Let me think about it.

@gmallard
Owner

Given your description, several thoughts. Especially look at number 2.

Number 1:

You should look at the ActiveMQ JIRA. There are lots of reported difficulties with ACKs, but this package should allow you to handle or work around them.

Number 2:

In your workflow above, I think you should have a step 4.5) Receive the RECEIPT. Do you do that?

I am almost positive, from what you say, that you are looking on the wrong channel for the RECEIPT. It will be on the connection-level MessageData channel, not the SUBSCRIBE channel. You can see an example of this by looking at the code for DISCONNECT. This package always asks for a receipt on DISCONNECT. For that, a MessageData struct is stored in the conn.DisconnectReceipt field, and can be inspected after DISCONNECT.

Number 3:

Thanks much for this issue. Too much about RECEIPTs is currently not well documented in stompngo, and there are no examples of them here.

Number 4:

I am working on a couple of very simple RECEIPT examples, because that surely needs to be done. And additional documentation in the stompngo package. Will keep you informed.

@flowchartsman
Author

Number 2: I didn't know what channel to look for it on, so no. I will try looking on the MessageData channel now.

Question: if my prefetch is larger than the amount I need, do I need to drain the subscription channel after unsubscribe to allow it to be GCed?

@gmallard
Owner

OK, try what I suggested, something like:

    r := <-conn.MessageData

'r' should have the MessageData struct in it.
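
A bare receive like the one above returns whatever arrives next on the connection-level channel. As a minimal sketch of waiting for one specific RECEIPT with a timeout, the following uses only the standard library. Frame, WaitForReceipt and ErrReceiptTimeout are hypothetical names invented for this illustration and are not part of stompngo; the only protocol fact relied on is that a RECEIPT frame carries a receipt-id header echoing the receipt header of the frame that requested it.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Frame is a hypothetical stand-in for a received STOMP frame.
// It is NOT the stompngo MessageData type.
type Frame struct {
	Command string
	Headers map[string]string
}

// ErrReceiptTimeout is returned when no matching RECEIPT arrives in time.
var ErrReceiptTimeout = errors.New("timed out waiting for RECEIPT")

// WaitForReceipt reads from the connection-level channel until it sees a
// RECEIPT whose receipt-id header equals id, or until timeout elapses.
// Frames that do not match are skipped in this sketch; real client code
// would need to decide what to do with them (an ERROR frame, for example).
func WaitForReceipt(conn <-chan Frame, id string, timeout time.Duration) (Frame, error) {
	deadline := time.After(timeout)
	for {
		select {
		case f, ok := <-conn:
			if !ok {
				return Frame{}, errors.New("connection channel closed")
			}
			if f.Command == "RECEIPT" && f.Headers["receipt-id"] == id {
				return f, nil
			}
		case <-deadline:
			return Frame{}, ErrReceiptTimeout
		}
	}
}

func main() {
	// Simulate the broker answering an ACK that was sent with receipt:ack-1.
	conn := make(chan Frame, 1)
	conn <- Frame{Command: "RECEIPT", Headers: map[string]string{"receipt-id": "ack-1"}}

	f, err := WaitForReceipt(conn, "ack-1", time.Second)
	fmt.Println(f.Command, err)
}
```

In the workflow from earlier in the thread, a wait like this would sit between the ACK and the UNSUBSCRIBE (the "step 4.5").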

I do not know about prefetch and GC. Draining latent messages seems reasonable to me. The concept of prefetch is purely AMQ. Has nothing to do with STOMP specified behavior. If there are messages on the wire, the package should be reading them - provided the subchannel capacity allows it.

One other question I need to think about occurred to me: what level of the STOMP protocol are you connecting with?

@flowchartsman
Author

version 1.2.

If there is no concept of prefetch, why is there ack mode "client" versus "client-individual"? Or do you just mean the ability to specify 1-X messages versus a completely open-ended number? That I understand.

@gmallard
Owner

Please read the STOMP protocol specifications. All three of them. Found here:

http://stomp.github.io/

The word 'prefetch' is never used in any specification. The implementation of that concept is very specific to ActiveMQ.

And not used by other major message brokers such as Apollo or RabbitMQ.

@flowchartsman
Author

I have read the specs. Sorry, what I meant was that, whether you call it prefetch or not, the STOMP protocol allows receiving multiple messages and then ACKing only the last one, which is why "client" exists. So, whether it's ActiveMQ with prefetch or a server that simply has more than one message in flight at a time and no special name for it, it's still possible to have one or more messages you haven't processed at the time you UNSUBSCRIBE, right?

So it would seem to me to be a good idea to drain a channel on unsubscribe automatically (or at least to notify the user that they might want to do this, or to provide a method for them to do it automatically). In the event that the outgoing channel is full, any goroutine writing to it would block forever, leading to a memory leak in a long-running process.

I'd be happy to submit an issue/pull request. I just wanted to discuss it to make sure the library wasn't doing it already and I missed it, and to make sure we were on the same page about the problem.
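
To make the "client" versus "client-individual" distinction concrete, here is a small stand-alone model of which delivered messages remain unacknowledged after a single ACK. It is only a sketch: ackClient and ackClientIndividual are hypothetical helpers written for this illustration, and message ids are plain strings rather than real frames.

```go
package main

import "fmt"

// ackClient models "client" mode: an ACK for id also acknowledges every
// message delivered on the subscription before it. The result is the list
// of messages that are still unacknowledged.
func ackClient(pending []string, id string) []string {
	for i, m := range pending {
		if m == id {
			return append([]string(nil), pending[i+1:]...)
		}
	}
	return pending // unknown id: nothing is acknowledged in this model
}

// ackClientIndividual models "client-individual" mode: only the named
// message is acknowledged.
func ackClientIndividual(pending []string, id string) []string {
	var left []string
	for _, m := range pending {
		if m != id {
			left = append(left, m)
		}
	}
	return left
}

func main() {
	delivered := []string{"m1", "m2", "m3", "m4"}
	fmt.Println(ackClient(delivered, "m3"))           // prints [m4]
	fmt.Println(ackClientIndividual(delivered, "m3")) // prints [m1 m2 m4]
}
```

In both modes a message such as m4 can already have been delivered and still be sitting in the subscription channel when the client unsubscribes, which is the situation under discussion.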

@gmallard
Owner

Several comments:

The stompngo package does not do any automatic draining. The spec does not even mention the subject, even as a SHOULD, ergo it is not even considered. If your client design and implementation requires it, you need to figure out how to accomplish it in client code (which should not be difficult). I would consider a PR for stompngo that:

  • implemented
  • documented
  • tested

a way for the client to optionally accomplish it. But it cannot be the default way of operation. So by all means, submit a PR under those conditions.

The stompngo package intends to implement only the behavior the specs require or suggest: absolutely nothing more, and nothing less. My intention is to keep the package very lean, doing almost nothing other than what the specs indicate MUST or SHOULD be done. I learned my lessons on that with the Ruby stomp gem, and I won't repeat those mistakes.

Frankly I do not understand what outgoing data has to do with any of this. Incoming / outgoing data do not share buffers in the stompngo package, or even pure TCP buffers.

If you can implement and publish a (non AMQ specific) example that clearly demonstrates your real concern here, that would clearly be helpful.

Also, two RECEIPT examples have been pushed here. Documentation regarding RECEIPT frames in the stompngo package is still a TODO.

@flowchartsman
Author

I understand your rationale. You're right, maybe default drain is inappropriate. I will see about implementing a method to optionally drain on unsubscribe. That said, I think the docs should at least mention the possibility that the following scenario might occur with a queue server that sends more than one message before receiving an ACK:

  1. Client subscribes to a destination, which creates a chan MessageData with a buffer size of c.scc (default: 1):

         // subscription.go
         if hid { // Client specified id
             c.subs[id] = make(chan MessageData, c.scc) // Assign subscription
         }

  2. Message(s) arrive with that destination specified in the subscription header and are picked up in the read loop here:

         // reader.go
         if sid, ok := f.Headers.Contains("subscription"); ok {
             c.subsLock.Lock()
             c.subs[sid] <- d
             c.subsLock.Unlock()
         } else {
             c.input <- d
         }

  3. Client processes some messages or not, but there is still a message in the channel for this subscription.
  4. Client unsubscribes, which causes the following:

         // unsubscribe.go
         c.subsLock.Lock()
         defer c.subsLock.Unlock()
         // ...
         close(c.subs[sid])
         delete(c.subs, sid)

  5. Before c.subsLock.Lock() is called in the unsubscribe, another message arrives and is picked up by the reader goroutine. Wouldn't this block? The unsubscribe wouldn't be able to get the subsLock, and the reader goroutine would be blocked trying to send to the subscription channel.
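
The scenario in the steps above can be reproduced in isolation with nothing but a mutex and a buffered channel. This is a sketch of the locking pattern only, not stompngo code: unsubscribeBlocked is a hypothetical function, the "reader" and "unsubscribe" goroutines just mimic the shape of the snippets quoted above, and the timings are arbitrary.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// unsubscribeBlocked reports whether an "unsubscribe" fails to obtain the
// lock within wait, after a "reader" has tried to deliver the given number
// of messages into a subscription channel of the given capacity that
// nobody is receiving from.
func unsubscribeBlocked(capacity, messages int, wait time.Duration) bool {
	var mu sync.Mutex
	sub := make(chan string, capacity)

	// Reader: lock, send, unlock, as in the quoted read loop.
	go func() {
		for i := 0; i < messages; i++ {
			mu.Lock()
			sub <- fmt.Sprintf("msg-%d", i) // blocks while holding mu once sub is full
			mu.Unlock()
		}
	}()

	time.Sleep(wait) // give the reader time to fill the channel

	// Unsubscribe: needs the same lock before it could close the channel.
	locked := make(chan struct{})
	go func() {
		mu.Lock()
		mu.Unlock()
		close(locked)
	}()

	select {
	case <-locked:
		return false
	case <-time.After(wait):
		return true // both goroutines are now stuck (and leaked in this demo)
	}
}

func main() {
	fmt.Println("capacity 1, 2 messages:", unsubscribeBlocked(1, 2, 50*time.Millisecond))
	fmt.Println("capacity 2, 2 messages:", unsubscribeBlocked(2, 2, 50*time.Millisecond))
}
```

With a capacity of 1 and two undelivered messages the second send never completes, so the lock is never released; with enough buffer for both messages the unsubscribe proceeds.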

As far as the spec is concerned, only the 1.2 spec says anything about the client's obligation vis a vis unprocessed messages, and even then it is that they SHOULD NACK messages they have not processed. Not that they MUST. In the context of stompngo, I guess this would mean pulling remaining messages from the channel and NACKing them. But since it's not a MUST, it seems kind of unspecified what should happen here, and I can see other users overlooking the fact that there might be messages waiting on that subscription channel and that the reader goroutine might be blocked. Maybe I'm missing something, though.

I'll try and set up an example that demonstrates this. Might take me a couple of days to get to it.

EDIT: This situation might be the only potential spec-backed argument for auto-drain: if you assume that the client's call to unsubscribe means that they are no longer interested in messages on that subscription (which it seems like you might, by closing and deleting the subs reference to the channel), then at least the 1.2 spec says that they SHOULD NACK unprocessed messages. What do you think?

@gmallard
Owner

First, I missed your invite to a conversation, did not see it until after midnight. You were no longer around.

Second, I will digest your most recent post above tomorrow.

Third, I have done quite a bit of thinking, and written several chunks of experimental code. I am coming around to your view on a 'draining' functionality in at least some situations. Please proceed with a solution if you have one. I do think it needs to be optional. Not all scenarios would require it I believe. I want to do a number of other 'experiments'.

@gmallard
Owner

There are indeed scenarios where a client can hang/block. This can occur with either ackmode client or client-individual. I have recreated a couple of them here using a modified/mangled subscribe.go from this package.

After some thought, I put this ugly/fragile piece of code between Ack and Unsubscribe:

    b := false
    for {
        select {
        case <-r: // Drop a MessageData on the floor
            fmt.Println("dropped 1")
        case <-time.After(100 * time.Millisecond):
            fmt.Println("breaking")
            b = true // Nothing arrived for 100ms: leave the loop
        }
        if b {
            break
        }
    }

That actually worked in my experiments, but I suspect it is not exactly what would be required.
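
For comparison, the same idea can be packaged as a function that returns instead of using a flag. This is a stand-alone sketch with a plain string channel in place of a real subscription channel; drainUntilIdle is a hypothetical helper, and the fixed idle timeout keeps the fragility mentioned above, since a message that arrives after the timeout is still not drained.

```go
package main

import (
	"fmt"
	"time"
)

// drainUntilIdle discards values from sub until nothing has arrived for
// the idle duration, or until sub is closed. It returns how many values
// were dropped.
func drainUntilIdle(sub <-chan string, idle time.Duration) int {
	dropped := 0
	for {
		select {
		case _, ok := <-sub:
			if !ok {
				return dropped // channel closed: nothing more can arrive
			}
			dropped++
		case <-time.After(idle):
			return dropped // quiet for the whole idle period
		}
	}
}

func main() {
	sub := make(chan string, 4)
	for _, m := range []string{"m1", "m2", "m3"} {
		sub <- m
	}
	fmt.Println("dropped", drainUntilIdle(sub, 100*time.Millisecond))
}
```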

@gmallard
Owner

On 04/26/2015 11:58 AM, Andy Walker wrote:

Since the 1.2 spec says that clients SHOULD NACK unsubscribed
messages, and since the behavior is unspecified for 1.1, but NACK
still exists, we could (optionally) NACK outstanding messages during
unsubscribe:

    // unsubscribe.go
    if oki {
        if len(c.subs[sid]) > 0 {
            switch c.Protocol() {
            case SPL_12, SPL_11:
            NackDrain:
                for {
                    select {
                    case m := <-c.subs[sid]:
                        // NACK m
                    default:
                        break NackDrain
                    }
                }
            case SPL_10:
            InputDrain:
                for {
                    select {
                    case m := <-c.subs[sid]:
                        c.input <- m // ???
                    default:
                        break InputDrain
                    }
                }
            }
        }
        close(c.subs[sid])
        delete(c.subs, sid)
    }

If in "client" mode, you could also only NACK the last message.
Since unsubscribe has the lock on c.subsLock this should be okay.
Unsure if the 1.0 solution is correct or not. Still kind of thinking
about this solution.



1.0 clients can pose a problem. They are allowed to subscribe without
an ID. In which case they do not have a unique subscribe channel.
godoc mentions this, and points to the wiki for a detailed explanation:

https://github.com/gmallard/stompngo/wiki/subscribe-and-messagedata

Also, clients do not have to unsubscribe. They could call disconnect,
or just close the connection.

There are a lot of facets to this subject.

@gmallard
Owner

Couple of items.

First, I am back to the $dayjob tomorrow, and will have small amounts of time to consider this M-F.

Next, please open an issue, with an appropriate title, in the stompngo project that describes our discussions here regarding blocks/hangs. That project is the correct place for the issue under consideration. Perhaps reference this issue as documentation. Because of the title I want to close this issue.

Next, I think you are frequently on irc, #go-nuts. I am guessing from the 'alaska' name I see there. Correct? I am almost always signed in, but may not be in front of that particular computer. I am 'flaguy48'. Perhaps we can hold discussions there as time permits.

Next, I will likely push some failing examples, with comments to this examples package. Please watch for that.

@flowchartsman
Author

The other problem is the draining. It can't be done reliably this way. (see here: https://play.golang.org/p/CaEAvDQA53). I have other thoughts, but they can be discussed elsewhere. I'll file the issue shortly.
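
One way a select/default drain can fall short is sketched below. This is not the code from the playground link, and it may not be the exact case that link demonstrates; it only shows that a non-blocking drain empties what is buffered at that instant and says nothing about messages that arrive afterwards. drainNow is a hypothetical helper operating on a plain string channel.

```go
package main

import "fmt"

// drainNow removes whatever is buffered in sub right now, without ever
// blocking, and returns how many values it removed.
func drainNow(sub <-chan string) int {
	n := 0
	for {
		select {
		case _, ok := <-sub:
			if !ok {
				return n // channel closed
			}
			n++
		default:
			return n // nothing buffered at this instant
		}
	}
}

func main() {
	sub := make(chan string, 2)
	sub <- "m1"
	sub <- "m2"
	fmt.Println("drained:", drainNow(sub)) // prints drained: 2

	// A message that arrives after the drain is left in the channel.
	sub <- "m3"
	fmt.Println("left behind:", len(sub)) // prints left behind: 1
}
```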

@gmallard
Owner

gmallard commented May 6, 2015

I just pushed d6efd7d to the sngissue25 branch here.

It attempts to recreate the original scenario you described. But I am not sure it is entirely accurate.

Please take a look at it if you get a chance.

@gmallard
Owner

gmallard commented Jul 6, 2016

Technically, an example that demos receipts has been complete for quite a while. The conversation in this issue discussion drifted a bit.

See:

43a8e1a Demo RECEIPT on an ACK.
a6108a0 Demo RECEIPT on a SEND.

@gmallard gmallard closed this as completed Jul 6, 2016